# Source: pyceo/ceod/model/MailmanService.py (Python; listed as 52 lines, 1.8 KiB)

from urllib.parse import quote

import requests
from requests.auth import HTTPBasicAuth
from zope import component
from zope.interface import implementer

from ceo_common.interfaces import IMailmanService, IConfig


@implementer(IMailmanService)
class MailmanService:
    """Subscribe and unsubscribe addresses via the Mailman 3 REST API.

    The base domain, API base URL and HTTP basic-auth credentials are read
    from the registered ``IConfig`` utility when the service is constructed.
    """

    # Seconds to wait for the Mailman API.  Without an explicit timeout,
    # ``requests`` waits forever if the server stops responding.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Load the Mailman connection settings from the ``IConfig`` utility."""
        cfg = component.getUtility(IConfig)
        self.base_domain = cfg.get('base_domain')
        self.api_base_url = cfg.get('mailman3_api_base_url')
        self.api_username = cfg.get('mailman3_api_username')
        self.api_password = cfg.get('mailman3_api_password')

    def _qualify(self, name: str) -> str:
        """Return *name* as-is if it contains '@', else append the base domain."""
        if '@' in name:
            return name
        return f'{name}@{self.base_domain}'

    def _auth(self) -> HTTPBasicAuth:
        """Build the HTTP basic-auth credentials for an API request."""
        return HTTPBasicAuth(self.api_username, self.api_password)

    def subscribe(self, address: str, mailing_list: str):
        """Subscribe *address* to *mailing_list*.

        Args:
            address: Subscriber address; a bare name (no '@') is qualified
                with the configured base domain.
            mailing_list: List name.  Anything from the first '@' onwards is
                discarded and the list ID is always built from the configured
                base domain.

        Raises:
            requests.HTTPError: The API answered with a 4xx/5xx status.
            requests.Timeout: No response within ``REQUEST_TIMEOUT`` seconds.
        """
        # NOTE(review): a list on a domain other than base_domain cannot be
        # addressed here because the supplied domain is dropped; confirm that
        # this is intended.
        list_name = mailing_list.partition('@')[0]
        resp = requests.post(
            f'{self.api_base_url}/members',
            data={
                'list_id': f'{list_name}.{self.base_domain}',
                'subscriber': self._qualify(address),
                'pre_verified': 'True',
                'pre_confirmed': 'True',
                'pre_approved': 'True',
            },
            auth=self._auth(),
            timeout=self.REQUEST_TIMEOUT,
        )
        resp.raise_for_status()

    def unsubscribe(self, address: str, mailing_list: str):
        """Remove *address* from *mailing_list*.

        Args:
            address: Subscriber address; a bare name (no '@') is qualified
                with the configured base domain.
            mailing_list: List address; a bare name (no '@') is qualified
                with the configured base domain.

        Raises:
            requests.HTTPError: The API answered with a 4xx/5xx status.
            requests.Timeout: No response within ``REQUEST_TIMEOUT`` seconds.
        """
        # Percent-encode both path segments so that characters such as '/',
        # '?' or '#' in the input cannot change which resource is addressed.
        # '@' is kept literal so ordinary addresses produce the same URL as
        # before.
        list_segment = quote(self._qualify(mailing_list), safe='@')
        member_segment = quote(self._qualify(address), safe='@')
        resp = requests.delete(
            f'{self.api_base_url}/lists/{list_segment}/member/{member_segment}',
            data={
                'pre_approved': 'True',
                'pre_confirmed': 'True',
            },
            auth=self._auth(),
            timeout=self.REQUEST_TIMEOUT,
        )
        resp.raise_for_status()