pyceo/tests/MockCloudStackServer.py

161 lines
5.4 KiB
Python

from uuid import uuid4
from aiohttp import web
from .MockHTTPServerBase import MockHTTPServerBase
def gen_uuid():
return str(uuid4())
class MockCloudStackServer(MockHTTPServerBase):
def __init__(self, port=8080):
routes = [
web.get('/client/api', self.generic_handler),
web.post('/client/api', self.generic_handler),
# for debugging purposes
web.get('/reset', self.reset_handler),
web.post('/reset', self.reset_handler),
]
super().__init__(port, routes)
self.users_by_accountid = {}
self.users_by_username = {}
def clear(self):
self.users_by_accountid.clear()
self.users_by_username.clear()
async def reset_handler(self, request):
self.clear()
return web.Response(text='OK\n')
def _add_user(self, username: str):
account_id = gen_uuid()
user_id = gen_uuid()
user = {
"id": user_id,
"username": username,
"firstname": "Calum",
"lastname": "Dalek",
"email": username + "@csclub.internal",
"created": "2021-11-20T11:08:24-0500",
"state": "enabled",
"account": username,
"accounttype": 0,
"usersource": "ldap",
"roleid": "24422759-45de-11ec-b585-32ee6075b19b",
"roletype": "User",
"rolename": "User",
"domainid": "4d2a4a98-b1b4-47a8-ab8f-7e175013a0f0",
"domain": "Members",
"accountid": account_id,
"iscallerchilddomain": False,
"isdefault": False
}
self.users_by_accountid[account_id] = user
self.users_by_username[username] = user
return user
def _delete_user(self, account_id: str):
user = self.users_by_accountid[account_id]
username = user['username']
del self.users_by_accountid[account_id]
del self.users_by_username[username]
def _account_from_username(self, username: str):
user = self.users_by_username[username]
return {
"id": user['accountid'],
"name": username,
"accounttype": 0,
"roleid": "24422759-45de-11ec-b585-32ee6075b19b",
"roletype": "User",
"rolename": "User",
"domainid": "4d2a4a98-b1b4-47a8-ab8f-7e175013a0f0",
"domain": "Members",
"domainpath": "ROOT/Members",
"state": "enabled",
"user": [user],
"isdefault": False,
"groups": []
}
async def generic_handler(self, request):
command = request.query['command']
if command == 'listDomains':
return web.json_response({
"listdomainsresponse": {
"count": 1,
"domain": [{
"id": "4d2a4a98-b1b4-47a8-ab8f-7e175013a0f0",
"name": "Members",
"level": 1,
"parentdomainid": "f0f8263c-45dd-11ec-b585-32ee6075b19b",
"parentdomainname": "ROOT",
"haschild": False,
"path": "ROOT/Members",
"state": "Active",
"secondarystoragetotal": 0.0
}]
}
})
elif command == 'ldapCreateAccount':
username = request.query['username']
if username in self.users_by_username:
return web.json_response({
"createaccountresponse": {
"uuidList": [],
"errorcode": 530,
"cserrorcode": 4250,
"errortext": f"The user {username} already exists in domain 2"
}
}, status=530)
self._add_user(username)
return web.json_response({
"createaccountresponse": {
"account": self._account_from_username(username),
}
})
elif command == 'listUsers':
users = list(self.users_by_username.values())
return web.json_response({
'listusersresponse': {
'count': len(users),
'user': users,
}
})
elif command == 'listAccounts':
usernames = list(self.users_by_username.keys())
return web.json_response({
'listaccountsresponse': {
'count': len(usernames),
'account': [
self._account_from_username(username)
for username in usernames
]
}
})
elif command == 'deleteAccount':
account_id = request.query['id']
self._delete_user(account_id)
return web.json_response({
'deleteaccountresponse': {
'jobid': gen_uuid()
}
})
else:
return web.json_response({
"errorresponse": {
"uuidList": [],
"errorcode": 401,
"errortext": "unable to verify user credentials and/or request signature"
}
}, status=401)
if __name__ == '__main__':
server = MockCloudStackServer()
server.start()