pyceo/tests/MockHarborServer.py

105 lines
3.9 KiB
Python
Raw Normal View History

from aiohttp import web
from .MockHTTPServerBase import MockHTTPServerBase
class MockHarborServer(MockHTTPServerBase):
def __init__(self, port=8002):
prefix = '/api/v2.0'
routes = [
web.get(prefix + '/users', self.users_get_handler),
web.get(prefix + '/projects', self.projects_get_handler),
web.post(prefix + '/projects', self.projects_post_handler),
web.post(prefix + '/projects/{project}/members', self.members_post_handler),
web.get(prefix + '/projects/{project}/repositories', self.repositories_get_handler),
web.delete(prefix + '/projects/{project}/repositories/{repository}', self.repositories_delete_handler),
web.delete(prefix + '/projects/{project}', self.projects_delete_handler),
# for debugging purposes
web.post('/reset', self.reset_handler),
web.delete('/users/{username}', self.users_delete_handler),
]
super().__init__(port, routes)
self.users = ['ctdalek', 'regular1', 'exec1']
self.projects = {
'ctdalek': ['repo1', 'repo2'],
'regular1': [],
'exec1': [],
}
async def projects_get_handler(self, request):
return web.json_response([
{'name': name, 'project_id': i + 1}
for i, name in enumerate(self.projects.keys())
])
async def projects_delete_handler(self, request):
project_name = request.match_info['project']
if project_name not in self.projects:
return web.json_response({"errors": [{
"code": "FORBIDDEN", "message": "forbidden"
}]}, status=403)
del self.projects[project_name]
return web.Response(text='', status=200)
async def repositories_delete_handler(self, request):
project_name = request.match_info['project']
repository_name = request.match_info['repository']
self.projects[project_name].remove(repository_name)
return web.Response(text='', status=200)
async def repositories_get_handler(self, request):
project_name = request.match_info['project']
if project_name not in self.projects:
return web.json_response({"errors": [{
"code": "FORBIDDEN", "message": "forbidden"
}]}, status=403)
projects = self.projects[project_name]
return web.json_response([
{'id': i, 'name': name} for i, name in enumerate(projects)
])
async def users_get_handler(self, request):
username = request.query['username']
if username not in self.users:
return web.json_response([])
return web.json_response([{
'username': username,
'realname': username,
'user_id': self.users.index(username),
'email': username + '@csclub.internal',
}])
async def members_post_handler(self, request):
await request.json()
return web.Response(text='', status=201)
async def projects_post_handler(self, request):
body = await request.json()
project_name = body['project_name']
if project_name in self.projects:
return web.json_response({'errors': [{
"code": "CONFLICT",
"message": f"The project named {project_name} already exists",
}]}, status=409)
self.projects[project_name] = ['repo1', 'repo2']
return web.Response(text='', status=201)
async def users_delete_handler(self, request):
username = request.match_info['username']
self.users.remove(username)
return web.Response(text='OK\n', status=201)
def reset(self):
self.users.clear()
self.projects.clear()
async def reset_handler(self, request):
self.reset()
return web.Response(text='OK\n')
if __name__ == '__main__':
server = MockHarborServer()
server.start()