pyceo/tests/conftest.py

305 lines
7.9 KiB
Python

import grp
import importlib.resources
import os
import pwd
import shutil
import subprocess
import tempfile
from unittest.mock import patch
import flask
import ldap3
import pytest
import socket
from zope import component
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
IFileService, IMailmanService, IHTTPClient, IUWLDAPService, IMailService
from ceo_common.model import Config, RemoteMailmanService, HTTPClient
from ceod.api import create_app
from ceod.model import KerberosService, LDAPService, FileService, User, \
MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService
from ceod.model.utils import strings_to_bytes
import ceod.utils as utils
from .MockSMTPServer import MockSMTPServer
from .MockMailmanServer import MockMailmanServer
from .conftest_ceod_api import client
@pytest.fixture(scope='session')
def cfg():
with importlib.resources.path('tests', 'ceod_test_local.ini') as p:
config_file = p.__fspath__()
_cfg = Config(config_file)
component.provideUtility(_cfg, IConfig)
return _cfg
@pytest.fixture(scope='session')
def krb_srv(cfg):
# TODO: create temporary Kerberos database using kdb5_util.
# We need to be root to read the keytab
assert os.geteuid() == 0
# this dance again... ugh
if socket.gethostname() == cfg.get('ceod_admin_host'):
principal = 'ceod/admin'
else:
principal = 'ceod/' + socket.getfqdn()
cache_dir = cfg.get('ceod_krb5_cache_dir')
krb = KerberosService(principal)
component.provideUtility(krb, IKerberosService)
yield krb
shutil.rmtree(cache_dir)
def delete_subtree(conn: ldap3.Connection, base_dn: str):
try:
conn.search(base_dn, '(objectClass=*)', search_scope=ldap3.LEVEL)
for entry in conn.entries:
conn.delete(entry.entry_dn)
conn.delete(base_dn)
except ldap3.core.exceptions.LDAPNoSuchObjectResult:
pass
@pytest.fixture(scope='session')
def ceod_admin_creds(cfg, krb_srv):
"""
Acquire credentials for ceod/admin and store them
in the default ccache.
"""
subprocess.run(
['kinit', '-k', cfg.get('ldap_admin_principal')],
check=True,
)
@pytest.fixture(scope='session')
def ldap_conn(cfg, ceod_admin_creds) -> ldap3.Connection:
# Assume that the same server URL is being used for the CSC
# and UWLDAP during the tests.
cfg = component.getUtility(IConfig)
server_url = cfg.get('ldap_server_url')
# sanity check
assert server_url == cfg.get('uwldap_server_url')
return ldap3.Connection(
server_url, auto_bind=True, raise_exceptions=True,
authentication=ldap3.SASL, sasl_mechanism=ldap3.KERBEROS,
user=cfg.get('ldap_admin_principal'))
@pytest.fixture(scope='session')
def ldap_srv(cfg, krb_srv, ldap_conn):
conn = ldap_conn
users_base = cfg.get('ldap_users_base')
groups_base = cfg.get('ldap_groups_base')
sudo_base = cfg.get('ldap_sudo_base')
for base_dn in [users_base, groups_base, sudo_base]:
delete_subtree(conn, base_dn)
ou = base_dn.split(',', 1)[0].split('=')[1]
conn.add(base_dn, 'organizationalUnit')
_ldap_srv = LDAPService()
component.provideUtility(_ldap_srv, ILDAPService)
yield _ldap_srv
for base_dn in [users_base, groups_base, sudo_base]:
delete_subtree(conn, base_dn)
@pytest.fixture(scope='session')
def file_srv(cfg):
_file_srv = FileService()
component.provideUtility(_file_srv, IFileService)
members_home = cfg.get('members_home')
clubs_home = cfg.get('clubs_home')
shutil.rmtree(members_home, ignore_errors=True)
shutil.rmtree(clubs_home, ignore_errors=True)
yield _file_srv
shutil.rmtree(members_home, ignore_errors=True)
shutil.rmtree(clubs_home, ignore_errors=True)
@pytest.fixture(scope='session')
def http_client(cfg):
client = HTTPClient()
component.provideUtility(client, IHTTPClient)
return
@pytest.fixture(scope='session')
def mock_mailman_server():
server = MockMailmanServer()
server.start()
yield server
server.stop()
@pytest.fixture(scope='session')
def mailman_srv(mock_mailman_server, cfg, http_client):
# TODO: test the RemoteMailmanService as well
mailman = MailmanService()
component.provideUtility(mailman, IMailmanService)
return mailman
@pytest.fixture(scope='session')
def uwldap_srv(cfg, ldap_conn):
conn = ldap_conn
base_dn = cfg.get('uwldap_base')
ou = base_dn.split(',', 1)[0].split('=')[1]
delete_subtree(conn, base_dn)
conn.add(base_dn, 'organizationalUnit')
_uwldap_srv = UWLDAPService()
component.provideUtility(_uwldap_srv, IUWLDAPService)
yield _uwldap_srv
delete_subtree(conn, base_dn)
@pytest.fixture(scope='session')
def mock_mail_server():
mock_server = MockSMTPServer()
mock_server.start()
yield mock_server
mock_server.stop()
@pytest.fixture(scope='session')
def mail_srv(cfg, mock_mail_server):
_mail_srv = MailService()
component.provideUtility(_mail_srv, IMailService)
return _mail_srv
@pytest.fixture(autouse=True, scope='session')
def app(
cfg,
krb_srv,
ldap_srv,
file_srv,
mailman_srv,
uwldap_srv,
mail_srv,
):
# need to be root to read keytab
assert os.geteuid() == 0
app = create_app({
'TESTING': True,
})
return app
@pytest.fixture
def g_admin(cfg, ceod_admin_creds, app):
"""
Store the creds for ceod/admin in flask.g.
This fixture should be used any time LDAP is modified via the LDAPService.
"""
admin_principal = cfg.get('ldap_admin_principal')
with app.test_request_context():
try:
flask.g.sasl_user = admin_principal
yield
finally:
flask.g.pop('sasl_user')
@pytest.fixture(scope='session')
def mocks_for_create_user():
with patch.object(utils, 'gen_password') as gen_password_mock, \
patch.object(pwd, 'getpwuid') as getpwuid_mock, \
patch.object(grp, 'getgrgid') as getgrgid_mock:
gen_password_mock.return_value = 'krb5'
# Normally, if getpwuid or getgrgid do *not* raise a KeyError,
# then LDAPService will skip that UID. Therefore, by raising a
# KeyError, we are making sure that the UID will *not* be skipped.
getpwuid_mock.side_effect = KeyError()
getgrgid_mock.side_effect = KeyError()
yield
@pytest.fixture
def simple_user():
return User(
uid='test_jdoe',
cn='John Doe',
program='Math',
terms=['s2021'],
)
@pytest.fixture
def simple_club():
return User(
uid='test_club1',
cn='Club One',
is_club=True,
)
@pytest.fixture
def ldap_user(simple_user, g_admin):
simple_user.add_to_ldap()
yield simple_user
simple_user.remove_from_ldap()
@pytest.fixture
def krb_user(simple_user):
simple_user.add_to_kerberos('krb5')
yield simple_user
simple_user.remove_from_kerberos()
@pytest.fixture
def simple_group():
return Group(
cn='group1',
gid_number=21000,
)
@pytest.fixture
def ldap_group(simple_group, g_admin):
simple_group.add_to_ldap()
yield simple_group
simple_group.remove_from_ldap()
@pytest.fixture
def uwldap_user(cfg, uwldap_srv, ldap_conn):
conn = ldap_conn
base_dn = cfg.get('uwldap_base')
user = UWLDAPRecord(
uid='test_jdoe',
mail_local_addresses=['test_jdoe@uwaterloo.internal'],
program='Math',
cn='John Doe',
sn='Doe',
given_name='John',
)
dn = f'uid={user.uid},{base_dn}'
conn.add(
dn,
[
'inetLocalMailRecipient',
'inetOrgPerson',
'organizationalPerson',
'person',
],
{
'mailLocalAddress': user.mail_local_addresses,
'ou': user.program,
'cn': user.cn,
'sn': user.sn,
'givenName': user.given_name,
},
)
yield user
conn.delete(dn)