pyceo/tests/ceod/api/test_cloud.py

304 lines
11 KiB
Python

import datetime
import os
import types
from unittest.mock import patch
import ldap3
import dns.resolver
from hashlib import md5
from ceo_common.model import Term
import ceo_common.utils as ceo_common_utils
def expire_member(user, ldap_conn):
most_recent_term = max(map(Term, user.terms))
new_term = most_recent_term - 1
changes = {
'term': [(ldap3.MODIFY_REPLACE, [str(new_term)])]
}
dn = user.ldap_srv.uid_to_dn(user.uid)
ldap_conn.modify(dn, changes)
def test_create_account(client, mock_cloud_server, new_user, ldap_conn):
uid = new_user.uid
mock_cloud_server.clear()
status, _ = client.post('/api/cloud/accounts/create', principal=uid)
assert status == 200
assert uid in mock_cloud_server.users_by_username
status, _ = client.post('/api/cloud/accounts/create', principal=uid)
assert status != 200
mock_cloud_server.clear()
expire_member(new_user, ldap_conn)
status, _ = client.post('/api/cloud/accounts/create', principal=uid)
assert status == 403
def test_purge_accounts(
client, mock_cloud_server, cloud_mgr, mock_mail_server,
mock_harbor_server, new_user, ldap_conn,
):
uid = new_user.uid
mock_cloud_server.clear()
mock_mail_server.messages.clear()
accounts_deleted = []
accounts_to_be_deleted = []
if os.path.isfile(cloud_mgr.pending_deletions_file):
os.unlink(cloud_mgr.pending_deletions_file)
expected = {
'accounts_deleted': accounts_deleted,
'accounts_to_be_deleted': accounts_to_be_deleted,
}
current_term = Term.current()
beginning_of_term = current_term.to_datetime()
client.post('/api/cloud/accounts/create', principal=uid)
expire_member(new_user, ldap_conn)
with patch.object(ceo_common_utils, 'get_current_datetime') as now_mock:
# one-month grace period - account should not be deleted
now_mock.return_value = beginning_of_term + datetime.timedelta(days=1)
status, data = client.post('/api/cloud/accounts/purge')
assert status == 200
assert data == expected
# grace period has passed - user should be sent a warning
now_mock.return_value += datetime.timedelta(days=32)
accounts_to_be_deleted.append(new_user.uid)
status, data = client.post('/api/cloud/accounts/purge')
assert status == 200
assert data == expected
assert os.path.isfile(cloud_mgr.pending_deletions_file)
assert len(mock_mail_server.messages) == 1
# user still has one week left to renew their membership
status, data = client.post('/api/cloud/accounts/purge')
assert status == 200
assert data == expected
# one week has passed - the account can now be deleted
now_mock.return_value += datetime.timedelta(days=8)
accounts_to_be_deleted.clear()
accounts_deleted.append(new_user.uid)
status, data = client.post('/api/cloud/accounts/purge')
assert status == 200
assert data == expected
assert new_user.uid not in mock_cloud_server.users_by_username
assert len(mock_mail_server.messages) == 2
mock_mail_server.messages.clear()
def test_cloud_vhosts(cfg, client, new_user, ldap_conn):
members_domain = cfg.get('cloud vhosts_members_domain')
max_vhosts = cfg.get('cloud vhosts_max_vhosts_per_account')
rate_limit_secs = cfg.get('cloud vhosts_rate_limit_seconds')
uid = new_user.uid
rate_limit_file = '/run/ceod/rate_limit.json'
if os.path.exists(rate_limit_file):
os.unlink(rate_limit_file)
domain1 = uid + '.' + members_domain
ip1 = '172.19.134.11'
status, _ = client.put(
f'/api/cloud/vhosts/{domain1}', json={'ip_address': ip1},
principal=uid)
assert status == 200
status, data = client.get('/api/cloud/vhosts', principal=uid)
assert status == 200
assert data == {'vhosts': [{'domain': domain1, 'ip_address': ip1}]}
# rate limit
status, _ = client.put(
f'/api/cloud/vhosts/{domain1}', json={'ip_address': ip1},
principal=uid)
assert status == 403
now = ceo_common_utils.get_current_datetime()
with patch.object(ceo_common_utils, 'get_current_datetime') as now_mock:
now_mock.return_value = now + datetime.timedelta(seconds=rate_limit_secs)
# invalid domain name
domain2 = uid + 'cloud.' + cfg.get('base_domain')
ip2 = ip1
status, _ = client.put(
f'/api/cloud/vhosts/{domain2}', json={'ip_address': ip2},
principal=uid)
assert status == 400
# invalid IP address
domain3 = domain1
ip3 = '129.97.134.10'
status, _ = client.put(
f'/api/cloud/vhosts/{domain3}', json={'ip_address': ip3},
principal=uid)
assert status == 400
# new vhost with same domain should replace old one
domain4 = domain1
ip4 = '172.19.134.14'
status, _ = client.put(
f'/api/cloud/vhosts/{domain4}', json={'ip_address': ip4},
principal=uid)
assert status == 200
status, data = client.get('/api/cloud/vhosts', principal=uid)
assert status == 200
assert data == {'vhosts': [{'domain': domain4, 'ip_address': ip4}]}
# maximum number of vhosts
for i in range(max_vhosts):
now_mock.return_value += datetime.timedelta(seconds=rate_limit_secs)
domain = 'app' + str(i + 1) + '.' + uid + '.' + members_domain
status, _ = client.put(
f'/api/cloud/vhosts/{domain}', json={'ip_address': ip1},
principal=uid)
if i < max_vhosts - 1:
assert status == 200
else:
assert status != 200
# delete all vhosts
for i in range(max_vhosts - 1):
domain = 'app' + str(i + 1) + '.' + uid + '.' + members_domain
status, _ = client.delete(f'/api/cloud/vhosts/{domain}', principal=uid)
assert status == 200
# invalid custom domain name (with TXT record)
custom_domain = "uwaterloo.ca"
ip2 = ip1
status, _ = client.put(
f'/api/cloud/vhosts/{custom_domain}', json={'ip_address': ip2},
principal=uid)
assert status == 400
# invalid custom domain name (without TXT record)
custom_domain2 = "google.com"
ip2 = ip1
status, _ = client.put(
f'/api/cloud/vhosts/{custom_domain2}', json={'ip_address': ip2},
principal=uid)
assert status == 400
# custom domain setup
dns_return_mock = types.SimpleNamespace()
dns_return_mock.rrset = [types.SimpleNamespace()]
dns_return_mock.rrset[
0].to_text = lambda: f"csc-verification={md5(uid.encode('utf-8')).hexdigest()}"
# valid custom domain name
now_mock.return_value += datetime.timedelta(seconds=rate_limit_secs)
custom_domain3 = "example.com"
ip2 = '172.19.134.32'
with patch.object(dns.resolver, 'resolve') as dns_mock:
dns_mock.return_value = dns_return_mock
status, _ = client.put(
f'/api/cloud/vhosts/{custom_domain3}', json={'ip_address': ip2},
principal=uid)
print(status)
assert status == 200
# new custom domain name replaces old custom domain name
now_mock.return_value += datetime.timedelta(seconds=rate_limit_secs)
custom_domain4 = custom_domain3
ip2 = '172.19.134.32'
ip3 = '172.19.134.33'
with patch.object(dns.resolver, 'resolve') as dns_mock:
dns_mock.return_value = dns_return_mock
status, _ = client.put(
f'/api/cloud/vhosts/{custom_domain4}', json={'ip_address': ip3},
principal=uid)
print(status)
status, data = client.get('/api/cloud/vhosts', principal=uid)
assert status == 200
assert {'domain': custom_domain4, 'ip_address': ip3} in data['vhosts']
assert {'domain': custom_domain4, 'ip_address': ip2} not in data['vhosts']
# ip address can point to both csc subdomain and also custom domain
now_mock.return_value += datetime.timedelta(seconds=rate_limit_secs)
custom_domain3 = "example.com"
ip3 = '172.19.134.33'
csc_domain = uid + '.' + members_domain
status, _ = client.put(
f'/api/cloud/vhosts/{domain1}', json={'ip_address': ip3},
principal=uid)
assert status == 200
status, data = client.get('/api/cloud/vhosts', principal=uid)
assert status == 200
assert {'domain': custom_domain4, 'ip_address': ip3} in data['vhosts']
assert {'domain': csc_domain, 'ip_address': ip3} in data['vhosts']
# csc-verification record for a different user fails
now_mock.return_value += datetime.timedelta(seconds=rate_limit_secs)
custom_domain3 = "example.com"
ip2 = '172.19.134.34'
with patch.object(dns.resolver, 'resolve') as dns_mock:
dns_mock.return_value = dns_return_mock
dns_mock.return_value.rrset[0].to_text = lambda: f"csc-verification={md5('dummy'.encode('utf-8')).hexdigest()}"
status, _ = client.put(
f'/api/cloud/vhosts/{custom_domain3}', json={'ip_address': ip2},
principal=uid)
print(status)
assert status == 400
# expired members may not create vhosts
expire_member(new_user, ldap_conn)
status, _ = client.put(
f'/api/cloud/vhosts/{domain1}', json={'ip_address': ip1},
principal=uid)
assert status == 403
def test_cloud_resources_purged_account(
cfg, client, mock_cloud_server, mock_mail_server, mock_harbor_server,
new_user, ldap_conn,
):
uid = new_user.uid
members_domain = cfg.get('cloud vhosts_members_domain')
mock_cloud_server.clear()
mock_harbor_server.reset()
current_term = Term.current()
beginning_of_term = current_term.to_datetime()
domain1 = uid + '.' + members_domain
ip1 = '172.19.134.11'
client.post('/api/cloud/accounts/create', principal=uid)
client.put(
f'/api/cloud/vhosts/{domain1}', json={'ip_address': ip1},
principal=uid)
mock_harbor_server.users.append(uid)
client.post('/api/cloud/registry/projects', principal=uid)
assert len(mock_harbor_server.projects) == 1
expire_member(new_user, ldap_conn)
with patch.object(ceo_common_utils, 'get_current_datetime') as now_mock:
# grace period has passed - user should be sent a warning
now_mock.return_value = beginning_of_term + datetime.timedelta(days=31)
client.post('/api/cloud/accounts/purge')
# one week has passed - the account can now be deleted
now_mock.return_value += datetime.timedelta(days=8)
client.post('/api/cloud/accounts/purge')
# vhosts should have been deleted
status, data = client.get('/api/cloud/vhosts', principal=uid)
assert status == 200
assert data == {'vhosts': []}
# registry project should have been deleted
assert len(mock_harbor_server.projects) == 0
mock_mail_server.messages.clear()
def test_k8s_create_account(client, new_user, ldap_conn):
uid = new_user.uid
status, data = client.post('/api/cloud/k8s/accounts/create', principal=uid)
assert status == 200
assert data['status'] == 'OK'
assert 'kubeconfig' in data
expire_member(new_user, ldap_conn)
status, _ = client.post('/api/cloud/k8s/accounts/create', principal=uid)
assert status == 403