from uuid import uuid4

from aiohttp import web

from .MockHTTPServerBase import MockHTTPServerBase

# Fixed identifiers embedded in every mock user/account/domain payload.
_ROLE_ID = "24422759-45de-11ec-b585-32ee6075b19b"
_DOMAIN_ID = "4d2a4a98-b1b4-47a8-ab8f-7e175013a0f0"


def gen_uuid():
    """Return a freshly generated random UUID (version 4) as a string."""
    return str(uuid4())


class MockCloudStackServer(MockHTTPServerBase):
    """In-memory mock of a small subset of the CloudStack HTTP API.

    Serves ``/client/api`` (GET and POST) and dispatches on the ``command``
    query parameter. The commands handled are ``listDomains``,
    ``ldapCreateAccount``, ``listUsers``, ``listAccounts`` and
    ``deleteAccount``. ``/reset`` wipes all stored users.

    State lives in two dicts that index the same user objects: one keyed by
    account id and one keyed by username.
    """

    def __init__(self, port=8080):
        routes = [
            web.get('/client/api', self.generic_handler),
            web.post('/client/api', self.generic_handler),
            # for debugging purposes
            web.get('/reset', self.reset_handler),
            web.post('/reset', self.reset_handler),
        ]
        super().__init__(port, routes)
        self.users_by_accountid = {}
        self.users_by_username = {}

    def clear(self):
        """Forget every user that was created on this server."""
        self.users_by_accountid.clear()
        self.users_by_username.clear()

    async def reset_handler(self, request):
        """Clear all stored users and reply with a plain-text ``OK``."""
        self.clear()
        return web.Response(text='OK\n')

    def _add_user(self, username: str):
        """Create a user record for *username*, index it, and return it.

        Fresh UUIDs are generated for the user id and the account id. The
        caller is responsible for checking that *username* is not already
        registered.
        """
        account_id = gen_uuid()
        user_id = gen_uuid()
        user = {
            "id": user_id,
            "username": username,
            "firstname": "Calum",
            "lastname": "Dalek",
            "email": username + "@csclub.internal",
            "created": "2021-11-20T11:08:24-0500",
            "state": "enabled",
            "account": username,
            "accounttype": 0,
            "usersource": "ldap",
            "roleid": _ROLE_ID,
            "roletype": "User",
            "rolename": "User",
            "domainid": _DOMAIN_ID,
            "domain": "Members",
            "accountid": account_id,
            "iscallerchilddomain": False,
            "isdefault": False
        }
        self.users_by_accountid[account_id] = user
        self.users_by_username[username] = user
        return user

    def _delete_user(self, account_id: str):
        """Remove the user owning *account_id* from both indexes.

        Raises:
            KeyError: if *account_id* is not known.
        """
        user = self.users_by_accountid[account_id]
        username = user['username']
        del self.users_by_accountid[account_id]
        del self.users_by_username[username]

    def _account_from_username(self, username: str):
        """Build the account payload wrapping the user named *username*.

        Raises:
            KeyError: if *username* is not known.
        """
        user = self.users_by_username[username]
        return {
            "id": user['accountid'],
            "name": username,
            "accounttype": 0,
            "roleid": _ROLE_ID,
            "roletype": "User",
            "rolename": "User",
            "domainid": _DOMAIN_ID,
            "domain": "Members",
            "domainpath": "ROOT/Members",
            "state": "enabled",
            "user": [user],
            "isdefault": False,
            "groups": []
        }

    @staticmethod
    def _param_error(response_key: str, errortext: str):
        """Return a JSON parameter-error response under *response_key*.

        NOTE(review): the 431 / 9999 codes are modelled on CloudStack's
        parameter error; confirm against a real server if a test depends
        on the exact payload.
        """
        return web.json_response({
            response_key: {
                "uuidList": [],
                "errorcode": 431,
                "cserrorcode": 9999,
                "errortext": errortext
            }
        }, status=431)

    async def generic_handler(self, request):
        """Dispatch a ``/client/api`` request on its ``command`` parameter.

        All parameters are read from the query string. Unknown or missing
        commands get the 401 error payload; missing or invalid parameters
        get a 431 error payload rather than an unhandled ``KeyError``.
        """
        # .get() so that a request without ``command`` reaches the 401
        # branch at the bottom instead of raising KeyError (HTTP 500).
        command = request.query.get('command')
        if command == 'listDomains':
            return web.json_response({
                "listdomainsresponse": {
                    "count": 1,
                    "domain": [{
                        "id": _DOMAIN_ID,
                        "name": "Members",
                        "level": 1,
                        "parentdomainid": "f0f8263c-45dd-11ec-b585-32ee6075b19b",
                        "parentdomainname": "ROOT",
                        "haschild": False,
                        "path": "ROOT/Members",
                        "state": "Active",
                        "secondarystoragetotal": 0.0
                    }]
                }
            })
        elif command == 'ldapCreateAccount':
            username = request.query.get('username')
            if username is None:
                return self._param_error(
                    "createaccountresponse",
                    "Unable to execute API command ldapcreateaccount "
                    "due to missing parameter username"
                )
            if username in self.users_by_username:
                return web.json_response({
                    "createaccountresponse": {
                        "uuidList": [],
                        "errorcode": 530,
                        "cserrorcode": 4250,
                        "errortext": f"The user {username} already exists in domain 2"
                    }
                }, status=530)
            self._add_user(username)
            return web.json_response({
                "createaccountresponse": {
                    "account": self._account_from_username(username),
                }
            })
        elif command == 'listUsers':
            users = list(self.users_by_username.values())
            return web.json_response({
                'listusersresponse': {
                    'count': len(users),
                    'user': users,
                }
            })
        elif command == 'listAccounts':
            usernames = list(self.users_by_username)
            return web.json_response({
                'listaccountsresponse': {
                    'count': len(usernames),
                    'account': [
                        self._account_from_username(username)
                        for username in usernames
                    ]
                }
            })
        elif command == 'deleteAccount':
            account_id = request.query.get('id')
            if account_id is None:
                return self._param_error(
                    'deleteaccountresponse',
                    "Unable to execute API command deleteaccount "
                    "due to missing parameter id"
                )
            # Report an unknown account as an API error instead of letting
            # _delete_user raise KeyError and surface as HTTP 500.
            if account_id not in self.users_by_accountid:
                return self._param_error(
                    'deleteaccountresponse',
                    "Unable to execute API command deleteaccount due to "
                    f"invalid value. Invalid parameter id value={account_id} "
                    "due to incorrect long value format, or entity does not "
                    "exist or due to incorrect parameter annotation for the "
                    "field in api cmd class."
                )
            self._delete_user(account_id)
            return web.json_response({
                'deleteaccountresponse': {
                    'jobid': gen_uuid()
                }
            })
        else:
            return web.json_response({
                "errorresponse": {
                    "uuidList": [],
                    "errorcode": 401,
                    "errortext": "unable to verify user credentials and/or request signature"
                }
            }, status=401)


if __name__ == '__main__':
    server = MockCloudStackServer()
    server.start()