pyceo/tests/conftest.py

287 lines
7.1 KiB
Python

import importlib.resources
import os
import shutil
import subprocess
import tempfile
import flask
import ldap3
import pytest
import socket
from zope import component
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
IFileService, IMailmanService, IHTTPClient, IUWLDAPService, IMailService
from ceo_common.model import Config, RemoteMailmanService, HTTPClient
from ceod.api import create_app
from ceod.model import KerberosService, LDAPService, FileService, User, \
MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService
from ceod.model.utils import strings_to_bytes
from .MockSMTPServer import MockSMTPServer
from .MockMailmanServer import MockMailmanServer
from .conftest_ceod_api import *
@pytest.fixture(scope='session')
def cfg():
with importlib.resources.path('tests', 'ceod_test_local.ini') as p:
config_file = p.__fspath__()
_cfg = Config(config_file)
component.provideUtility(_cfg, IConfig)
return _cfg
@pytest.fixture(scope='session')
def krb_srv(cfg):
# TODO: create temporary Kerberos database using kdb5_util.
# We need to be root to read the keytab
assert os.geteuid() == 0
# this dance again... ugh
if socket.gethostname() == cfg.get('ceod_admin_host'):
principal = 'ceod/admin'
else:
principal = 'ceod/' + socket.getfqdn()
cache_dir = cfg.get('ceod_krb5_cache_dir')
krb = KerberosService(principal)
component.provideUtility(krb, IKerberosService)
yield krb
shutil.rmtree(cache_dir)
def delete_subtree(conn: ldap3.Connection, base_dn: str):
try:
conn.search(base_dn, '(objectClass=*)', search_scope=ldap3.LEVEL)
for entry in conn.entries:
conn.delete(entry.entry_dn)
conn.delete(base_dn)
except ldap3.core.exceptions.LDAPNoSuchObjectResult:
pass
@pytest.fixture(scope='session')
def ceod_admin_creds(cfg, krb_srv):
"""
Acquire credentials for ceod/admin and store them
in the default ccache.
"""
subprocess.run(
['kinit', '-k', cfg.get('ldap_admin_principal')],
check=True,
)
@pytest.fixture(scope='session')
def ldap_conn(cfg, ceod_admin_creds) -> ldap3.Connection:
# Assume that the same server URL is being used for the CSC
# and UWLDAP during the tests.
cfg = component.getUtility(IConfig)
server_url = cfg.get('ldap_server_url')
# sanity check
assert server_url == cfg.get('uwldap_server_url')
return ldap3.Connection(
server_url, auto_bind=True, raise_exceptions=True,
authentication=ldap3.SASL, sasl_mechanism=ldap3.KERBEROS,
user=cfg.get('ldap_admin_principal'))
@pytest.fixture(scope='session')
def ldap_srv(cfg, krb_srv, ldap_conn):
conn = ldap_conn
users_base = cfg.get('ldap_users_base')
groups_base = cfg.get('ldap_groups_base')
delete_subtree(conn, users_base)
delete_subtree(conn, groups_base)
for base_dn in [users_base, groups_base]:
ou = base_dn.split(',', 1)[0].split('=')[1]
conn.add(base_dn, 'organizationalUnit')
_ldap_srv = LDAPService()
component.provideUtility(_ldap_srv, ILDAPService)
yield _ldap_srv
delete_subtree(conn, users_base)
delete_subtree(conn, groups_base)
@pytest.fixture(scope='session')
def file_srv(cfg):
_file_srv = FileService()
component.provideUtility(_file_srv, IFileService)
members_home = cfg.get('members_home')
clubs_home = cfg.get('clubs_home')
shutil.rmtree(members_home, ignore_errors=True)
shutil.rmtree(clubs_home, ignore_errors=True)
yield _file_srv
shutil.rmtree(members_home, ignore_errors=True)
shutil.rmtree(clubs_home, ignore_errors=True)
@pytest.fixture(scope='session')
def http_client(cfg):
client = HTTPClient()
component.provideUtility(client, IHTTPClient)
return
@pytest.fixture(scope='session')
def mock_mailman_server():
server = MockMailmanServer()
server.start()
yield server
server.stop()
@pytest.fixture(scope='session')
def mailman_srv(mock_mailman_server, cfg, http_client):
# TODO: test the RemoteMailmanService as well
mailman = MailmanService()
component.provideUtility(mailman, IMailmanService)
return mailman
@pytest.fixture(scope='session')
def uwldap_srv(cfg, ldap_conn):
conn = ldap_conn
base_dn = cfg.get('uwldap_base')
ou = base_dn.split(',', 1)[0].split('=')[1]
delete_subtree(conn, base_dn)
conn.add(base_dn, 'organizationalUnit')
_uwldap_srv = UWLDAPService()
component.provideUtility(_uwldap_srv, IUWLDAPService)
yield _uwldap_srv
delete_subtree(conn, base_dn)
@pytest.fixture(scope='session')
def mock_mail_server():
mock_server = MockSMTPServer()
mock_server.start()
yield mock_server
mock_server.stop()
@pytest.fixture(scope='session')
def mail_srv(cfg, mock_mail_server):
_mail_srv = MailService()
component.provideUtility(_mail_srv, IMailService)
return _mail_srv
@pytest.fixture(autouse=True, scope='session')
def app(
cfg,
krb_srv,
ldap_srv,
file_srv,
mailman_srv,
uwldap_srv,
mail_srv,
):
# need to be root to read keytab
assert os.geteuid() == 0
app = create_app({
'TESTING': True,
})
return app
@pytest.fixture
def g_admin(cfg, ceod_admin_creds, app):
"""
Store the creds for ceod/admin in flask.g.
This fixture should be used any time LDAP is modified via the LDAPService.
"""
admin_principal = cfg.get('ldap_admin_principal')
with app.app_context():
try:
flask.g.sasl_user = admin_principal
yield
finally:
flask.g.pop('sasl_user')
@pytest.fixture
def simple_user():
return User(
uid='test_jdoe',
cn='John Doe',
program='Math',
terms=['s2021'],
)
@pytest.fixture
def simple_club():
return User(
uid='test_club1',
cn='Club One',
is_club=True,
)
@pytest.fixture
def ldap_user(simple_user, g_admin):
simple_user.add_to_ldap()
yield simple_user
simple_user.remove_from_ldap()
@pytest.fixture
def krb_user(simple_user):
simple_user.add_to_kerberos('krb5')
yield simple_user
simple_user.remove_from_kerberos()
@pytest.fixture
def simple_group():
return Group(
cn='group1',
gid_number=21000,
)
@pytest.fixture
def ldap_group(simple_group, g_admin):
simple_group.add_to_ldap()
yield simple_group
simple_group.remove_from_ldap()
@pytest.fixture
def uwldap_user(cfg, uwldap_srv, ldap_conn):
conn = ldap_conn
base_dn = cfg.get('uwldap_base')
user = UWLDAPRecord(
uid='test_jdoe',
mail_local_addresses=['test_jdoe@uwaterloo.internal'],
program='Math',
cn='John Doe',
sn='Doe',
given_name='John',
)
dn = f'uid={user.uid},{base_dn}'
conn.add(
dn,
[
'inetLocalMailRecipient',
'inetOrgPerson',
'organizationalPerson',
'person',
],
{
'mailLocalAddress': user.mail_local_addresses,
'ou': user.program,
'cn': user.cn,
'sn': user.sn,
'givenName': user.given_name,
},
)
yield user
conn.delete(dn)