pyceo/tests/MockMailmanServer.py

59 lines
1.9 KiB
Python

from aiohttp import web
from .MockHTTPServerBase import MockHTTPServerBase
class MockMailmanServer(MockHTTPServerBase):
    """Mock Mailman REST server that keeps list memberships in memory.

    Two endpoints are served (both under ``prefix``): ``POST /members`` to
    subscribe and ``DELETE /lists/{mailing_list}/member/{address}`` to
    unsubscribe. Memberships live in ``self.subscriptions``, a dict mapping
    a short list name (e.g. ``'csc-general'``) to a list of subscribers.
    """

    def __init__(self, port=8001, prefix='/3.1'):
        """Register the routes and create the empty subscription table.

        Args:
            port: forwarded unchanged to ``MockHTTPServerBase.__init__``.
            prefix: URL path prefix prepended to every route.
        """
        routes = [
            web.post(prefix + '/members', self.subscribe),
            web.delete(
                prefix + '/lists/{mailing_list}/member/{address}',
                self.unsubscribe,
            ),
        ]
        super().__init__(port, routes)
        # add more as necessary
        self.subscriptions = {
            'csc-general': [],
            'exec': [],
            'syscom': [],
            'syscom-alerts': [],
        }

    def clear(self):
        """Empty every list's subscribers in place; the list names remain."""
        for subscribers in self.subscriptions.values():
            subscribers.clear()

    async def subscribe(self, request):
        """Handle the subscribe route: add a subscriber to a mailing list.

        Reads the form fields ``subscriber`` and ``list_id`` from the request
        body. The list name is the part of ``list_id`` before its first
        ``'.'``.

        Returns a JSON response:
            200 ``{'status': 'OK'}`` when the subscriber was added.
            400 when a form field is missing, or when the list is unknown
                (a ``list_id`` containing no ``'.'`` counts as unknown).
            409 when the subscriber is already on the list.
        """
        body = await request.post()
        subscriber = body.get('subscriber')
        list_id = body.get('list_id')
        if subscriber is None or list_id is None:
            # Answer with a client error rather than letting a KeyError
            # escape the handler.
            return web.json_response({
                'description': 'Missing parameters',
            }, status=400)

        # partition() never raises, unlike index(); an absent separator is
        # reported to the client as an unknown list.
        list_name, separator, _ = list_id.partition('.')
        if not separator or list_name not in self.subscriptions:
            return web.json_response({
                'description': 'No such list',
            }, status=400)

        subscribers = self.subscriptions[list_name]
        if subscriber in subscribers:
            return web.json_response({
                'description': 'user is already subscribed',
            }, status=409)
        subscribers.append(subscriber)
        return web.json_response({'status': 'OK'})

    async def unsubscribe(self, request):
        """Handle the unsubscribe route: remove a subscriber from a list.

        The subscriber is the ``address`` path segment. The list name is the
        part of the ``mailing_list`` path segment before its first ``'@'``.

        Returns a JSON response:
            200 ``{'status': 'OK'}`` when the subscriber was removed.
            404 when the subscriber is not on the list; an unknown list and
                a ``mailing_list`` containing no ``'@'`` are reported the
                same way.
        """
        subscriber = request.match_info['address']
        mailing_list = request.match_info['mailing_list']

        # partition() never raises, unlike index(); a malformed list address
        # is treated like an unknown list (nobody is subscribed to it).
        list_name, separator, _ = mailing_list.partition('@')
        subscribers = self.subscriptions.get(list_name, []) if separator else []
        if subscriber not in subscribers:
            return web.json_response({
                'description': 'user is not subscribed',
            }, status=404)
        subscribers.remove(subscriber)
        return web.json_response({'status': 'OK'})
if __name__ == '__main__':
    # Executed as a script: build the mock with its default arguments and
    # start it.
    mock_server = MockMailmanServer()
    mock_server.start()