pyceo/ceod/model/CloudStackService.py

102 lines
3.4 KiB
Python

from base64 import b64encode
import hashlib
import hmac
from typing import Dict
from urllib.parse import quote_plus
import requests
from zope import component
from zope.interface import implementer
from ceo_common.errors import CloudStackAPIError
from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import ICloudStackService, IConfig, IUser
# Module-level logger, obtained from the project's logger factory using this
# module's name; used below to record account deletions.
logger = logger_factory(__name__)
@implementer(ICloudStackService)
class CloudStackService:
    """Thin client for the CloudStack HTTP API, scoped to the 'Members' domain.

    Every request is sent to the configured base URL with the API key and an
    HMAC-SHA1 signature appended as query parameters (see ``_create_url``).
    """

    def __init__(self):
        """Load the API credentials and endpoint from the registered IConfig utility."""
        cfg = component.getUtility(IConfig)
        self.api_key = cfg.get('cloudstack_api_key')
        self.secret_key = cfg.get('cloudstack_secret_key')
        self.base_url = cfg.get('cloudstack_base_url')
        self.members_domain = 'Members'
        # Populated on the first successful _get_domain_id() call so that
        # later calls do not have to query the API again.
        self._cached_domain_id = None

    def _create_url(self, params: Dict[str, str]) -> str:
        """Build a signed request URL for the given API parameters.

        ``apiKey`` and ``response=json`` are added to the parameters, the
        signature is computed over them, and the result is appended as the
        ``signature`` query parameter.

        :param params: API parameters (e.g. ``command``); all values must be
            strings. The dict passed in is not modified.
        :returns: the full URL, including query string and signature.
        """
        # See https://docs.cloudstack.apache.org/en/4.16.0.0/developersguide/dev.html#the-cloudstack-api
        # Work on a copy: callers must not see 'apiKey'/'response' injected
        # into a dict they own.
        params = dict(params)
        params['apiKey'] = self.api_key
        params['response'] = 'json'

        request_str = '&'.join(
            key + '=' + quote_plus(val)
            for key, val in params.items()
        )
        # The string to sign is built from the parameters sorted by name,
        # lower-cased, with spaces encoded as %20 instead of '+'.
        sig_str = '&'.join(
            key.lower() + '=' + quote_plus(val).lower().replace('+', '%20')
            for key, val in sorted(params.items())
        )
        sig = hmac.new(
            self.secret_key.encode(), sig_str.encode(), hashlib.sha1
        ).digest()
        encoded_sig = b64encode(sig).decode()
        # Base64 output may contain '+', '/' and '=', so it has to be
        # URL-encoded before being placed in the query string.
        return (
            self.base_url + '?' + request_str
            + '&signature=' + quote_plus(encoded_sig)
        )

    def _get_domain_id(self) -> str:
        """Return the ID of the members domain, querying the API at most once.

        :returns: the ``id`` field of the single domain named
            ``self.members_domain``.
        :raises AssertionError: if the API does not report exactly one
            matching domain.
        :raises requests.HTTPError: if the API responds with an error status.
        """
        if self._cached_domain_id is not None:
            return self._cached_domain_id

        url = self._create_url({
            'command': 'listDomains',
            'details': 'min',
            'name': self.members_domain,
        })
        resp = requests.get(url)
        resp.raise_for_status()
        d = resp.json()['listdomainsresponse']
        # Raised explicitly rather than via an `assert` statement so the
        # check is still enforced when Python runs with -O.
        if d['count'] != 1:
            raise AssertionError('there should be one domain found')
        domain_id = d['domain'][0]['id']
        self._cached_domain_id = domain_id
        return domain_id

    def get_accounts(self) -> Dict[str, str]:
        """List the regular-user accounts in the members domain.

        :returns: a dict mapping each account's name to its ID; empty if the
            API reports no accounts.
        :raises requests.HTTPError: if the API responds with an error status.
        """
        domain_id = self._get_domain_id()

        url = self._create_url({
            'command': 'listAccounts',
            'accounttype': '0',  # regular user (exclude domain admin)
            'domainid': domain_id,
            'details': 'min',
        })
        resp = requests.get(url)
        resp.raise_for_status()
        d = resp.json()['listaccountsresponse']
        if 'account' not in d:
            # The API returns an empty dict if there are no accounts.
            # Return an empty dict (not a list) to honour the return type.
            return {}
        return {
            account['name']: account['id']
            for account in d['account']
        }

    def delete_account(self, username: str, account_id: str):
        """Request deletion of a CloudStack account.

        :param username: name of the account owner; used only for logging.
        :param account_id: ID of the account to delete.
        :raises requests.HTTPError: if the API responds with an error status.
        """
        logger.info(f'Deleting CloudStack account for {username}')
        url = self._create_url({
            'command': 'deleteAccount',
            'id': account_id,
        })
        resp = requests.post(url)
        resp.raise_for_status()

    def create_account(self, user: IUser):
        """Create a regular-user account in the members domain via ldapCreateAccount.

        :param user: the user whose ``uid`` is used as the account username.
        :raises CloudStackAPIError: if the API responds with an error status;
            the message is the ``errortext`` from the response body.
        """
        domain_id = self._get_domain_id()

        url = self._create_url({
            'command': 'ldapCreateAccount',
            'accounttype': '0',
            'domainid': domain_id,
            'username': user.uid,
        })
        resp = requests.post(url)
        # The body is parsed before the status check because the error text
        # reported to the caller lives inside the JSON response.
        d = resp.json()['createaccountresponse']
        if not resp.ok:
            raise CloudStackAPIError(d['errortext'])