add unit tests for cloud API

This commit is contained in:
Max Erenberg 2021-11-20 22:11:38 -05:00
parent fa9c2b33d5
commit bb7818e77d
7 changed files with 160 additions and 12 deletions

View File

@ -1,5 +1,7 @@
import datetime import datetime
import ceo_common.utils as utils
class Term: class Term:
"""A representation of a term in the CSC LDAP, e.g. 's2021'.""" """A representation of a term in the CSC LDAP, e.g. 's2021'."""
@ -17,7 +19,7 @@ class Term:
@staticmethod @staticmethod
def current(): def current():
"""Get a Term object for the current date.""" """Get a Term object for the current date."""
dt = datetime.datetime.now() dt = utils.get_current_datetime()
c = 'w' c = 'w'
if 5 <= dt.month <= 8: if 5 <= dt.month <= 8:
c = 's' c = 's'
@ -27,18 +29,19 @@ class Term:
return Term(s_term) return Term(s_term)
def __add__(self, other): def __add__(self, other):
assert type(other) is int and other >= 0 assert type(other) is int
c = self.s_term[0] c = self.s_term[0]
season_idx = self.seasons.index(c) season_idx = self.seasons.index(c)
year = int(self.s_term[1:]) year = int(self.s_term[1:])
year += other // 3 season_idx += other
season_idx += other % 3 year += season_idx // 3
if season_idx >= 3: season_idx %= 3
year += 1
season_idx -= 3
s_term = self.seasons[season_idx] + str(year) s_term = self.seasons[season_idx] + str(year)
return Term(s_term) return Term(s_term)
def __sub__(self, other):
return self.__add__(-other)
def __eq__(self, other): def __eq__(self, other):
return isinstance(other, Term) and self.s_term == other.s_term return isinstance(other, Term) and self.s_term == other.s_term

7
ceo_common/utils.py Normal file
View File

@ -0,0 +1,7 @@
import datetime
def get_current_datetime() -> datetime.datetime:
# We place this in a separate function so that we can mock it out
# in our unit tests.
return datetime.datetime.now()

View File

@ -16,6 +16,7 @@ from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import ICloudService, IConfig, IUser, ILDAPService, \ from ceo_common.interfaces import ICloudService, IConfig, IUser, ILDAPService, \
IMailService IMailService
from ceo_common.model import Term from ceo_common.model import Term
import ceo_common.utils as utils
logger = logger_factory(__name__) logger = logger_factory(__name__)
@ -112,7 +113,7 @@ class CloudService:
current_term = Term.current() current_term = Term.current()
beginning_of_term = current_term.to_datetime() beginning_of_term = current_term.to_datetime()
now = datetime.datetime.now() now = utils.get_current_datetime()
delta = now - beginning_of_term delta = now - beginning_of_term
if delta.days < 30: if delta.days < 30:
# one-month grace period # one-month grace period
@ -132,6 +133,7 @@ class CloudService:
'Skipping account purge because less than one week has ' 'Skipping account purge because less than one week has '
'passed since the warning emails were sent out' 'passed since the warning emails were sent out'
) )
accounts_to_be_deleted.extend(state['accounts_to_be_deleted'])
return result return result
username_to_account_id = { username_to_account_id = {
account['name']: account['id'] account['name']: account['id']

View File

@ -27,7 +27,7 @@ class MockCloudStackServer(MockHTTPServerBase):
self.users_by_accountid.clear() self.users_by_accountid.clear()
self.users_by_username.clear() self.users_by_username.clear()
def reset_handler(self, request): async def reset_handler(self, request):
self.clear() self.clear()
return web.Response(text='OK\n') return web.Response(text='OK\n')

View File

@ -0,0 +1,86 @@
import datetime
import os
from unittest.mock import patch
import ldap3
from ceo_common.model import Term
import ceo_common.utils as ceo_common_utils
def expire_member(user, ldap_conn):
most_recent_term = max(map(Term, user.terms))
new_term = most_recent_term - 1
changes = {
'term': [(ldap3.MODIFY_REPLACE, [str(new_term)])]
}
dn = user.ldap_srv.uid_to_dn(user.uid)
ldap_conn.modify(dn, changes)
def test_create_account(client, mock_cloud_server, new_user, ldap_conn):
uid = new_user.uid
mock_cloud_server.clear()
status, _ = client.post('/api/cloud/accounts/create', principal=uid)
assert status == 200
assert uid in mock_cloud_server.users_by_username
status, _ = client.post('/api/cloud/accounts/create', principal=uid)
assert status != 200
mock_cloud_server.clear()
expire_member(new_user, ldap_conn)
status, _ = client.post('/api/cloud/accounts/create', principal=uid)
assert status == 403
def test_purge_accounts(
client, mock_cloud_server, cloud_srv, mock_mail_server, new_user,
ldap_conn,
):
uid = new_user.uid
mock_cloud_server.clear()
mock_mail_server.messages.clear()
accounts_deleted = []
accounts_to_be_deleted = []
if os.path.isfile(cloud_srv.pending_deletions_file):
os.unlink(cloud_srv.pending_deletions_file)
expected = {
'accounts_deleted': accounts_deleted,
'accounts_to_be_deleted': accounts_to_be_deleted,
}
current_term = Term.current()
beginning_of_term = current_term.to_datetime()
client.post('/api/cloud/accounts/create', principal=uid)
expire_member(new_user, ldap_conn)
with patch.object(ceo_common_utils, 'get_current_datetime') as now_mock:
# one-month grace period - account should not be deleted
now_mock.return_value = beginning_of_term + datetime.timedelta(days=1)
status, data = client.post('/api/cloud/accounts/purge')
assert status == 200
assert data == expected
# grace period has passed - user should be sent a warning
now_mock.return_value += datetime.timedelta(days=32)
accounts_to_be_deleted.append(new_user.uid)
status, data = client.post('/api/cloud/accounts/purge')
assert status == 200
assert data == expected
assert os.path.isfile(cloud_srv.pending_deletions_file)
assert len(mock_mail_server.messages) == 1
# user still has one week left to renew their membership
status, data = client.post('/api/cloud/accounts/purge')
assert status == 200
assert data == expected
# one week has passed - the account can now be deleted
now_mock.return_value += datetime.timedelta(days=8)
accounts_to_be_deleted.clear()
accounts_deleted.append(new_user.uid)
status, data = client.post('/api/cloud/accounts/purge')
assert status == 200
assert data == expected
assert new_user.uid not in mock_cloud_server.users_by_username
assert len(mock_mail_server.messages) == 2
mock_mail_server.messages.clear()

View File

@ -8,6 +8,7 @@ admin_host = phosphoric-acid
fs_root_host = phosphoric-acid fs_root_host = phosphoric-acid
mailman_host = phosphoric-acid mailman_host = phosphoric-acid
database_host = phosphoric-acid database_host = phosphoric-acid
cloud_host = phosphoric-acid
use_https = false use_https = false
port = 9988 port = 9988
@ -66,3 +67,9 @@ host = coffee
username = postgres username = postgres
password = postgres password = postgres
host = coffee host = coffee
[cloudstack]
api_key = REPLACE_ME
secret_key = REPLACE_ME
base_url = http://localhost:8080/client/api
members_domain = Members

View File

@ -22,15 +22,17 @@ from zope import component
from .utils import gssapi_token_ctx, ccache_cleanup # noqa: F401 from .utils import gssapi_token_ctx, ccache_cleanup # noqa: F401
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \ from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
IFileService, IMailmanService, IHTTPClient, IUWLDAPService, IMailService, \ IFileService, IMailmanService, IHTTPClient, IUWLDAPService, IMailService, \
IDatabaseService IDatabaseService, ICloudService
from ceo_common.model import Config, HTTPClient from ceo_common.model import Config, HTTPClient, Term
from ceod.api import create_app from ceod.api import create_app
from ceod.db import MySQLService, PostgreSQLService from ceod.db import MySQLService, PostgreSQLService
from ceod.model import KerberosService, LDAPService, FileService, User, \ from ceod.model import KerberosService, LDAPService, FileService, User, \
MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService, \
CloudService
import ceod.utils as utils import ceod.utils as utils
from .MockSMTPServer import MockSMTPServer from .MockSMTPServer import MockSMTPServer
from .MockMailmanServer import MockMailmanServer from .MockMailmanServer import MockMailmanServer
from .MockCloudStackServer import MockCloudStackServer
from .conftest_ceod_api import client # noqa: F401 from .conftest_ceod_api import client # noqa: F401
from .conftest_ceo import cli_setup # noqa: F401 from .conftest_ceo import cli_setup # noqa: F401
@ -243,6 +245,14 @@ def mail_srv(cfg, mock_mail_server):
return _mail_srv return _mail_srv
@pytest.fixture(scope='session')
def mock_cloud_server():
mock_server = MockCloudStackServer()
mock_server.start()
yield mock_server
mock_server.stop()
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def mysql_srv(cfg): def mysql_srv(cfg):
mysql_srv = MySQLService() mysql_srv = MySQLService()
@ -257,6 +267,13 @@ def postgresql_srv(cfg):
return psql_srv return psql_srv
@pytest.fixture(scope='session')
def cloud_srv(cfg):
_cloud_srv = CloudService()
component.getGlobalSiteManager().registerUtility(_cloud_srv, ICloudService)
return _cloud_srv
@pytest.fixture(autouse=True, scope='session') @pytest.fixture(autouse=True, scope='session')
def app( def app(
cfg, cfg,
@ -268,6 +285,7 @@ def app(
mail_srv, mail_srv,
mysql_srv, mysql_srv,
postgresql_srv, postgresql_srv,
cloud_srv,
): ):
app = create_app({'TESTING': True}) app = create_app({'TESTING': True})
return app return app
@ -328,6 +346,31 @@ def krb_user(simple_user):
simple_user.remove_from_kerberos() simple_user.remove_from_kerberos()
@pytest.fixture
def new_user(client, g_admin_ctx, ldap_srv_session): # noqa: F811
uid = 'test_10001'
status, data = client.post('/api/members', json={
'uid': uid,
'cn': 'John Doe',
'given_name': 'John',
'sn': 'Doe',
'program': 'Math',
'terms': [str(Term.current())],
})
assert status == 200
assert data[-1]['status'] == 'completed'
with g_admin_ctx():
user = ldap_srv_session.get_user(uid)
subprocess.run([
'kadmin', '-k', '-p', 'ceod/admin', 'cpw',
'-pw', 'krb5', uid,
], check=True)
yield user
status, data = client.delete(f'/api/members/{uid}')
assert status == 200
assert data[-1]['status'] == 'completed'
@pytest.fixture @pytest.fixture
def simple_group(): def simple_group():
return Group( return Group(