"""Shared pytest fixtures for the ceod test suite.

The fixtures in this module register the ceod services as zope global
utilities, prepare the LDAP/Kerberos test data, start the mock servers,
and build the Flask application used by the tests.
"""
import contextlib
import importlib.resources
import multiprocessing
from multiprocessing import Process
import os
import shutil
import socket
import subprocess
from subprocess import DEVNULL
import sys
import tempfile
import threading
import time
from unittest.mock import Mock

import flask
import gssapi
import ldap3
import pytest
import requests
from zope import component

# Some of these names are not referenced directly in this module; they are
# imported so that they are available from this conftest.
from .utils import (  # noqa: F401
    gssapi_token_ctx,
    ccache_cleanup,
    mocks_for_create_user_ctx,
    reset_disable_club_conf,
)
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
    IFileService, IMailmanService, IHTTPClient, IUWLDAPService, IMailService, \
    IDatabaseService, ICloudStackService, IKubernetesService, IVHostManager, \
    ICloudResourceManager, IContainerRegistryService, IClubWebHostingService
from ceo_common.model import Config, HTTPClient, Term
import ceo_common.utils
from ceod.api import create_app
from ceod.db import MySQLService, PostgreSQLService
from ceod.model import KerberosService, LDAPService, FileService, User, \
    MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService, \
    CloudStackService, KubernetesService, VHostManager, CloudResourceManager, \
    ContainerRegistryService, ClubWebHostingService
from .MockSMTPServer import MockSMTPServer
from .MockMailmanServer import MockMailmanServer
from .MockCloudStackServer import MockCloudStackServer
from .MockHarborServer import MockHarborServer
from .conftest_ceod_api import client  # noqa: F401
from .conftest_ceo import cli_setup  # noqa: F401


@pytest.fixture(scope='session', autouse=True)
def _drone_hostname_mock():
    """Replace the socket hostname helpers with mocks when running in Drone."""
    # Drone doesn't appear to set the hostname of the container.
    # Mock it instead.
    if 'DRONE_STEP_NAME' in os.environ:
        hostname = os.environ['DRONE_STEP_NAME']
        fqdn = hostname + '.csclub.internal'
        socket.gethostname = Mock(return_value=hostname)
        socket.getfqdn = Mock(return_value=fqdn)


@pytest.fixture(scope='session')
def cfg(_drone_hostname_mock):
    """Load tests/ceod_test_local.ini and register it as the IConfig utility."""
    with importlib.resources.path('tests', 'ceod_test_local.ini') as p:
        # Build the Config while the resource context is still open so the
        # path is guaranteed to exist at that point.
        _cfg = Config(os.fspath(p))
    component.getGlobalSiteManager().registerUtility(_cfg, IConfig)
    return _cfg


def delete_test_princs(krb_srv):
    """Delete every Kerberos principal matching ``test*``.

    The principals are listed with ``kadmin listprincs`` (authenticating as
    ``krb_srv.admin_principal``) and removed one by one via
    ``krb_srv.delprinc``.

    :raises subprocess.CalledProcessError: if kadmin exits with an error.
    """
    proc = subprocess.run([
        'kadmin', '-k', '-p', krb_srv.admin_principal,
        'listprincs', 'test*',
    ], text=True, capture_output=True, check=True)
    for line in proc.stdout.splitlines():
        princ = line.strip()
        # Skip blank lines so that we never ask for an empty principal
        # name to be deleted.
        if princ:
            krb_srv.delprinc(princ)


@pytest.fixture(scope='session')
def krb_srv(cfg):
    """Register a KerberosService; purge test principals before and after."""
    # TODO: create temporary Kerberos database using kdb5_util.
    # We need to be root to read the keytab
    assert os.geteuid() == 0
    # Delete the admin creds file, if it exists
    ccache_file = cfg.get('ldap_admin_principal_ccache')
    if os.path.isfile(ccache_file):
        os.unlink(ccache_file)
    krb = KerberosService()
    component.getGlobalSiteManager().registerUtility(krb, IKerberosService)

    delete_test_princs(krb)
    yield krb
    delete_test_princs(krb)


def delete_subtree(conn: ldap3.Connection, base_dn: str):
    """Delete the direct children of ``base_dn`` and then ``base_dn`` itself.

    Only one level below ``base_dn`` is searched. A missing ``base_dn`` is
    not an error.
    """
    try:
        conn.search(base_dn, '(objectClass=*)', search_scope=ldap3.LEVEL)
        # Snapshot the DNs before issuing any delete, so that later
        # operations on the same connection cannot disturb the result set
        # we are iterating over.
        child_dns = [entry.entry_dn for entry in conn.entries]
        for dn in child_dns:
            conn.delete(dn)
        conn.delete(base_dn)
    except ldap3.core.exceptions.LDAPNoSuchObjectResult:
        pass


@pytest.fixture
def g_admin_ctx(app):
    """
    Store the credentials for ceod/admin in flask.g, and override KRB5CCNAME.
    This context manager should be used any time LDAP is modified via the
    LDAPService, and we are not in a request context.
    This should NOT be active when CeodTestClient is making a request, since
    that will override the values in flask.g.
    """
    @contextlib.contextmanager
    def wrapper():
        with gssapi_token_ctx('ceod/admin') as token, app.app_context():
            try:
                flask.g.auth_user = 'ceod/admin'
                flask.g.client_token = token
                yield
            finally:
                flask.g.pop('client_token')
                flask.g.pop('auth_user')
    return wrapper


@pytest.fixture
def g_syscom(app):
    """
    Store the credentials for ctdalek in flask.g and override KRB5CCNAME.
    Use this fixture if you need syscom credentials for an HTTP request
    to a different process.
    """
    with gssapi_token_ctx('ctdalek') as token, app.app_context():
        try:
            flask.g.auth_user = 'ctdalek'
            flask.g.client_token = token
            yield
        finally:
            flask.g.pop('client_token')
            flask.g.pop('auth_user')


@pytest.fixture(scope='session')
def ldap_conn(cfg) -> ldap3.Connection:
    """Return an ldap3 connection bound via SASL/Kerberos as ceod/admin."""
    # Assume that the same server URL is being used for the CSC
    # and UWLDAP during the tests.
    cfg = component.getUtility(IConfig)
    server_url = cfg.get('ldap_server_url')
    # sanity check
    assert server_url == cfg.get('uwldap_server_url')
    with gssapi_token_ctx('ceod/admin') as token:
        creds = gssapi.Credentials(token=token)
        conn = ldap3.Connection(
            server_url, auto_bind=True, raise_exceptions=True,
            authentication=ldap3.SASL, sasl_mechanism=ldap3.KERBEROS,
            sasl_credentials=(None, None, creds))
    return conn


@pytest.fixture
def office_user(cfg, ldap_conn):
    """Add the ``office1`` user to LDAP for one test, then delete it."""
    users_base = cfg.get('ldap_users_base')
    home_parent = cfg.get('members_home')
    ldap_conn.add(
        f'uid=office1,{users_base}',
        ['top', 'account', 'posixAccount', 'shadowAccount', 'member'],
        {
            'uid': 'office1',
            'uidNumber': '20001',
            'gidNumber': '20001',
            'homeDirectory': f'{home_parent}/office1',
            'loginShell': '/bin/bash',
            'cn': 'Office One',
            'givenName': 'Office',
            'sn': 'One',
            'program': 'Math',
            'term': [str(Term.current())],
        }
    )
    yield
    ldap_conn.delete(f'uid=office1,{users_base}')


def add_stock_groups_and_users(cfg: IConfig, conn: ldap3.Connection):
    """Add the ``syscom`` and ``office`` groups and the ``ctdalek`` user.

    Note that the ``office`` group references ``office1``, but that user is
    not created here (see the ``office_user`` fixture).
    """
    users_base = cfg.get('ldap_users_base')
    groups_base = cfg.get('ldap_groups_base')
    home_parent = cfg.get('members_home')

    conn.add(
        f'cn=syscom,{groups_base}',
        ['top', 'group', 'posixGroup'],
        {
            'cn': 'syscom',
            'gidNumber': '10001',
            'uniqueMember': [f'uid=ctdalek,{users_base}']
        }
    )
    conn.add(
        f'uid=ctdalek,{users_base}',
        ['top', 'account', 'posixAccount', 'shadowAccount', 'member'],
        {
            'uid': 'ctdalek',
            'uidNumber': '20000',
            'gidNumber': '20000',
            'homeDirectory': f'{home_parent}/ctdalek',
            'loginShell': '/bin/bash',
            'cn': 'Calum T. Dalek',
            'givenName': 'Calum',
            'sn': 'Dalek',
            'program': 'Math',
            'term': [str(Term.current())],
        }
    )
    conn.add(
        f'cn=office,{groups_base}',
        ['top', 'group', 'posixGroup'],
        {
            'cn': 'office',
            'gidNumber': '10003',
            'uniqueMember': [f'uid=office1,{users_base}']
        }
    )


@pytest.fixture(scope='session')
def ldap_srv_session(cfg, krb_srv, ldap_conn):
    """Reset the users/groups/sudo subtrees and register an LDAPService.

    The three subtrees are wiped and recreated before the session, populated
    with the stock groups and users, and wiped again at teardown.
    """
    conn = ldap_conn
    users_base = cfg.get('ldap_users_base')
    groups_base = cfg.get('ldap_groups_base')
    sudo_base = cfg.get('ldap_sudo_base')
    base_dns = [users_base, groups_base, sudo_base]

    for base_dn in base_dns:
        delete_subtree(conn, base_dn)
        conn.add(base_dn, 'organizationalUnit')
    # Needed for API authentication
    add_stock_groups_and_users(cfg, conn)

    _ldap_srv = LDAPService()
    component.getGlobalSiteManager().registerUtility(_ldap_srv, ILDAPService)

    yield _ldap_srv

    for base_dn in base_dns:
        delete_subtree(conn, base_dn)


@pytest.fixture
def ldap_srv(ldap_srv_session, g_admin_ctx):
    """Yield the session LDAPService with the ceod/admin context active."""
    # This is an ugly hack to get around the fact that function-scoped
    # fixtures (g_admin_ctx) can't be used from session-scoped fixtures
    # (ldap_srv_session).
    with g_admin_ctx():
        yield ldap_srv_session


@pytest.fixture(scope='session')
def file_srv(cfg):
    """Register a FileService; remove the home directories before and after."""
    _file_srv = FileService()
    component.getGlobalSiteManager().registerUtility(_file_srv, IFileService)
    members_home = cfg.get('members_home')
    clubs_home = cfg.get('clubs_home')

    shutil.rmtree(members_home, ignore_errors=True)
    shutil.rmtree(clubs_home, ignore_errors=True)
    yield _file_srv
    shutil.rmtree(members_home, ignore_errors=True)
    shutil.rmtree(clubs_home, ignore_errors=True)


@pytest.fixture(scope='session')
def http_client(cfg):
    """Register an HTTPClient as the IHTTPClient utility and return it."""
    _client = HTTPClient()
    component.getGlobalSiteManager().registerUtility(_client, IHTTPClient)
    return _client


@pytest.fixture(scope='session')
def mock_mailman_server():
    """Start a MockMailmanServer for the session and stop it at teardown."""
    server = MockMailmanServer()
    server.start()
    yield server
    server.stop()


@pytest.fixture(scope='session')
def mailman_srv(mock_mailman_server, cfg, http_client):
    """Register a MailmanService (after the mock Mailman server is started)."""
    mailman = MailmanService()
    component.getGlobalSiteManager().registerUtility(mailman, IMailmanService)
    return mailman


@pytest.fixture(scope='session')
def uwldap_srv(cfg, ldap_conn):
    """Reset the UWLDAP subtree and register a UWLDAPService."""
    conn = ldap_conn
    base_dn = cfg.get('uwldap_base')

    delete_subtree(conn, base_dn)
    conn.add(base_dn, 'organizationalUnit')

    _uwldap_srv = UWLDAPService()
    component.getGlobalSiteManager().registerUtility(_uwldap_srv, IUWLDAPService)

    yield _uwldap_srv

    delete_subtree(conn, base_dn)


@pytest.fixture(scope='session')
def webhosting_srv():
    """Register a ClubWebHostingService rooted in a temporary directory."""
    with tempfile.TemporaryDirectory() as tmpdir:
        srv = ClubWebHostingService(tmpdir)
        component.getGlobalSiteManager().registerUtility(srv, IClubWebHostingService)
        # Yield inside the ``with`` so the directory outlives the session.
        yield srv


@pytest.fixture(scope='session')
def mock_mail_server():
    """Start a MockSMTPServer for the session and stop it at teardown."""
    mock_server = MockSMTPServer()
    mock_server.start()
    yield mock_server
    mock_server.stop()


@pytest.fixture(scope='session')
def mail_srv(cfg, mock_mail_server):
    """Register a MailService (after the mock SMTP server is started)."""
    _mail_srv = MailService()
    component.getGlobalSiteManager().registerUtility(_mail_srv, IMailService)
    return _mail_srv


@pytest.fixture(scope='session')
def mock_cloud_server():
    """Start a MockCloudStackServer for the session and stop it at teardown."""
    mock_server = MockCloudStackServer()
    mock_server.start()
    yield mock_server
    mock_server.stop()


@pytest.fixture(scope='session')
def mock_harbor_server():
    """Start a MockHarborServer for the session and stop it at teardown."""
    mock_server = MockHarborServer()
    mock_server.start()
    yield mock_server
    mock_server.stop()


@pytest.fixture(scope='session')
def registry_srv(cfg):
    """Register a ContainerRegistryService and return it."""
    reg_srv = ContainerRegistryService()
    component.getGlobalSiteManager().registerUtility(reg_srv, IContainerRegistryService)
    return reg_srv


@pytest.fixture(scope='session')
def mysql_srv(cfg):
    """Register a MySQLService as the IDatabaseService named 'mysql'."""
    mysql_srv = MySQLService()
    component.getGlobalSiteManager().registerUtility(mysql_srv, IDatabaseService, 'mysql')
    return mysql_srv


@pytest.fixture(scope='session')
def postgresql_srv(cfg):
    """Register a PostgreSQLService as the IDatabaseService named 'postgresql'."""
    psql_srv = PostgreSQLService()
    component.getGlobalSiteManager().registerUtility(psql_srv, IDatabaseService, 'postgresql')
    return psql_srv


@pytest.fixture(scope='session')
def vhost_dir_setup(cfg):
    """Provide a fresh, empty /run/ceod for the session; remove it afterwards."""
    state_dir = '/run/ceod'
    if os.path.isdir(state_dir):
        shutil.rmtree(state_dir)
    os.makedirs(state_dir)
    yield
    shutil.rmtree(state_dir)


@pytest.fixture(scope='session')
def vhost_mgr(cfg, vhost_dir_setup):
    """Register a VHostManager and return it."""
    mgr = VHostManager()
    component.getGlobalSiteManager().registerUtility(mgr, IVHostManager)
    return mgr


@pytest.fixture(scope='session')
def cloudstack_srv(cfg):
    """Register a CloudStackService and return it."""
    srv = CloudStackService()
    component.getGlobalSiteManager().registerUtility(srv, ICloudStackService)
    return srv


@pytest.fixture(scope='session')
def k8s_srv(cfg, vhost_dir_setup):
    """Register a KubernetesService and return it."""
    srv = KubernetesService()
    component.getGlobalSiteManager().registerUtility(srv, IKubernetesService)
    return srv


@pytest.fixture(scope='session')
def cloud_mgr(cfg):
    """Register a CloudResourceManager and return it."""
    mgr = CloudResourceManager()
    component.getGlobalSiteManager().registerUtility(mgr, ICloudResourceManager)
    return mgr


@pytest.fixture(autouse=True, scope='session')
def app(
    cfg,
    krb_srv,
    ldap_srv_session,
    file_srv,
    mailman_srv,
    uwldap_srv,
    mail_srv,
    mysql_srv,
    postgresql_srv,
    cloudstack_srv,
    vhost_mgr,
    k8s_srv,
    registry_srv,
    cloud_mgr,
    webhosting_srv,
):
    """Create the Flask app in TESTING mode.

    The service fixtures are requested only so that they are set up (and
    registered) before the app is created.
    """
    app = create_app({'TESTING': True})
    return app


@pytest.fixture
def mocks_for_create_user():
    """Keep ``mocks_for_create_user_ctx`` active for a single test."""
    with mocks_for_create_user_ctx():
        yield


@pytest.fixture(scope='module')
def mocks_for_create_user_module():
    """Keep ``mocks_for_create_user_ctx`` active for a whole test module."""
    with mocks_for_create_user_ctx():
        yield


@pytest.fixture
def simple_user():
    """Return an in-memory User ``test_jdoe``; nothing is persisted."""
    return User(
        uid='test_jdoe',
        cn='John Doe',
        given_name='John',
        sn='Doe',
        program='Math',
        terms=[str(Term.current())],
    )


@pytest.fixture
def simple_club():
    """Return an in-memory club User ``test_club1``; nothing is persisted."""
    return User(
        uid='test_club1',
        cn='Club One',
        is_club=True,
    )


@pytest.fixture
def ldap_user(simple_user, g_admin_ctx):
    """Add ``simple_user`` to LDAP for one test and remove it afterwards."""
    with g_admin_ctx():
        simple_user.add_to_ldap()
    yield simple_user
    with g_admin_ctx():
        simple_user.remove_from_ldap()


@pytest.fixture
def krb_user(simple_user):
    """Create a Kerberos principal for ``simple_user`` with password 'krb5'."""
    # We don't want to use add_to_kerberos() here because that expires the
    # user's password, which we don't want for testing
    subprocess.run(
        ['kadmin', '-k', '-p', 'ceod/admin', 'addprinc', '-pw', 'krb5',
         simple_user.uid],
        stdout=DEVNULL, check=True)
    yield simple_user
    simple_user.remove_from_kerberos()


@pytest.fixture
def new_user_gen(
    client, g_admin_ctx, ldap_srv_session, mocks_for_create_user,  # noqa: F811
):
    """Return a context-manager factory which creates users through the API.

    Each call to the returned factory creates a user with a fresh uid
    (``test11001``, ``test11002``, ...) via ``POST /api/members``, yields the
    user as read back from LDAP, and deletes it via
    ``DELETE /api/members/<uid>`` on exit.
    """
    _new_user_id_counter = 11001

    @contextlib.contextmanager
    def wrapper():
        nonlocal _new_user_id_counter
        uid = 'test' + str(_new_user_id_counter)
        _new_user_id_counter += 1
        status, data = client.post('/api/members', json={
            'uid': uid,
            'cn': 'John Doe',
            'given_name': 'John',
            'sn': 'Doe',
            'program': 'Math',
            'terms': [str(Term.current())],
        })
        assert status == 200
        assert data[-1]['status'] == 'completed'
        # From here on the user exists, so always delete it on the way out,
        # even if the body of the ``with`` statement raises.
        try:
            subprocess.run([
                'kadmin', '-k', '-p', 'ceod/admin',
                'modprinc', '-needchange', uid,
            ], check=True)
            with g_admin_ctx():
                user = ldap_srv_session.get_user(uid)
            # g_admin_ctx must not be active while the caller makes requests,
            # so yield outside of it.
            yield user
        finally:
            status, data = client.delete(f'/api/members/{uid}')
            assert status == 200
            assert data[-1]['status'] == 'completed'

    return wrapper


@pytest.fixture
def new_user(new_user_gen):
    """Yield a single freshly created user (see ``new_user_gen``)."""
    with new_user_gen() as user:
        yield user


@pytest.fixture
def new_club_gen(client, g_admin_ctx, ldap_srv_session):  # noqa: F811
    """Return a context-manager factory which creates clubs through the API.

    Each call to the returned factory creates a group with a fresh cn
    (``test31001``, ``test31002``, ...) via ``POST /api/groups``, yields the
    group as read back from LDAP, and deletes it via
    ``DELETE /api/groups/<cn>`` on exit.
    """
    new_club_id_counter = 31001

    @contextlib.contextmanager
    def wrapper():
        nonlocal new_club_id_counter
        cn = 'test' + str(new_club_id_counter)
        new_club_id_counter += 1
        # NOTE(review): the description uses the already-incremented counter,
        # so it is one higher than the number in ``cn``. Left as-is in case
        # tests depend on it -- confirm whether this is intentional.
        status, data = client.post('/api/groups', json={
            'cn': cn,
            'description': 'Test ' + str(new_club_id_counter),
        })
        assert status == 200
        assert data[-1]['status'] == 'completed'
        # From here on the group exists, so always delete it on the way out,
        # even if the body of the ``with`` statement raises.
        try:
            with g_admin_ctx():
                group = ldap_srv_session.get_group(cn)
            yield group
        finally:
            status, data = client.delete(f'/api/groups/{cn}')
            assert status == 200
            assert data[-1]['status'] == 'completed'

    return wrapper


@pytest.fixture
def simple_group():
    """Return an in-memory Group ``group1``; nothing is persisted."""
    return Group(
        cn='group1',
        gid_number=21000,
        user_cn='Group One',
    )


@pytest.fixture
def ldap_group(simple_group, g_admin_ctx):
    """Add ``simple_group`` to LDAP for one test and remove it afterwards."""
    with g_admin_ctx():
        simple_group.add_to_ldap()
    yield simple_group
    with g_admin_ctx():
        simple_group.remove_from_ldap()


@pytest.fixture
def uwldap_user(cfg, uwldap_srv, ldap_conn):
    """Add a ``test_jdoe`` record under the UWLDAP base for one test."""
    conn = ldap_conn
    base_dn = cfg.get('uwldap_base')
    user = UWLDAPRecord(
        uid='test_jdoe',
        mail_local_addresses=['test_jdoe@uwaterloo.internal'],
        program='Math',
        cn='John Doe',
        sn='Doe',
        given_name='John',
    )
    dn = f'uid={user.uid},{base_dn}'
    conn.add(
        dn,
        [
            'inetLocalMailRecipient',
            'inetOrgPerson',
            'organizationalPerson',
            'person',
        ],
        {
            'mailLocalAddress': user.mail_local_addresses,
            'ou': user.program,
            'cn': user.cn,
            'sn': user.sn,
            'givenName': user.given_name,
        },
    )
    yield user
    conn.delete(dn)


@pytest.fixture
def webhosting_srv_resources(webhosting_srv):
    """Reset the web hosting directories to a clean state before a test.

    Ensures the conf-available and sites-available directories exist, resets
    the disable-club conf, empties sites-available, and removes any ``.git``
    directory under the Apache directory.
    """
    os.makedirs(webhosting_srv.conf_available_dir, exist_ok=True)
    os.makedirs(webhosting_srv.sites_available_dir, exist_ok=True)
    reset_disable_club_conf(webhosting_srv)
    for file in os.listdir(webhosting_srv.sites_available_dir):
        os.unlink(os.path.join(webhosting_srv.sites_available_dir, file))
    if os.path.isdir(webhosting_srv.apache_dir + '/.git'):
        shutil.rmtree(webhosting_srv.apache_dir + '/.git')


@pytest.fixture(scope='module')
def app_process(cfg, app, http_client):
    """Run the Flask app in a child process and yield the parent pipe end.

    The yielded connection lets the test send a value which the child then
    returns from ``ceo_common.utils.get_current_datetime`` (send ``None`` to
    restore the real function); the child replies with ``None`` as an ACK.

    The fixture fails if the server does not answer ``/ping`` within five
    attempts. The child process is always terminated at teardown.
    """
    port = cfg.get('ceod_port')
    hostname = socket.gethostname()

    # The parent process may want to mock the datetime
    # function in the child process, so we need IPC
    mock_datetime_value = None
    orig_get_datetime = ceo_common.utils.get_current_datetime

    def mock_get_datetime():
        if mock_datetime_value is not None:
            return mock_datetime_value
        else:
            return orig_get_datetime()

    def ipc_thread_start(pipe):
        nonlocal mock_datetime_value
        ceo_common.utils.get_current_datetime = mock_get_datetime
        while True:
            mock_datetime_value = pipe.recv()
            pipe.send(None)  # ACK

    def server_start(pipe):
        # Silence the server's output in the child process.
        sys.stdout = open('/dev/null', 'w')
        sys.stderr = sys.stdout
        threading.Thread(target=ipc_thread_start, args=(pipe,)).start()
        app.run(debug=False, host='0.0.0.0', port=port)

    parent_conn, child_conn = multiprocessing.Pipe()
    proc = Process(target=server_start, args=(child_conn,))
    proc.start()

    try:
        for _ in range(5):
            try:
                http_client.get(hostname, '/ping')
            except requests.exceptions.ConnectionError:
                time.sleep(1)
                continue
            break
        else:
            # Every attempt failed to connect: the server never came up.
            pytest.fail('Timed out')
        yield parent_conn
    finally:
        proc.terminate()
        proc.join()