pyceo/ceod/model/CloudStackService.py

101 lines
3.3 KiB
Python

from base64 import b64encode
import hashlib
import hmac
from typing import Dict
from urllib.parse import quote
import requests
from zope import component
from zope.interface import implementer
from ceo_common.errors import CloudStackAPIError
from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import ICloudStackService, IConfig, IUser
# Module-level logger, created via the project's logger factory and keyed on
# this module's name (presumably a thin wrapper over `logging` -- confirm in
# ceo_common.logger_factory).
logger = logger_factory(__name__)
@implementer(ICloudStackService)
class CloudStackService:
    """Small client for the CloudStack HTTP API.

    Every request is sent to a signed URL produced by ``_create_url`` from
    the API key, secret key and base URL read from the ``IConfig`` utility.
    Account operations are scoped to the domain named by
    ``self.members_domain``.
    """

    def __init__(self):
        cfg = component.getUtility(IConfig)
        self.api_key = cfg.get('cloudstack_api_key')
        self.secret_key = cfg.get('cloudstack_secret_key')
        self.base_url = cfg.get('cloudstack_base_url')
        # Name of the CloudStack domain that accounts are listed/created in.
        self.members_domain = 'Members'
        # Filled in by the first successful _get_domain_id() call.
        self._cached_domain_id = None

    def _create_url(self, params: Dict[str, str]) -> str:
        """Return a signed request URL for the given API parameters.

        ``apiKey`` is added unless the caller already supplied ``apiKey`` or
        ``apikey``, and ``response`` is always set to ``json``. The caller's
        dict is not modified.

        :param params: API parameter names mapped to (string) values.
        :returns: ``base_url`` followed by the query string and signature.
        """
        # See https://docs.cloudstack.apache.org/en/latest/developersguide/dev.html#the-cloudstack-api
        # Work on a copy so the caller's dict is never mutated.
        params = dict(params)
        if 'apiKey' not in params and 'apikey' not in params:
            params['apiKey'] = self.api_key
        params['response'] = 'json'

        request_str = '&'.join(
            key + '=' + quote(val)
            for key, val in params.items()
        )
        # The string that gets signed is the lower-cased query string with
        # its fields in sorted order.
        sig_str = '&'.join(
            key.lower() + '=' + quote(val).lower()
            for key, val in sorted(params.items())
        )
        sig = hmac.new(
            self.secret_key.encode(), sig_str.encode(), hashlib.sha1
        ).digest()
        encoded_sig = b64encode(sig).decode()
        return (
            self.base_url + '?' + request_str
            + '&signature=' + quote(encoded_sig)
        )

    def _get_domain_id(self) -> str:
        """Return the ID of the ``self.members_domain`` domain.

        The ID is looked up with ``listDomains`` on first use and cached on
        the instance afterwards.

        :raises requests.HTTPError: if the API responds with an error status.
        :raises CloudStackAPIError: if the response does not report exactly
            one matching domain.
        """
        if self._cached_domain_id is not None:
            return self._cached_domain_id

        url = self._create_url({
            'command': 'listDomains',
            'details': 'min',
            'name': self.members_domain,
        })
        resp = requests.get(url)
        resp.raise_for_status()
        d = resp.json()['listdomainsresponse']
        # Explicit check rather than `assert`: asserts are removed under -O,
        # and a missing 'count' key would otherwise surface as a KeyError.
        if d.get('count') != 1:
            raise CloudStackAPIError('there should be one domain found')

        domain_id = d['domain'][0]['id']
        self._cached_domain_id = domain_id
        return domain_id

    def get_accounts(self) -> Dict[str, str]:
        """Return a mapping of account name to account ID.

        Only accounts in the domain returned by ``_get_domain_id`` are
        listed. An empty dict is returned when there are no accounts.

        :raises requests.HTTPError: if the API responds with an error status.
        """
        domain_id = self._get_domain_id()

        url = self._create_url({
            'command': 'listAccounts',
            'domainid': domain_id,
            'details': 'min',
        })
        resp = requests.get(url)
        resp.raise_for_status()
        d = resp.json()['listaccountsresponse']
        if 'account' not in d:
            # The API returns an empty dict if there are no accounts
            return {}
        return {
            account['name']: account['id']
            for account in d['account']
        }

    def delete_account(self, account_id: str):
        """Issue a ``deleteAccount`` request for the given account ID.

        :raises requests.HTTPError: if the API responds with an error status.
        """
        url = self._create_url({
            'command': 'deleteAccount',
            'id': account_id,
        })
        resp = requests.post(url)
        resp.raise_for_status()

    def create_account(self, user: IUser):
        """Issue an ``ldapCreateAccount`` request for ``user.uid``.

        The account is created in the domain returned by ``_get_domain_id``.

        :raises CloudStackAPIError: if the API responds with an error status.
            The message is the response's ``errortext`` when available,
            otherwise the raw response body.
        """
        domain_id = self._get_domain_id()

        url = self._create_url({
            'command': 'ldapCreateAccount',
            'accounttype': '0',
            'domainid': domain_id,
            'username': user.uid,
        })
        resp = requests.post(url)
        if resp.ok:
            return

        # Only parse the body on failure, and tolerate error bodies that are
        # not JSON or do not have the expected shape.
        try:
            error_text = resp.json()['createaccountresponse']['errortext']
        except (ValueError, KeyError, TypeError):
            error_text = resp.text
        raise CloudStackAPIError(error_text)