import asyncio from threading import Thread from aiohttp import web class MockMailmanServer: def __init__(self): self.app = web.Application() self.app.add_routes([ web.post('/members', self.subscribe), web.delete('/lists/{mailing_list}/member/{address}', self.unsubscribe), ]) self.runner = web.AppRunner(self.app) self.loop = asyncio.new_event_loop() self.subscriptions = [] def _start_loop(self): asyncio.set_event_loop(self.loop) self.loop.run_until_complete(self.runner.setup()) site = web.TCPSite(self.runner, 'localhost', 8002) self.loop.run_until_complete(site.start()) self.loop.run_forever() def start(self): t = Thread(target=self._start_loop) t.start() def stop(self): self.loop.call_soon_threadsafe(self.loop.stop) async def subscribe(self, request): body = await request.post() subscriber = body['subscriber'] if subscriber in self.subscriptions: return web.json_response({ 'description': 'user is already subscribed', }, status=409) self.subscriptions.append(subscriber) return web.json_response({'status': 'OK'}) async def unsubscribe(self, request): subscriber = request.match_info['address'] if subscriber not in self.subscriptions: return web.json_response({ 'description': 'user is not subscribed', }, status=404) self.subscriptions.remove(subscriber) return web.json_response({'status': 'OK'})