from base64 import b64encode import hashlib import hmac from typing import Dict from urllib.parse import quote import requests from zope import component from zope.interface import implementer from ceo_common.interfaces import ICloudService, IConfig, IUser from ceo_common.model import Term @implementer(ICloudService) class CloudService: def __init__(self): cfg = component.getUtility(IConfig) self.api_key = cfg.get('cloudstack_api_key') self.secret_key = cfg.get('cloudstack_secret_key') self.base_url = cfg.get('cloudstack_base_url') self.members_domain = cfg.get('cloudstack_members_domain') def _create_url(self, params: Dict[str, str]) -> str: # See https://docs.cloudstack.apache.org/en/latest/developersguide/dev.html#the-cloudstack-api if 'apiKey' not in params and 'apikey' not in params: params['apiKey'] = self.api_key params['response'] = 'json' request_str = '&'.join( key + '=' + quote(val) for key, val in params.items() ) sig_str = '&'.join( key.lower() + '=' + quote(val).lower() for key, val in sorted(params.items()) ) sig = hmac.new(self.secret_key.encode(), sig_str.encode(), hashlib.sha1).digest() encoded_sig = b64encode(sig).decode() url = self.base_url + '?' + request_str + '&signature=' + quote(encoded_sig) return url def _get_domain_id(self, domain_name: str) -> str: url = self._create_url({ 'command': 'listDomains', 'details': 'min', 'name': domain_name, }) resp = requests.get(url) resp.raise_for_status() d = resp.json()['listdomainsresponse'] assert d['count'] == 1, 'there should be one domain found' return d['domain'][0]['id'] def create_account(self, user: IUser): if not user.terms: raise Exception('Only members may create cloud accounts') most_recent_term = max(map(Term, user.terms)) if most_recent_term < Term.current(): raise Exception('Membership has expired for this user') domain_id = self._get_domain_id(self.members_domain) url = self._create_url({ 'command': 'ldapCreateAccount', 'accounttype': '0', 'domainid': domain_id, 'username': user.uid, }) resp = requests.post(url) d = resp.json()['createaccountresponse'] if not resp.ok: raise Exception(d['errortext'])