from aiohttp import web from .MockHTTPServerBase import MockHTTPServerBase class MockHarborServer(MockHTTPServerBase): def __init__(self, port=8002): prefix = '/api/v2.0' routes = [ web.get(prefix + '/users', self.users_get_handler), web.get(prefix + '/projects', self.projects_get_handler), web.post(prefix + '/projects', self.projects_post_handler), web.post(prefix + '/projects/{project}/members', self.members_post_handler), web.get(prefix + '/projects/{project}/repositories', self.repositories_get_handler), web.delete(prefix + '/projects/{project}/repositories/{repository}', self.repositories_delete_handler), web.delete(prefix + '/projects/{project}', self.projects_delete_handler), # for debugging purposes web.post('/reset', self.reset_handler), web.delete('/users/{username}', self.users_delete_handler), ] super().__init__(port, routes) self.users = ['ctdalek', 'regular1', 'exec1'] self.projects = { 'ctdalek': ['repo1', 'repo2'], 'regular1': [], 'exec1': [], } async def projects_get_handler(self, request): return web.json_response([ {'name': name, 'project_id': i + 1} for i, name in enumerate(self.projects.keys()) ]) async def projects_delete_handler(self, request): project_name = request.match_info['project'] if project_name not in self.projects: return web.json_response({"errors": [{ "code": "FORBIDDEN", "message": "forbidden" }]}, status=403) del self.projects[project_name] return web.Response(text='', status=200) async def repositories_delete_handler(self, request): project_name = request.match_info['project'] repository_name = request.match_info['repository'] self.projects[project_name].remove(repository_name) return web.Response(text='', status=200) async def repositories_get_handler(self, request): project_name = request.match_info['project'] if project_name not in self.projects: return web.json_response({"errors": [{ "code": "FORBIDDEN", "message": "forbidden" }]}, status=403) projects = self.projects[project_name] return web.json_response([ {'id': i, 'name': project_name + '/' + name} for i, name in enumerate(projects) ]) async def users_get_handler(self, request): username = request.query['username'] if username not in self.users: return web.json_response([]) return web.json_response([{ 'username': username, 'realname': username, 'user_id': self.users.index(username), 'email': username + '@csclub.internal', }]) async def members_post_handler(self, request): await request.json() return web.Response(text='', status=201) async def projects_post_handler(self, request): body = await request.json() project_name = body['project_name'] if project_name in self.projects: return web.json_response({'errors': [{ "code": "CONFLICT", "message": f"The project named {project_name} already exists", }]}, status=409) self.projects[project_name] = ['repo1', 'repo2'] return web.Response(text='', status=201) async def users_delete_handler(self, request): username = request.match_info['username'] self.users.remove(username) return web.Response(text='OK\n', status=201) def reset(self): self.users.clear() self.projects.clear() async def reset_handler(self, request): self.reset() return web.Response(text='OK\n') if __name__ == '__main__': server = MockHarborServer() server.start()