pyceo/ceod/model/MailmanService.py

67 lines
2.5 KiB
Python

import requests
from requests.auth import HTTPBasicAuth
from zope import component
from zope.interface import implementer
from ceo_common.errors import UserAlreadySubscribedError, UserNotSubscribedError, \
NoSuchListError
from ceo_common.interfaces import IMailmanService, IConfig
@implementer(IMailmanService)
class MailmanService:
def __init__(self):
cfg = component.getUtility(IConfig)
self.base_domain = cfg.get('base_domain')
self.api_base_url = cfg.get('mailman3_api_base_url').rstrip('/')
api_username = cfg.get('mailman3_api_username')
api_password = cfg.get('mailman3_api_password')
self.basic_auth = HTTPBasicAuth(api_username, api_password)
def subscribe(self, address: str, mailing_list: str):
if '@' in mailing_list:
mailing_list = mailing_list[:mailing_list.index('@')]
if '@' not in address:
address = f'{address}@{self.base_domain}'
url = f'{self.api_base_url}/members'
resp = requests.post(
url,
data={
'list_id': f'{mailing_list}.{self.base_domain}',
'subscriber': address,
'pre_verified': 'True',
'pre_confirmed': 'True',
'pre_approved': 'True',
},
auth=self.basic_auth,
)
if not resp.ok:
desc = resp.json().get('description')
if resp.status_code == 409:
raise UserAlreadySubscribedError()
elif resp.status_code == 400 and desc == 'No such list':
raise NoSuchListError()
raise Exception(desc)
def unsubscribe(self, address: str, mailing_list: str):
if '@' not in mailing_list:
mailing_list = f'{mailing_list}@{self.base_domain}'
if '@' not in address:
address = f'{address}@{self.base_domain}'
url = f'{self.api_base_url}/lists/{mailing_list}/member/{address}'
resp = requests.delete(
url,
data={
'pre_approved': 'True',
'pre_confirmed': 'True',
},
auth=self.basic_auth,
)
if not resp.ok:
desc = resp.json().get('description')
if resp.status_code == 404:
# Unfortunately, a 404 here could mean either the list doesn't
# exist, or the member isn't subscribed
raise UserNotSubscribedError()
raise Exception(desc)