pyceo/tests/MockMailmanServer.py

59 lines
1.9 KiB
Python
Raw Permalink Normal View History

2021-08-13 20:11:56 -04:00
from aiohttp import web
from .MockHTTPServerBase import MockHTTPServerBase
2021-08-13 20:11:56 -04:00
class MockMailmanServer(MockHTTPServerBase):
    """In-memory mock of the Mailman REST endpoints used for (un)subscribing.

    Two routes are registered under ``prefix``:

    * ``POST {prefix}/members`` -> :meth:`subscribe`
    * ``DELETE {prefix}/lists/{mailing_list}/member/{address}``
      -> :meth:`unsubscribe`

    State lives in ``self.subscriptions``, a dict mapping a short list name
    (e.g. ``'csc-general'``) to the list of subscriber values currently on it.
    """

    def __init__(self, port=8001, prefix='/3.1'):
        """Register the routes and create the empty subscription table.

        Args:
            port: Port handed to ``MockHTTPServerBase``.
            prefix: URL prefix for every route (presumably the Mailman REST
                API version path -- confirm against the real client config).
        """
        routes = [
            web.post(prefix + '/members', self.subscribe),
            web.delete(
                prefix + '/lists/{mailing_list}/member/{address}',
                self.unsubscribe,
            ),
        ]
        super().__init__(port, routes)

        # add more as necessary
        self.subscriptions = {
            'csc-general': [],
            'exec': [],
            'syscom': [],
            'syscom-alerts': [],
        }

    def clear(self):
        """Remove every subscriber from every known list (lists are kept)."""
        for subscribers in self.subscriptions.values():
            subscribers.clear()

    @staticmethod
    def _short_list_name(list_id, separator):
        """Return the part of ``list_id`` before the first ``separator``.

        Returns ``None`` when ``separator`` does not occur, so callers can
        answer with a normal error response instead of letting a
        ``ValueError`` escape as an HTTP 500.
        """
        name, found, _ = list_id.partition(separator)
        return name if found else None

    async def subscribe(self, request):
        """Handle ``POST {prefix}/members``.

        Reads the form fields ``subscriber`` and ``list_id``; the list is
        identified by the part of ``list_id`` before its first ``'.'``.

        Returns:
            * 400 if a required field is missing.
            * 400 ``No such list`` if the list id is malformed or unknown.
            * 409 if the subscriber is already on the list.
            * ``{'status': 'OK'}`` after adding the subscriber.
        """
        body = await request.post()
        subscriber = body.get('subscriber')
        list_id = body.get('list_id')
        if subscriber is None or list_id is None:
            return web.json_response({
                'description': 'Missing parameters',
            }, status=400)

        list_name = self._short_list_name(list_id, '.')
        if list_name is None or list_name not in self.subscriptions:
            return web.json_response({
                'description': 'No such list',
            }, status=400)

        subscribers = self.subscriptions[list_name]
        if subscriber in subscribers:
            return web.json_response({
                'description': 'user is already subscribed',
            }, status=409)

        subscribers.append(subscriber)
        return web.json_response({'status': 'OK'})

    async def unsubscribe(self, request):
        """Handle ``DELETE {prefix}/lists/{mailing_list}/member/{address}``.

        The list is identified by the part of ``mailing_list`` before its
        first ``'@'``.

        Returns:
            * 404 if the list is malformed or unknown, or the address is not
              subscribed to it.
            * ``{'status': 'OK'}`` after removing the subscriber.
        """
        subscriber = request.match_info['address']
        list_name = self._short_list_name(
            request.match_info['mailing_list'], '@')

        # A malformed (None) or unknown list name yields an empty list, so it
        # falls through to the same 404 as a missing subscriber.
        subscribers = self.subscriptions.get(list_name, [])
        if subscriber not in subscribers:
            return web.json_response({
                'description': 'user is not subscribed',
            }, status=404)

        subscribers.remove(subscriber)
        return web.json_response({'status': 'OK'})
if __name__ == '__main__':
    # Run the mock with its default port and prefix when executed directly.
    mock_server = MockMailmanServer()
    mock_server.start()