pyceo/tests/conftest.py

602 lines
17 KiB
Python

import contextlib
import importlib.resources
import multiprocessing
from multiprocessing import Process
import os
import shutil
import subprocess
from subprocess import DEVNULL
import sys
import tempfile
import threading
import time
from unittest.mock import Mock
import flask
import gssapi
import ldap3
import pytest
import requests
import socket
from zope import component
# noqa: F401
from .utils import ( # noqa: F401
gssapi_token_ctx,
ccache_cleanup,
mocks_for_create_user_ctx,
reset_disable_club_conf,
)
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
IFileService, IMailmanService, IHTTPClient, IUWLDAPService, IMailService, \
IDatabaseService, ICloudStackService, IKubernetesService, IVHostManager, \
ICloudResourceManager, IContainerRegistryService, IClubWebHostingService
from ceo_common.model import Config, HTTPClient, Term
import ceo_common.utils
from ceod.api import create_app
from ceod.db import MySQLService, PostgreSQLService
from ceod.model import KerberosService, LDAPService, FileService, User, \
MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService, \
CloudStackService, KubernetesService, VHostManager, CloudResourceManager, \
ContainerRegistryService, ClubWebHostingService
from .MockSMTPServer import MockSMTPServer
from .MockMailmanServer import MockMailmanServer
from .MockCloudStackServer import MockCloudStackServer
from .MockHarborServer import MockHarborServer
from .conftest_ceod_api import client # noqa: F401
from .conftest_ceo import cli_setup # noqa: F401
@pytest.fixture(scope='session', autouse=True)
def _drone_hostname_mock():
# Drone doesn't appear to set the hostname of the container.
# Mock it instead.
if 'DRONE_STEP_NAME' in os.environ:
hostname = os.environ['DRONE_STEP_NAME']
fqdn = hostname + '.csclub.internal'
socket.gethostname = Mock(return_value=hostname)
socket.getfqdn = Mock(return_value=fqdn)
@pytest.fixture(scope='session')
def cfg(_drone_hostname_mock):
with importlib.resources.path('tests', 'ceod_test_local.ini') as p:
config_file = p.__fspath__()
_cfg = Config(config_file)
component.getGlobalSiteManager().registerUtility(_cfg, IConfig)
return _cfg
def delete_test_princs(krb_srv):
proc = subprocess.run([
'kadmin', '-k', '-p', krb_srv.admin_principal, 'listprincs', 'test*',
], text=True, capture_output=True, check=True)
princs = [line.strip() for line in proc.stdout.splitlines()]
for princ in princs:
krb_srv.delprinc(princ)
@pytest.fixture(scope='session')
def krb_srv(cfg):
# TODO: create temporary Kerberos database using kdb5_util.
# We need to be root to read the keytab
assert os.geteuid() == 0
# Delete the admin creds file, if it exists
ccache_file = cfg.get('ldap_admin_principal_ccache')
if os.path.isfile(ccache_file):
os.unlink(ccache_file)
krb = KerberosService()
component.getGlobalSiteManager().registerUtility(krb, IKerberosService)
delete_test_princs(krb)
yield krb
delete_test_princs(krb)
def delete_subtree(conn: ldap3.Connection, base_dn: str):
try:
conn.search(base_dn, '(objectClass=*)', search_scope=ldap3.LEVEL)
for entry in conn.entries:
conn.delete(entry.entry_dn)
conn.delete(base_dn)
except ldap3.core.exceptions.LDAPNoSuchObjectResult:
pass
@pytest.fixture
def g_admin_ctx(app):
"""
Store the credentials for ceod/admin in flask.g, and override KBR5CCNAME.
This context manager should be used any time LDAP is modified via the
LDAPService, and we are not in a request context.
This should NOT be active when CeodTestClient is making a request, since
that will override the values in flask.g.
"""
@contextlib.contextmanager
def wrapper():
with gssapi_token_ctx('ceod/admin') as token, app.app_context():
try:
flask.g.auth_user = 'ceod/admin'
flask.g.client_token = token
yield
finally:
flask.g.pop('client_token')
flask.g.pop('auth_user')
return wrapper
@pytest.fixture
def g_syscom(app):
"""
Store the credentials for ctdalek in flask.g and override KRB5CCNAME.
Use this fixture if you need syscom credentials for an HTTP request
to a different process.
"""
with gssapi_token_ctx('ctdalek') as token, app.app_context():
try:
flask.g.auth_user = 'ctdalek'
flask.g.client_token = token
yield
finally:
flask.g.pop('client_token')
flask.g.pop('auth_user')
@pytest.fixture(scope='session')
def ldap_conn(cfg) -> ldap3.Connection:
# Assume that the same server URL is being used for the CSC
# and UWLDAP during the tests.
cfg = component.getUtility(IConfig)
server_url = cfg.get('ldap_server_url')
# sanity check
assert server_url == cfg.get('uwldap_server_url')
with gssapi_token_ctx('ceod/admin') as token:
creds = gssapi.Credentials(token=token)
conn = ldap3.Connection(
server_url, auto_bind=True, raise_exceptions=True,
authentication=ldap3.SASL, sasl_mechanism=ldap3.KERBEROS,
sasl_credentials=(None, None, creds))
return conn
@pytest.fixture(scope='session')
def ldap_srv_session(cfg, krb_srv, ldap_conn):
conn = ldap_conn
users_base = cfg.get('ldap_users_base')
groups_base = cfg.get('ldap_groups_base')
sudo_base = cfg.get('ldap_sudo_base')
for base_dn in [users_base, groups_base, sudo_base]:
delete_subtree(conn, base_dn)
conn.add(base_dn, 'organizationalUnit')
_ldap_srv = LDAPService()
component.getGlobalSiteManager().registerUtility(_ldap_srv, ILDAPService)
yield _ldap_srv
for base_dn in [users_base, groups_base, sudo_base]:
delete_subtree(conn, base_dn)
@pytest.fixture
def ldap_srv(ldap_srv_session, g_admin_ctx):
# This is an ugly hack to get around the fact that function-scoped
# fixtures (g_admin_ctx) can't be used from session-scoped fixtures
# (ldap_srv_session).
with g_admin_ctx():
yield ldap_srv_session
@pytest.fixture(scope='session')
def file_srv(cfg):
_file_srv = FileService()
component.getGlobalSiteManager().registerUtility(_file_srv, IFileService)
members_home = cfg.get('members_home')
clubs_home = cfg.get('clubs_home')
shutil.rmtree(members_home, ignore_errors=True)
shutil.rmtree(clubs_home, ignore_errors=True)
yield _file_srv
shutil.rmtree(members_home, ignore_errors=True)
shutil.rmtree(clubs_home, ignore_errors=True)
@pytest.fixture(scope='session')
def http_client(cfg):
_client = HTTPClient()
component.getGlobalSiteManager().registerUtility(_client, IHTTPClient)
return _client
@pytest.fixture(scope='session')
def mock_mailman_server():
server = MockMailmanServer()
server.start()
yield server
server.stop()
@pytest.fixture(scope='session')
def mailman_srv(mock_mailman_server, cfg, http_client):
mailman = MailmanService()
component.getGlobalSiteManager().registerUtility(mailman, IMailmanService)
return mailman
@pytest.fixture(scope='session')
def uwldap_srv(cfg, ldap_conn):
conn = ldap_conn
base_dn = cfg.get('uwldap_base')
delete_subtree(conn, base_dn)
conn.add(base_dn, 'organizationalUnit')
_uwldap_srv = UWLDAPService()
component.getGlobalSiteManager().registerUtility(_uwldap_srv, IUWLDAPService)
yield _uwldap_srv
delete_subtree(conn, base_dn)
@pytest.fixture(scope='session')
def webhosting_srv():
with tempfile.TemporaryDirectory() as tmpdir:
srv = ClubWebHostingService(tmpdir)
component.getGlobalSiteManager().registerUtility(srv, IClubWebHostingService)
yield srv
@pytest.fixture(scope='session')
def mock_mail_server():
mock_server = MockSMTPServer()
mock_server.start()
yield mock_server
mock_server.stop()
@pytest.fixture(scope='session')
def mail_srv(cfg, mock_mail_server):
_mail_srv = MailService()
component.getGlobalSiteManager().registerUtility(_mail_srv, IMailService)
return _mail_srv
@pytest.fixture(scope='session')
def mock_cloud_server():
mock_server = MockCloudStackServer()
mock_server.start()
yield mock_server
mock_server.stop()
@pytest.fixture(scope='session')
def mock_harbor_server():
mock_server = MockHarborServer()
mock_server.start()
yield mock_server
mock_server.stop()
@pytest.fixture(scope='session')
def registry_srv(cfg):
reg_srv = ContainerRegistryService()
component.getGlobalSiteManager().registerUtility(reg_srv, IContainerRegistryService)
return reg_srv
@pytest.fixture(scope='session')
def mysql_srv(cfg):
mysql_srv = MySQLService()
component.getGlobalSiteManager().registerUtility(mysql_srv, IDatabaseService, 'mysql')
return mysql_srv
@pytest.fixture(scope='session')
def postgresql_srv(cfg):
psql_srv = PostgreSQLService()
component.getGlobalSiteManager().registerUtility(psql_srv, IDatabaseService, 'postgresql')
return psql_srv
@pytest.fixture(scope='session')
def vhost_dir_setup(cfg):
state_dir = '/run/ceod'
if os.path.isdir(state_dir):
shutil.rmtree(state_dir)
os.makedirs(state_dir)
yield
shutil.rmtree(state_dir)
@pytest.fixture(scope='session')
def vhost_mgr(cfg, vhost_dir_setup):
mgr = VHostManager()
component.getGlobalSiteManager().registerUtility(mgr, IVHostManager)
return mgr
@pytest.fixture(scope='session')
def cloudstack_srv(cfg):
srv = CloudStackService()
component.getGlobalSiteManager().registerUtility(srv, ICloudStackService)
return srv
@pytest.fixture(scope='session')
def k8s_srv(cfg, vhost_dir_setup):
srv = KubernetesService()
component.getGlobalSiteManager().registerUtility(srv, IKubernetesService)
return srv
@pytest.fixture(scope='session')
def cloud_mgr(cfg):
mgr = CloudResourceManager()
component.getGlobalSiteManager().registerUtility(mgr, ICloudResourceManager)
return mgr
@pytest.fixture(autouse=True, scope='session')
def app(
cfg,
krb_srv,
ldap_srv_session,
file_srv,
mailman_srv,
uwldap_srv,
mail_srv,
mysql_srv,
postgresql_srv,
cloudstack_srv,
vhost_mgr,
k8s_srv,
registry_srv,
cloud_mgr,
webhosting_srv,
):
app = create_app({'TESTING': True})
return app
@pytest.fixture
def mocks_for_create_user():
with mocks_for_create_user_ctx():
yield
@pytest.fixture(scope='module')
def mocks_for_create_user_module():
with mocks_for_create_user_ctx():
yield
@pytest.fixture
def simple_user():
return User(
uid='test_jdoe',
cn='John Doe',
given_name='John',
sn='Doe',
program='Math',
terms=[str(Term.current())],
)
@pytest.fixture
def simple_club():
return User(
uid='test_club1',
cn='Club One',
is_club=True,
)
@pytest.fixture
def ldap_user(simple_user, g_admin_ctx):
with g_admin_ctx():
simple_user.add_to_ldap()
yield simple_user
with g_admin_ctx():
simple_user.remove_from_ldap()
@pytest.fixture
def krb_user(simple_user):
# We don't want to use add_to_kerberos() here because that expires the
# user's password, which we don't want for testing
subprocess.run(
['kadmin', '-k', '-p', 'ceod/admin', 'addprinc', '-pw', 'krb5',
simple_user.uid], stdout=DEVNULL, check=True)
yield simple_user
simple_user.remove_from_kerberos()
@pytest.fixture
def new_user_gen(
client, g_admin_ctx, ldap_srv_session, mocks_for_create_user, # noqa: F811
):
_new_user_id_counter = 11001
@contextlib.contextmanager
def wrapper():
nonlocal _new_user_id_counter
uid = 'test' + str(_new_user_id_counter)
_new_user_id_counter += 1
status, data = client.post('/api/members', json={
'uid': uid,
'cn': 'John Doe',
'given_name': 'John',
'sn': 'Doe',
'program': 'Math',
'terms': [str(Term.current())],
})
assert status == 200
assert data[-1]['status'] == 'completed'
subprocess.run([
'kadmin', '-k', '-p', 'ceod/admin',
'modprinc', '-needchange', uid,
], check=True)
with g_admin_ctx():
user = ldap_srv_session.get_user(uid)
yield user
status, data = client.delete(f'/api/members/{uid}')
assert status == 200
assert data[-1]['status'] == 'completed'
return wrapper
@pytest.fixture
def new_user(new_user_gen):
with new_user_gen() as user:
yield user
@pytest.fixture
def new_club_gen(client, g_admin_ctx, ldap_srv_session): # noqa: F811
new_club_id_counter = 31001
@contextlib.contextmanager
def wrapper():
nonlocal new_club_id_counter
cn = 'test' + str(new_club_id_counter)
new_club_id_counter += 1
status, data = client.post('/api/groups', json={
'cn': cn,
'description': 'Test ' + str(new_club_id_counter),
})
assert status == 200
assert data[-1]['status'] == 'completed'
with g_admin_ctx():
group = ldap_srv_session.get_group(cn)
yield group
status, data = client.delete(f'/api/groups/{cn}')
assert status == 200
assert data[-1]['status'] == 'completed'
return wrapper
@pytest.fixture
def simple_group():
return Group(
cn='group1',
gid_number=21000,
user_cn='Group One',
)
@pytest.fixture
def ldap_group(simple_group, g_admin_ctx):
with g_admin_ctx():
simple_group.add_to_ldap()
yield simple_group
with g_admin_ctx():
simple_group.remove_from_ldap()
@pytest.fixture
def syscom_group(g_admin_ctx):
group = Group(
cn='syscom',
gid_number=10001,
user_cn='Systems Committee'
)
with g_admin_ctx():
group.add_to_ldap()
yield group
with g_admin_ctx():
group.remove_from_ldap()
@pytest.fixture
def uwldap_user(cfg, uwldap_srv, ldap_conn):
conn = ldap_conn
base_dn = cfg.get('uwldap_base')
user = UWLDAPRecord(
uid='test_jdoe',
mail_local_addresses=['test_jdoe@uwaterloo.internal'],
program='Math',
cn='John Doe',
sn='Doe',
given_name='John',
)
dn = f'uid={user.uid},{base_dn}'
conn.add(
dn,
[
'inetLocalMailRecipient',
'inetOrgPerson',
'organizationalPerson',
'person',
],
{
'mailLocalAddress': user.mail_local_addresses,
'ou': user.program,
'cn': user.cn,
'sn': user.sn,
'givenName': user.given_name,
},
)
yield user
conn.delete(dn)
@pytest.fixture
def webhosting_srv_resources(webhosting_srv):
os.makedirs(webhosting_srv.conf_available_dir, exist_ok=True)
os.makedirs(webhosting_srv.sites_available_dir, exist_ok=True)
reset_disable_club_conf(webhosting_srv)
for file in os.listdir(webhosting_srv.sites_available_dir):
os.unlink(os.path.join(webhosting_srv.sites_available_dir, file))
if os.path.isdir(webhosting_srv.apache_dir + '/.git'):
shutil.rmtree(webhosting_srv.apache_dir + '/.git')
@pytest.fixture(scope='module')
def app_process(cfg, app, http_client):
port = cfg.get('ceod_port')
hostname = socket.gethostname()
# The parent process may want to mock the datetime
# function in the child process, so we need IPC
mock_datetime_value = None
orig_get_datetime = ceo_common.utils.get_current_datetime
def mock_get_datetime():
if mock_datetime_value is not None:
return mock_datetime_value
else:
return orig_get_datetime()
def ipc_thread_start(pipe):
nonlocal mock_datetime_value
ceo_common.utils.get_current_datetime = mock_get_datetime
while True:
mock_datetime_value = pipe.recv()
pipe.send(None) # ACK
def server_start(pipe):
sys.stdout = open('/dev/null', 'w')
sys.stderr = sys.stdout
threading.Thread(target=ipc_thread_start, args=(pipe,)).start()
app.run(debug=False, host='0.0.0.0', port=port)
parent_conn, child_conn = multiprocessing.Pipe()
proc = Process(target=server_start, args=(child_conn,))
proc.start()
try:
for i in range(5):
try:
http_client.get(hostname, '/ping')
except requests.exceptions.ConnectionError:
time.sleep(1)
continue
break
assert i != 5, 'Timed out'
yield parent_conn
finally:
proc.terminate()
proc.join()