Add Kubernetes API endpoint (#38)

Add an API for members to create their own Kubernetes namespace.

Co-authored-by: Max Erenberg <>
Reviewed-on: #38
Co-authored-by: Max Erenberg <merenber@csclub.uwaterloo.ca>
Co-committed-by: Max Erenberg <merenber@csclub.uwaterloo.ca>
pull/39/head
Max Erenberg 9 months ago
parent b4110d887d
commit f08f4872cf
  1. 8
      .drone/common.sh
  2. 19
      .drone/k8s-authority.crt
  3. 19
      .drone/mock_kubectl
  4. 1
      ceo/cli/cloud.py
  5. 2
      ceo/cli/entrypoint.py
  6. 42
      ceo/cli/k8s.py
  7. 16
      ceo_common/interfaces/ICloudResourceManager.py
  8. 42
      ceo_common/interfaces/ICloudService.py
  9. 26
      ceo_common/interfaces/ICloudStackService.py
  10. 24
      ceo_common/interfaces/IKubernetesService.py
  11. 19
      ceo_common/interfaces/IVHostManager.py
  12. 4
      ceo_common/interfaces/__init__.py
  13. 24
      ceod/api/app_factory.py
  14. 35
      ceod/api/cloud.py
  15. 111
      ceod/model/CloudResourceManager.py
  16. 236
      ceod/model/CloudService.py
  17. 100
      ceod/model/CloudStackService.py
  18. 116
      ceod/model/KubernetesService.py
  19. 1
      ceod/model/MailService.py
  20. 98
      ceod/model/VHostManager.py
  21. 4
      ceod/model/__init__.py
  22. 20
      ceod/model/templates/kubeconfig.j2
  23. 10
      ceod/model/templates/kubernetes_csr.yaml.j2
  24. 60
      ceod/model/templates/kubernetes_user.yaml.j2
  25. 5
      ceod/model/templates/nginx_cloud_vhost_config.j2
  26. 60
      ceod/model/utils.py
  27. 4
      debian/control
  28. 60
      docs/ceod.ini.5.scd
  29. 24
      docs/openapi.yaml
  30. 15
      docs/redoc-static.html
  31. 11
      etc/ceod.ini
  32. 1
      etc/default/ceod
  33. 26
      tests/ceo/cli/test_cloud.py
  34. 23
      tests/ceod/api/test_cloud.py
  35. 27
      tests/ceod/model/test_k8s.py
  36. 61
      tests/ceod/model/test_vhosts.py
  37. 11
      tests/ceod_dev.ini
  38. 9
      tests/ceod_test_local.ini
  39. 39
      tests/conftest.py

@ -11,6 +11,14 @@ ln -s /bin/true /usr/local/bin/systemctl
# mock out acme.sh
mkdir -p /root/.acme.sh
ln -s /bin/true /root/.acme.sh/acme.sh
# mock out kubectl
cp .drone/mock_kubectl /usr/local/bin/kubectl
chmod +x /usr/local/bin/kubectl
# add k8s authority certificate
mkdir -p /etc/csc
cp .drone/k8s-authority.crt /etc/csc/k8s-authority.crt
# openssl is actually already present in the python Docker image,
# so we don't need to mock it out
get_ip_addr() {
getent hosts $1 | cut -d' ' -f1

@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIC/jCCAeagAwIBAgIBADANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwprdWJl
cm5ldGVzMB4XDTIxMTIwNDIxNDcxOVoXDTMxMTIwMjIxNDcxOVowFTETMBEGA1UE
AxMKa3ViZXJuZXRlczCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAN50
H4RcrV5ZDDqT5XMfN1ml8MalyMDAG8mE+lNT1rsUGBUp2jhNfG0OpFUm55yGarI9
2BrNGXLyFGm3yy6MWJorSUqaSBzt9+JHtBDVQwCgTX9PYSX1X/kFNQFLZkNrMtO4
417WELlkl9miCWWmTPOZAMYZWbnRKrndd3MsrhOcuDwqT5rX+LLl6VktWx5+qmuc
49sd3fWJ1MxLZ+Q6/Eo5jPuPVOPl8wLcwf9MD0rgRMVU+XycwDKr/3vmBbs22hiw
PcWIPHugAy4PRbiWfHOymO+c4WSCCS7nre3mIAyXuT0EEPDnEnrkbYoSuwIJ0tLp
N8/6vaLbBfO5ckAU2VUCAwEAAaNZMFcwDgYDVR0PAQH/BAQDAgKkMA8GA1UdEwEB
/wQFMAMBAf8wHQYDVR0OBBYEFNqlikMIHwY+A1/PHzwPB0CtSLX+MBUGA1UdEQQO
MAyCCmt1YmVybmV0ZXMwDQYJKoZIhvcNAQELBQADggEBAJ2j87US8VTVTFoayNSk
mzip60VzgKxawi/lP1F0/JqCHtdcaA/JmlN8FggzaSxS6AA/gxNTriLNLedhqgNF
f5F5Lq0bQAebzbijsEMr+wGE6zYBgg2L0u55jqSSU1Quhay83eCD0b0O3XHGdzg0
29jC+r8pOYWuwCBaIU8NN8EouHbQ25jqJAPLCIjuqPSEPfxjZla9f2ZO7Zpx+Yud
jDYHz9ZwBYmeR7Z74/oStJ+eIFfwlJKIQL0QFzKgw2KUHmmzHVxpx60rajiGNAb8
7FNPWTjIYX11Hy56jZAUirfwCak1IxfI8O0/X1LzVPCs7uaE1SG8TCsJgjrD2Nwm
2w4=
-----END CERTIFICATE-----

@ -0,0 +1,19 @@
#!/bin/bash
if [ "$1" = apply ]; then
exit
elif [ "$1" = delete ]; then
exit
elif [ "$1" = certificate ]; then
exit
elif [ "$1" = get ]; then
if [ "$2" = csr -a "$4" = "-o" -a "$5" = 'jsonpath={.status.certificate}' ]; then
echo -n 'LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUMxekNDQWI4Q0ZHeVk0ZVpVMnAvTjMzU0pCTlptMm1vSlE5TXFNQTBHQ1NxR1NJYjNEUUVCQ3dVQU1DZ3gKRURBT0JnTlZCQU1NQjJOMFpHRnNaV3N4RkRBU0JnTlZCQW9NQzJOell5MXRaVzFpWlhKek1CNFhEVEl4TVRJeApNekEwTWpJek4xb1hEVEl5TURFeE1qQTBNakl6TjFvd0tERVFNQTRHQTFVRUF3d0hZM1JrWVd4bGF6RVVNQklHCkExVUVDZ3dMWTNOakxXMWxiV0psY25Nd2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUIKQVFEV09vaTd6ejE0c3VBZ0V2QkgrSHFHSzlCUUlQTm5QQ0llVkxXenlFRTNxUWZRV2YvcWNzeGNST2pSKzVCTgpKSXBaQlNZdjRmNE52WFZqaHlQendoWUd0bXJRYksyT3RCTDlqMDJMWjhMVHp2TnE0MW9CYVdXUFhhaVdIVys2CjkzQnlBdXFPMmdnSEt0elNkV09TcTZpeFBXMVNGUzJRMkFWaXdZUEg3b1pQYnZacUZvMzdhbVdwd1pWUHVuVi8KV2tFRUttNUVqV05DSVUzVWpPdS9HeEJOT1g0WEpqWld4bFcwQUVROVp3K2ZSazBkdU5ScVVyUDQxbDZvcG4rKwpLRkE5NFg2NUlzcUMvMlJ4OWgrNkZFRHhIcjJPcjhOcGFuMXRjZEZHQlFyMGMxV1JxRkNHTytIM0VTeUNya1BjCmdnRDlVN3c0TmdGYkQyaVU0QXc3ZkhwakFnTUJBQUV3RFFZSktvWklodmNOQVFFTEJRQURnZ0VCQUY3VWUwc3YKcFhSUzN1TFl1Y0k3UkRNRGpOZnFpZ0R3NnorbzZxVmxTdGpZTGpDNjFXRyswZ0g4TDJIbm5jZVYyelhjNDkrQQp6TjFna0lWT3JlRUQvRitKbGRPUGgySUpOY1pGYTBsckFFV0dNNWRRR3pDSUM0cEtmSGxOMTZ0c0w2bGdqWTYzCmUvZlhMTFdLdktDR2lRMUlBUTh4KzYyaTVvSmU3aDBlQ1Q0aEEyM0JTRnRNelo2aEdGUURNNGxxaWhHQjEyT2UKZE5yYStsNVdLemNFR21aVFBYTXNudEZVVndPejhaNld2eGo0UW1zL1dQUElKWDdLM2NiRUo4L1RQWG1tUzJrQwowNUtueUxVQzltYnR2TGZoYldhbFZVVlJVUkYwT1RaVk5mNkt6MDJWYlRqQjRJQXdyWGZKZC9lMkMvNFpGWlJTCjVWMnlJSnBJeVJGWTdQST0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo='
exit
elif [ "$2" = namespaces ]; then
echo '{"items":[{"metadata":{"name":"default"}}]}'
exit
fi
fi
echo 'Unrecognized command'
exit 1

@ -58,6 +58,7 @@ def add_vhost(domain, ip_address):
body = {'ip_address': ip_address}
if '/' in domain:
raise Abort('invalid domain name')
click.echo('Please wait, this may take a while...')
resp = http_put('/api/cloud/vhosts/' + domain, json=body)
handle_sync_response(resp)
click.echo('Done.')

@ -8,6 +8,7 @@ from .mysql import mysql
from .postgresql import postgresql
from .mailman import mailman
from .cloud import cloud
from .k8s import k8s
@click.group()
@ -23,3 +24,4 @@ cli.add_command(mysql)
cli.add_command(postgresql)
cli.add_command(mailman)
cli.add_command(cloud)
cli.add_command(k8s)

@ -0,0 +1,42 @@
import os
import traceback
import click
from ..utils import http_post
from .utils import handle_sync_response
@click.group(short_help='Manage your CSC Kubernetes resources')
def k8s():
pass
@k8s.group(short_help='Manage your CSC Kubernetes account')
def account():
pass
@account.command(short_help='Obtain a kubeconfig')
def activate():
kubedir = os.path.join(os.environ['HOME'], '.kube')
if not os.path.isdir(kubedir):
os.mkdir(kubedir)
kubeconfig = os.path.join(kubedir, 'config')
resp = http_post('/api/cloud/k8s/accounts/create')
result = handle_sync_response(resp)
try:
if os.path.isfile(kubeconfig):
kubeconfig_bak = os.path.join(kubedir, 'config.bak')
os.rename(kubeconfig, kubeconfig_bak)
with open(kubeconfig, 'w') as fo:
fo.write(result['kubeconfig'])
except Exception:
click.echo(traceback.format_exc())
click.echo("We weren't able to write the kubeconfig file, so here it is.")
click.echo("Make sure to paste this into your ~/.kube/config.")
click.echo()
click.echo(result['kubeconfig'])
return
click.echo("Congratulations! You have a new kubeconfig in ~/.kube/config.")
click.echo("Run `kubectl cluster-info` to make sure everything is working.")

@ -0,0 +1,16 @@
from typing import Dict
from zope.interface import Interface
class ICloudResourceManager(Interface):
"""Manages multiple cloud resources for each user."""
def purge_accounts() -> Dict:
"""
Delete cloud resources for expired CSC accounts.
A warning message will be emailed to users one week before their
cloud account is deleted.
Another message will be emailed to the users after their cloud account
has been deleted.
"""

@ -1,42 +0,0 @@
from typing import Dict, List
from zope.interface import Interface
from .IUser import IUser
class ICloudService(Interface):
"""Performs operations on the CSC Cloud."""
def create_account(user: IUser):
"""
Activate an LDAP account in CloudStack for the given user.
"""
def purge_accounts() -> Dict:
"""
Delete CloudStack accounts which correspond to expired CSC accounts.
A warning message will be emailed to users one week before their
cloud account is deleted.
Another message will be emailed to the users after their cloud account
has been deleted.
"""
def create_vhost(username: str, domain: str, ip_address: str):
"""
Create a new vhost record for the given domain and IP address.
"""
def delete_vhost(username: str, domain: str):
"""
Delete the vhost record for the given user and domain.
"""
def get_vhosts(username: str) -> List[Dict]:
"""
Get the vhost records for the given user. Each record has the form
{
"domain": "app.username.m.csclub.cloud",
"ip_address": "172.19.134.12"
}
"""

@ -0,0 +1,26 @@
from typing import Dict
from zope.interface import Interface
from .IUser import IUser
class ICloudStackService(Interface):
"""Provides a way to interact with a CloudStack management server."""
def create_account(user: IUser):
"""
Activate an LDAP account in CloudStack for the given user.
"""
def get_accounts() -> Dict[str, str]:
"""
Get the users who have active CloudStack accounts.
The dict is mapping of usernames to account IDs.
"""
def delete_account(account_id: str):
"""
Delete the given CloudStack account.
Note that a CloudStack account ID must be given, not a username.
"""

@ -0,0 +1,24 @@
from typing import List
from zope.interface import Interface
class IKubernetesService(Interface):
"""A client for the syscom-managed k8s cluster on CloudStack."""
def create_account(username: str) -> str:
"""
Create a new k8s namespace for the given user and create new
k8s credentials for them.
The contents of a kubeconfig file are returned.
"""
def delete_account(username: str) -> str:
"""
Delete the k8s namespace for the given user.
"""
def get_accounts() -> List[str]:
"""
Get a list of users who have k8s namespaces.
"""

@ -30,7 +30,24 @@ class IVHostManager(Interface):
"""
Get the vhost records for the given user. Each record has the form
{
"domain": "app.username.m.csclub.cloud",
"domain": "app.username.csclub.cloud",
"ip_address": "172.19.134.12"
}
"""
def is_valid_domain(username: str, domain: str) -> bool:
"""
Returns true iff the user with the given username is allowed
to create a vhost for the given domain.
"""
def is_valid_ip_address(ip_address: str) -> bool:
"""
Returns true iff ip_address is a valid proxy_pass target
in NGINX and falls within the acceptable IP range.
"""
def get_accounts() -> List[str]:
"""
Get a list of users who have at least one vhost record.
"""

@ -1,4 +1,5 @@
from .ICloudService import ICloudService
from .ICloudResourceManager import ICloudResourceManager
from .ICloudStackService import ICloudStackService
from .IKerberosService import IKerberosService
from .IConfig import IConfig
from .IUser import IUser
@ -11,3 +12,4 @@ from .IMailmanService import IMailmanService
from .IHTTPClient import IHTTPClient
from .IDatabaseService import IDatabaseService
from .IVHostManager import IVHostManager
from .IKubernetesService import IKubernetesService

@ -8,11 +8,12 @@ from zope import component
from .error_handlers import register_error_handlers
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, IFileService, \
IMailmanService, IMailService, IUWLDAPService, IHTTPClient, IDatabaseService, \
ICloudService
ICloudStackService, ICloudResourceManager, IKubernetesService, IVHostManager
from ceo_common.model import Config, HTTPClient, RemoteMailmanService
from ceod.api.spnego import init_spnego
from ceod.model import KerberosService, LDAPService, FileService, \
MailmanService, MailService, UWLDAPService, CloudService
MailmanService, MailService, UWLDAPService, CloudStackService, \
CloudResourceManager, KubernetesService, VHostManager
from ceod.db import MySQLService, PostgreSQLService
@ -114,17 +115,24 @@ def register_services(app):
uwldap_srv = UWLDAPService()
component.provideUtility(uwldap_srv, IUWLDAPService)
# MySQLService
# MySQLService, PostgreSQLService
if hostname == cfg.get('ceod_database_host'):
mysql_srv = MySQLService()
component.provideUtility(mysql_srv, IDatabaseService, 'mysql')
# PostgreSQLService
if hostname == cfg.get('ceod_database_host'):
psql_srv = PostgreSQLService()
component.provideUtility(psql_srv, IDatabaseService, 'postgresql')
# CloudService
# CloudStackService, CloudResourceManager, VHostManager, KubernetesService
if hostname == cfg.get('ceod_cloud_host'):
cloud_srv = CloudService()
component.provideUtility(cloud_srv, ICloudService)
cloudstack_srv = CloudStackService()
component.provideUtility(cloudstack_srv, ICloudStackService)
cloud_mgr = CloudResourceManager()
component.provideUtility(cloud_mgr, ICloudResourceManager)
vhost_mgr = VHostManager()
component.provideUtility(vhost_mgr, IVHostManager)
k8s_srv = KubernetesService()
component.provideUtility(k8s_srv, IKubernetesService)

@ -3,7 +3,8 @@ from zope import component
from .utils import requires_authentication_no_realm, authz_restrict_to_syscom, \
get_valid_member_or_throw
from ceo_common.interfaces import ICloudService
from ceo_common.interfaces import ICloudStackService, IVHostManager, \
IKubernetesService, ICloudResourceManager
bp = Blueprint('cloud', __name__)
@ -12,40 +13,52 @@ bp = Blueprint('cloud', __name__)
@requires_authentication_no_realm
def create_account(auth_user: str):
user = get_valid_member_or_throw(auth_user)
cloud_srv = component.getUtility(ICloudService)
cloud_srv.create_account(user)
cloudstack_srv = component.getUtility(ICloudStackService)
cloudstack_srv.create_account(user)
return {'status': 'OK'}
@bp.route('/accounts/purge', methods=['POST'])
@authz_restrict_to_syscom
def purge_accounts():
cloud_srv = component.getUtility(ICloudService)
return cloud_srv.purge_accounts()
cloud_mgr = component.getUtility(ICloudResourceManager)
return cloud_mgr.purge_accounts()
@bp.route('/vhosts/<domain>', methods=['PUT'])
@requires_authentication_no_realm
def create_vhost(auth_user: str, domain: str):
get_valid_member_or_throw(auth_user)
cloud_srv = component.getUtility(ICloudService)
vhost_mgr = component.getUtility(IVHostManager)
body = request.get_json(force=True)
ip_address = body['ip_address']
cloud_srv.create_vhost(auth_user, domain, ip_address)
vhost_mgr.create_vhost(auth_user, domain, ip_address)
return {'status': 'OK'}
@bp.route('/vhosts/<domain>', methods=['DELETE'])
@requires_authentication_no_realm
def delete_vhost(auth_user: str, domain: str):
cloud_srv = component.getUtility(ICloudService)
cloud_srv.delete_vhost(auth_user, domain)
vhost_mgr = component.getUtility(IVHostManager)
vhost_mgr.delete_vhost(auth_user, domain)
return {'status': 'OK'}
@bp.route('/vhosts', methods=['GET'])
@requires_authentication_no_realm
def get_vhosts(auth_user: str):
cloud_srv = component.getUtility(ICloudService)
vhosts = cloud_srv.get_vhosts(auth_user)
vhost_mgr = component.getUtility(IVHostManager)
vhosts = vhost_mgr.get_vhosts(auth_user)
return {'vhosts': vhosts}
@bp.route('/k8s/accounts/create', methods=['POST'])
@requires_authentication_no_realm
def create_k8s_account(auth_user: str):
get_valid_member_or_throw(auth_user)
k8s_srv = component.getUtility(IKubernetesService)
kubeconfig = k8s_srv.create_account(auth_user)
return {
'status': 'OK',
'kubeconfig': kubeconfig,
}

@ -0,0 +1,111 @@
from collections import defaultdict
import datetime
import json
import os
from typing import Dict
from zope import component
from zope.interface import implementer
from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import ICloudResourceManager, \
ILDAPService, IMailService, IKubernetesService, IVHostManager, \
ICloudStackService
from ceo_common.model import Term
import ceo_common.utils as utils
logger = logger_factory(__name__)
@implementer(ICloudResourceManager)
class CloudResourceManager:
def __init__(self):
state_dir = '/run/ceod'
if not os.path.isdir(state_dir):
os.mkdir(state_dir)
self.pending_deletions_file = \
os.path.join(state_dir, 'pending_account_deletions.json')
def purge_accounts(self) -> Dict:
accounts_deleted = []
accounts_to_be_deleted = []
result = {
'accounts_deleted': accounts_deleted,
'accounts_to_be_deleted': accounts_to_be_deleted,
}
current_term = Term.current()
beginning_of_term = current_term.to_datetime()
now = utils.get_current_datetime()
delta = now - beginning_of_term
if delta.days < 30:
# one-month grace period
return result
ldap_srv = component.getUtility(ILDAPService)
mail_srv = component.getUtility(IMailService)
k8s_srv = component.getUtility(IKubernetesService)
vhost_mgr = component.getUtility(IVHostManager)
cloudstack_srv = component.getUtility(ICloudStackService)
# get a list of all cloud services each user is using
accounts = defaultdict(list)
cloudstack_accounts = cloudstack_srv.get_accounts()
# note that cloudstack_accounts is a dict, not a list
for username in cloudstack_accounts:
accounts[username].append('cloudstack')
vhost_accounts = vhost_mgr.get_accounts()
for username in vhost_accounts:
accounts[username].append('vhost')
k8s_accounts = k8s_srv.get_accounts()
for username in k8s_accounts:
accounts[username].append('k8s')
if os.path.isfile(self.pending_deletions_file):
state = json.load(open(self.pending_deletions_file))
last_check = datetime.datetime.fromtimestamp(state['timestamp'])
delta = now - last_check
if delta.days < 7:
logger.debug(
'Skipping account purge because less than one week has '
'passed since the warning emails were sent out'
)
accounts_to_be_deleted.extend(state['accounts_to_be_deleted'])
return result
for username in state['accounts_to_be_deleted']:
if username not in accounts:
continue
user = ldap_srv.get_user(username)
if user.membership_is_valid():
continue
services = accounts[username]
if 'cloudstack' in services:
account_id = cloudstack_accounts[username]
cloudstack_srv.delete_account(account_id)
if 'vhost' in services:
vhost_mgr.delete_all_vhosts_for_user(username)
if 'k8s' in services:
k8s_srv.delete_account(username)
accounts_deleted.append(username)
mail_srv.send_cloud_account_has_been_deleted_message(user)
logger.info(f'Deleted cloud resources for {username}')
os.unlink(self.pending_deletions_file)
return result
state = {
'timestamp': int(now.timestamp()),
'accounts_to_be_deleted': accounts_to_be_deleted,
}
for username in accounts:
user = ldap_srv.get_user(username)
if user.membership_is_valid():
continue
accounts_to_be_deleted.append(username)
mail_srv.send_cloud_account_will_be_deleted_message(user)
logger.info(
f'A warning email was sent to {username} because their '
'cloud account will be deleted'
)
if accounts_to_be_deleted:
json.dump(state, open(self.pending_deletions_file, 'w'))
return result

@ -1,236 +0,0 @@
from base64 import b64encode
import datetime
import hashlib
import hmac
import ipaddress
import json
import os
import re
from typing import Dict, List
from urllib.parse import quote
import requests
from zope import component
from zope.interface import implementer
from .VHostManager import VHostManager
from ceo_common.errors import CloudStackAPIError, InvalidDomainError, \
InvalidIPError, RateLimitError
from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import ICloudService, IConfig, IUser, ILDAPService, \
IMailService
from ceo_common.model import Term
import ceo_common.utils as utils
logger = logger_factory(__name__)
@implementer(ICloudService)
class CloudService:
VALID_DOMAIN_RE = re.compile(r'^(?:[0-9a-z-]+\.)+[a-z]+$')
def __init__(self):
cfg = component.getUtility(IConfig)
self.api_key = cfg.get('cloudstack_api_key')
self.secret_key = cfg.get('cloudstack_secret_key')
self.base_url = cfg.get('cloudstack_base_url')
self.members_domain = 'Members'
self.vhost_mgr = VHostManager()
self.max_vhosts_per_account = cfg.get('cloud vhosts_max_vhosts_per_account')
self.vhost_rate_limit_secs = cfg.get('cloud vhosts_rate_limit_seconds')
self.vhost_domain = cfg.get('cloud vhosts_members_domain')
self.vhost_ip_min = ipaddress.ip_address(cfg.get('cloud vhosts_ip_range_min'))
self.vhost_ip_max = ipaddress.ip_address(cfg.get('cloud vhosts_ip_range_max'))
state_dir = '/run/ceod'
if not os.path.isdir(state_dir):
os.mkdir(state_dir)
self.pending_deletions_file = os.path.join(state_dir, 'cloudstack_pending_account_deletions.json')
self.vhost_rate_limit_file = os.path.join(state_dir, 'vhost_rate_limits.json')
def _create_url(self, params: Dict[str, str]) -> str:
# See https://docs.cloudstack.apache.org/en/latest/developersguide/dev.html#the-cloudstack-api
if 'apiKey' not in params and 'apikey' not in params:
params['apiKey'] = self.api_key
params['response'] = 'json'
request_str = '&'.join(
key + '=' + quote(val)
for key, val in params.items()
)
sig_str = '&'.join(
key.lower() + '=' + quote(val).lower()
for key, val in sorted(params.items())
)
sig = hmac.new(self.secret_key.encode(), sig_str.encode(), hashlib.sha1).digest()
encoded_sig = b64encode(sig).decode()
url = self.base_url + '?' + request_str + '&signature=' + quote(encoded_sig)
return url
def _get_domain_id(self, domain_name: str) -> str:
url = self._create_url({
'command': 'listDomains',
'details': 'min',
'name': domain_name,
})
resp = requests.get(url)
resp.raise_for_status()
d = resp.json()['listdomainsresponse']
assert d['count'] == 1, 'there should be one domain found'
return d['domain'][0]['id']
def _get_all_accounts(self, domain_id: str) -> List[Dict]:
url = self._create_url({
'command': 'listAccounts',
'domainid': domain_id,
'details': 'min',
})
resp = requests.get(url)
resp.raise_for_status()
d = resp.json()['listaccountsresponse']
if 'account' not in d:
# The API returns an empty dict if there are no accounts
return []
return d['account']
def _delete_account(self, account_id: str):
url = self._create_url({
'command': 'deleteAccount',
'id': account_id,
})
resp = requests.post(url)
resp.raise_for_status()
def create_account(self, user: IUser):
domain_id = self._get_domain_id(self.members_domain)
url = self._create_url({
'command': 'ldapCreateAccount',
'accounttype': '0',
'domainid': domain_id,
'username': user.uid,
})
resp = requests.post(url)
d = resp.json()['createaccountresponse']
if not resp.ok:
raise CloudStackAPIError(d['errortext'])
def purge_accounts(self) -> Dict:
accounts_deleted = []
accounts_to_be_deleted = []
result = {
'accounts_deleted': accounts_deleted,
'accounts_to_be_deleted': accounts_to_be_deleted,
}
current_term = Term.current()
beginning_of_term = current_term.to_datetime()
now = utils.get_current_datetime()
delta = now - beginning_of_term
if delta.days < 30:
# one-month grace period
return result
ldap_srv = component.getUtility(ILDAPService)
mail_srv = component.getUtility(IMailService)
domain_id = self._get_domain_id(self.members_domain)
accounts = self._get_all_accounts(domain_id)
if os.path.isfile(self.pending_deletions_file):
state = json.load(open(self.pending_deletions_file))
last_check = datetime.datetime.fromtimestamp(state['timestamp'])
delta = now - last_check
if delta.days < 7:
logger.debug(
'Skipping account purge because less than one week has '
'passed since the warning emails were sent out'
)
accounts_to_be_deleted.extend(state['accounts_to_be_deleted'])
return result
username_to_account_id = {
account['name']: account['id']
for account in accounts
}
for username in state['accounts_to_be_deleted']:
if username not in username_to_account_id:
continue
user = ldap_srv.get_user(username)
if user.membership_is_valid():
continue
account_id = username_to_account_id[username]
self._delete_account(account_id)
self.vhost_mgr.delete_all_vhosts_for_user(username)
accounts_deleted.append(username)
mail_srv.send_cloud_account_has_been_deleted_message(user)
logger.info(f'Deleted cloud account for {username}')
os.unlink(self.pending_deletions_file)
return result
state = {
'timestamp': int(now.timestamp()),
'accounts_to_be_deleted': accounts_to_be_deleted,
}
for account in accounts:
username = account['name']
account_id = account['id']
user = ldap_srv.get_user(username)
if user.membership_is_valid():
continue
accounts_to_be_deleted.append(username)
mail_srv.send_cloud_account_will_be_deleted_message(user)
logger.info(
f'A warning email was sent to {username} because their '
'cloud account will be deleted'
)
if accounts_to_be_deleted:
json.dump(state, open(self.pending_deletions_file, 'w'))
return result
def _is_valid_domain(self, username: str, domain: str) -> bool:
subdomain = username + '.' + self.vhost_domain
if not (domain == subdomain or domain.endswith('.' + subdomain)):
return False
if self.VALID_DOMAIN_RE.match(domain) is None:
return False
if len(domain) > 80:
return False
return True
def _is_valid_ip_address(self, ip_address: str) -> bool:
try:
addr = ipaddress.ip_address(ip_address)
except ValueError:
return False
return self.vhost_ip_min <= addr <= self.vhost_ip_max
def _check_rate_limit(self, username: str):
if os.path.exists(self.vhost_rate_limit_file):
d = json.load(open(self.vhost_rate_limit_file))
else:
d = {}
now = int(utils.get_current_datetime().timestamp())
if now - d.get(username, 0) < self.vhost_rate_limit_secs:
raise RateLimitError(f'Please wait {self.vhost_rate_limit_secs} seconds')
d[username] = now
json.dump(d, open(self.vhost_rate_limit_file, 'w'))
def create_vhost(self, username: str, domain: str, ip_address: str):
if self.vhost_mgr.get_num_vhosts(username) >= self.max_vhosts_per_account:
raise Exception(f'Only {self.max_vhosts_per_account} vhosts '
'allowed per account')
if not self._is_valid_domain(username, domain):
raise InvalidDomainError()
if not self._is_valid_ip_address(ip_address):
raise InvalidIPError()
self._check_rate_limit(username)
self.vhost_mgr.create_vhost(username, domain, ip_address)
def delete_vhost(self, username: str, domain: str):
if not self._is_valid_domain(username, domain):
raise InvalidDomainError()
self.vhost_mgr.delete_vhost(username, domain)
def get_vhosts(self, username: str) -> List[Dict]:
return self.vhost_mgr.get_vhosts(username)

@ -0,0 +1,100 @@
from base64 import b64encode
import hashlib
import hmac
from typing import Dict
from urllib.parse import quote
import requests
from zope import component
from zope.interface import implementer
from ceo_common.errors import CloudStackAPIError
from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import ICloudStackService, IConfig, IUser
logger = logger_factory(__name__)
@implementer(ICloudStackService)
class CloudStackService:
def __init__(self):
cfg = component.getUtility(IConfig)
self.api_key = cfg.get('cloudstack_api_key')
self.secret_key = cfg.get('cloudstack_secret_key')
self.base_url = cfg.get('cloudstack_base_url')
self.members_domain = 'Members'
self._cached_domain_id = None
def _create_url(self, params: Dict[str, str]) -> str:
# See https://docs.cloudstack.apache.org/en/latest/developersguide/dev.html#the-cloudstack-api
if 'apiKey' not in params and 'apikey' not in params:
params['apiKey'] = self.api_key
params['response'] = 'json'
request_str = '&'.join(
key + '=' + quote(val)
for key, val in params.items()
)
sig_str = '&'.join(
key.lower() + '=' + quote(val).lower()
for key, val in sorted(params.items())
)
sig = hmac.new(self.secret_key.encode(), sig_str.encode(), hashlib.sha1).digest()
encoded_sig = b64encode(sig).decode()
url = self.base_url + '?' + request_str + '&signature=' + quote(encoded_sig)
return url
def _get_domain_id(self) -> str:
if self._cached_domain_id is not None:
return self._cached_domain_id
url = self._create_url({
'command': 'listDomains',
'details': 'min',
'name': self.members_domain,
})
resp = requests.get(url)
resp.raise_for_status()
d = resp.json()['listdomainsresponse']
assert d['count'] == 1, 'there should be one domain found'
domain_id = d['domain'][0]['id']
self._cached_domain_id = domain_id
return domain_id
def get_accounts(self) -> Dict[str, str]:
domain_id = self._get_domain_id()
url = self._create_url({
'command': 'listAccounts',
'domainid': domain_id,
'details': 'min',
})
resp = requests.get(url)
resp.raise_for_status()
d = resp.json()['listaccountsresponse']
if 'account' not in d:
# The API returns an empty dict if there are no accounts
return []
return {
account['name']: account['id']
for account in d['account']
}
def delete_account(self, account_id: str):
url = self._create_url({
'command': 'deleteAccount',
'id': account_id,
})
resp = requests.post(url)
resp.raise_for_status()
def create_account(self, user: IUser):
domain_id = self._get_domain_id()
url = self._create_url({
'command': 'ldapCreateAccount',
'accounttype': '0',
'domainid': domain_id,
'username': user.uid,
})
resp = requests.post(url)
d = resp.json()['createaccountresponse']
if not resp.ok:
raise CloudStackAPIError(d['errortext'])

@ -0,0 +1,116 @@
import base64
import json
import os
import subprocess
import tempfile
import time
from typing import List
import jinja2
from zope import component
from zope.interface import implementer
from ceo_common.interfaces import IConfig, IKubernetesService
@implementer(IKubernetesService)
class KubernetesService:
namespace_prefix = 'csc-'
def __init__(self):
cfg = component.getUtility(IConfig)
self.members_clusterrole = cfg.get('k8s_members_clusterrole')
self.members_group = cfg.get('k8s_members_group')
self.authority_cert_path = cfg.get('k8s_authority_cert_path')
self.server_url = cfg.get('k8s_server_url')
self.jinja_env = jinja2.Environment(
loader=jinja2.PackageLoader('ceod.model'),
keep_trailing_newline=True,
)
def _run(self, args: List[str], check=True, **kwargs) -> subprocess.CompletedProcess:
return subprocess.run(args, check=check, text=True, **kwargs)
def _apply_manifest(self, manifest: str):
self._run(['kubectl', 'apply', '-f', '-'], input=manifest)
@classmethod
def _get_namespace(cls, username: str) -> str:
return cls.namespace_prefix + username
@classmethod
def _get_username_from_namespace(cls, namespace: str) -> str:
assert namespace.startswith(cls.namespace_prefix)
return namespace[len(cls.namespace_prefix):]
def create_account(self, username: str) -> str:
with tempfile.TemporaryDirectory() as tempdir:
# Create a new CSR
csr_path = os.path.join(tempdir, username + '.csr')
key_path = os.path.join(tempdir, username + '.key')
self._run([
'openssl', 'req', '-new', '-newkey', 'rsa:2048', '-nodes',
'-keyout', key_path, '-subj', f'/CN={username}/O={self.members_group}',
'-out', csr_path,
], stdin=subprocess.DEVNULL)
# Upload the CSR
encoded_csr = base64.b64encode(open(csr_path, 'rb').read()).decode()
csr_name = 'csc-' + username + '-csr'
template = self.jinja_env.get_template('kubernetes_csr.yaml.j2')
body = template.render(csr_name=csr_name, encoded_csr=encoded_csr)
self._apply_manifest(body)
# Approve the CSR
self._run(['kubectl', 'certificate', 'approve', csr_name])
# Wait until the certificate is issued
encoded_cert = ''
max_tries = 5
for i in range(max_tries):
proc = self._run([
'kubectl', 'get', 'csr', csr_name,
'-o', 'jsonpath={.status.certificate}',
], capture_output=True)
encoded_cert = proc.stdout
if encoded_cert != '':
break
time.sleep(1)
if encoded_cert == '':
raise Exception('Timed out waiting for certificate to get issued')
# Delete the CSR
self._run(['kubectl', 'delete', 'csr', csr_name])
# Create a namespace
namespace = self._get_namespace(username)
template = self.jinja_env.get_template('kubernetes_user.yaml.j2')
body = template.render(
username=username, namespace=namespace,
members_clusterrole=self.members_clusterrole)
self._apply_manifest(body)
# Return the kubeconfig
encoded_key = base64.b64encode(open(key_path, 'rb').read()).decode()
encoded_authority_cert = base64.b64encode(
open(self.authority_cert_path, 'rb').read()
).decode()
template = self.jinja_env.get_template('kubeconfig.j2')
body = template.render(
username=username, namespace=namespace,
server_url=self.server_url,
encoded_cert=encoded_cert, encoded_key=encoded_key,
encoded_authority_cert=encoded_authority_cert)
return body
def delete_account(self, username: str):
namespace = self._get_namespace(username)
# don't check exit code because namespace might not exist
self._run(['kubectl', 'delete', 'namespace', namespace], check=False)
def get_accounts(self) -> List[str]:
proc = self._run(['kubectl', 'get', 'namespaces', '-o', 'json'],
capture_output=True)
items = json.loads(proc.stdout)['items']
namespaces = [item['metadata']['name'] for item in items]
return [
self._get_username_from_namespace(namespace)
for namespace in namespaces
if namespace.startswith(self.namespace_prefix)
]

@ -35,6 +35,7 @@ class MailService:
self.base_domain = cfg.get('base_domain')
self.jinja_env = jinja2.Environment(
loader=jinja2.PackageLoader('ceod.model'),
keep_trailing_newline=True,
)
def send(self, _from: str, to: str, headers: Dict[str, str], content: str):

@ -1,4 +1,5 @@
import glob
import ipaddress
import os
import re
import shutil
@ -9,11 +10,17 @@ import jinja2
from zope import component
from zope.interface import implementer
from .utils import rate_limit
from ceo_common.errors import InvalidDomainError, InvalidIPError
from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import IVHostManager, IConfig
PROXY_PASS_IP_RE = re.compile(r'^\s+proxy_pass\s+http://(?P<ip_address>[\d.]+);$')
PROXY_PASS_IP_RE = re.compile(
r'^\s+proxy_pass\s+http://(?P<ip_address>[\w.:-]+);$'
)
VHOST_FILENAME_RE = re.compile(r'^(?P<username>[0-9a-z-]+)_(?P<domain>[0-9a-z.-]+)$')
VALID_DOMAIN_RE = re.compile(r'^(?:[0-9a-z-]+\.)+[a-z]+$')
IP_WITH_PORT_RE = re.compile(r'^(?P<ip_address>[\d.]+)(:\d{2,5})?$')
logger = logger_factory(__name__)
@ -31,6 +38,21 @@ class VHostManager:
self.default_ssl_cert = cfg.get('cloud vhosts_default_ssl_cert')
self.default_ssl_key = cfg.get('cloud vhosts_default_ssl_key')
self.vhost_domain = cfg.get('cloud vhosts_members_domain')
self.vhost_domain_re = re.compile(
r'^[a-z0-9-]+\.' + self.vhost_domain.replace('.', r'\.') + '$'
)
self.k8s_vhost_domain = cfg.get('cloud vhosts_k8s_members_domain')
self.k8s_vhost_domain_re = re.compile(
r'^[a-z0-9-]+\.' + self.k8s_vhost_domain.replace('.', r'\.') + '$'
)
self.k8s_ssl_cert = cfg.get('cloud vhosts_k8s_ssl_cert')
self.k8s_ssl_key = cfg.get('cloud vhosts_k8s_ssl_key')
self.max_vhosts_per_account = cfg.get('cloud vhosts_max_vhosts_per_account')
self.vhost_ip_min = ipaddress.ip_address(cfg.get('cloud vhosts_ip_range_min'))
self.vhost_ip_max = ipaddress.ip_address(cfg.get('cloud vhosts_ip_range_max'))
self.acme_challenge_dir = cfg.get('cloud vhosts_acme_challenge_dir')
self.acme_dir = '/root/.acme.sh'
@ -38,8 +60,13 @@ class VHostManager:
self.jinja_env = jinja2.Environment(
loader=jinja2.PackageLoader('ceod.model'),
keep_trailing_newline=True,
)
rate_limit_secs = cfg.get('cloud vhosts_rate_limit_seconds')
self.create_vhost = \
rate_limit('create_vhost', rate_limit_secs)(self.create_vhost)
@staticmethod
def _vhost_filename(username: str, domain: str) -> str:
"""Generate a filename for the vhost record"""
@ -62,7 +89,52 @@ class VHostManager:
logger.debug('Reloading NGINX')
self._run(['systemctl', 'reload', 'nginx'])
def is_valid_domain(self, username: str, domain: str) -> bool:
if VALID_DOMAIN_RE.match(domain) is None:
return False
if len(domain) > 80:
return False
if domain.endswith('.' + self.k8s_vhost_domain):
prefix = domain[:len(domain) - len(self.k8s_vhost_domain) - 1]
elif domain.endswith('.' + self.vhost_domain):
prefix = domain[:len(domain) - len(self.vhost_domain) - 1]
else:
return False
last_part = prefix.split('.')[-1]
if last_part == username:
return True
if last_part.endswith('-' + username):
return True
return False
def is_valid_ip_address(self, ip_address: str) -> bool:
if ip_address == 'k8s':
# special case - this is an NGINX upstream
return True
# strip off the port number, if there is one
match = IP_WITH_PORT_RE.match(ip_address)
if match is None:
return False
ip_address = match.group('ip_address')
# make sure the IP is in the allowed range
try:
addr = ipaddress.ip_address(ip_address)
except ValueError:
return False
return self.vhost_ip_min <= addr <= self.vhost_ip_max
def _get_cert_and_key_path(self, domain: str) -> Tuple[str, str]:
# Use the wildcard certs, if possible
if self.vhost_domain_re.match(domain) is not None:
return self.default_ssl_cert, self.default_ssl_key
elif self.k8s_vhost_domain_re.match(domain) is not None:
return self.k8s_ssl_cert, self.k8s_ssl_key
# Otherwise, obtain a new cert with acme.sh
cert_path = f'{self.ssl_dir}/{domain}.chain'
key_path = f'{self.ssl_dir}/{domain}.key'
return cert_path, key_path
@ -90,6 +162,14 @@ class VHostManager:
os.unlink(key_path)
def create_vhost(self, username: str, domain: str, ip_address: str):
if self.get_num_vhosts(username) >= self.max_vhosts_per_account:
raise Exception(f'Only {self.max_vhosts_per_account} vhosts '
'allowed per account')
if not self.is_valid_domain(username, domain):
raise InvalidDomainError()
if not self.is_valid_ip_address(ip_address):
raise InvalidIPError()
cert_path, key_path = self._get_cert_and_key_path(domain)
if not (os.path.exists(cert_path) and os.path.exists(key_path)):
self._acquire_new_cert(domain, cert_path, key_path)
@ -105,8 +185,13 @@ class VHostManager:
self._reload_web_server()
def delete_vhost(self, username: str, domain: str):
if not self.is_valid_domain(username, domain):
raise InvalidDomainError()
cert_path, key_path = self._get_cert_and_key_path(domain)
if os.path.exists(cert_path) and os.path.exists(key_path):
if cert_path not in [self.default_ssl_cert, self.k8s_ssl_cert] \
and cert_path.startswith(self.ssl_dir) \
and os.path.exists(cert_path) and os.path.exists(key_path):
self._delete_cert(domain, cert_path, key_path)
filepath = self._vhost_filepath(username, domain)
@ -143,3 +228,12 @@ class VHostManager:
logger.info(f'Deleting {filepath}')
os.unlink(filepath)
self._reload_web_server()
def get_accounts(self) -> List[str]:
vhost_files = os.listdir(self.vhost_dir)
usernames = list({
filename.split('_', 1)[0]
for filename in vhost_files
if '_' in filename
})
return usernames

@ -1,4 +1,5 @@
from .CloudService import CloudService
from .CloudResourceManager import CloudResourceManager
from .CloudStackService import CloudStackService
from .KerberosService import KerberosService
from .LDAPService import LDAPService, UserNotFoundError, GroupNotFoundError
from .User import User
@ -10,3 +11,4 @@ from .SudoRole import SudoRole
from .MailService import MailService
from .MailmanService import MailmanService
from .VHostManager import VHostManager
from .KubernetesService import KubernetesService

@ -0,0 +1,20 @@
apiVersion: v1
clusters:
- cluster:
certificate-authority-data: {{ encoded_authority_cert }}
server: {{ server_url }}
name: kubernetes
contexts:
- context:
cluster: kubernetes
namespace: {{ namespace }}
user: {{ username }}
name: {{ username }}
current-context: {{ username }}
kind: Config
preferences: {}
users:
- name: {{ username }}
user:
client-certificate-data: {{ encoded_cert }}
client-key-data: {{ encoded_key }}

@ -0,0 +1,10 @@
apiVersion: certificates.k8s.io/v1
kind: CertificateSigningRequest
metadata:
name: {{ csr_name }}
spec:
request: {{ encoded_csr }}
signerName: kubernetes.io/kube-apiserver-client
expirationSeconds: 315360000 # ten years
usages:
- client auth

@ -0,0 +1,60 @@
apiVersion: v1
kind: Namespace
metadata:
name: {{ namespace }}
labels:
pod-security.kubernetes.io/enforce: baseline
---
apiVersion: v1
kind: ResourceQuota
metadata:
name: {{ username }}-resourcequota
namespace: {{ namespace }}
spec:
hard:
requests.storage: 25Gi
count/jobs.batch: 10
count/cronjobs.batch: 10
count/pods: 40
services.loadbalancers: 0
services.nodeports: 5
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: {{ members_clusterrole }}
namespace: {{ namespace }}
subjects:
- kind: User
name: {{ username }}
apiGroup: rbac.authorization.k8s.io
roleRef:
kind: ClusterRole
name: {{ members_clusterrole }}
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: {{ username }}-ns-role
namespace: {{ namespace }}
rules:
# Allow members to view their own namespace
- apiGroups: [""]
resources: ["namespaces"]
resourceNames: ["{{ namespace }}"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: {{ username }}-ns-rolebinding
namespace: {{ namespace }}
subjects:
- kind: User
name: {{ username }}
apiGroup: rbac.authorization.k8s.io
roleRef:
kind: Role
name: {{ username }}-ns-role
apiGroup: rbac.authorization.k8s.io

@ -9,8 +9,8 @@ server {
}
server {
listen 443 ssl;
listen [::]:443 ssl;
listen 443 ssl http2;
listen [::]:443 ssl http2;
server_name {{ domain }};
ssl_certificate {{ ssl_cert_path }};
ssl_certificate_key {{ ssl_key_path }};
@ -18,6 +18,7 @@ server {
location / {
proxy_pass http://{{ ip_address }};
}
include proxy_params;
access_log /var/log/nginx/member-{{ username }}-access.log;
error_log /var/log/nginx/member-{{ username }}-error.log;

@ -1,6 +1,11 @@
from typing import Dict, List, Union
import functools
import json
import os
from typing import Dict, List, Union, Callable
from ceo_common.errors import RateLimitError
from ceo_common.model import Term
import ceo_common.utils
def bytes_to_strings(data: Dict[str, List[bytes]]) -> Dict[str, List[str]]: