# pyceo/tests/conftest.py
#
# 378 lines, 10 KiB, Python

import contextlib
import grp
import importlib.resources
import os
import pwd
import shutil
import subprocess
from subprocess import DEVNULL
import tempfile
from unittest.mock import patch, Mock
import flask
import ldap3
import pytest
import socket
from zope import component
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
IFileService, IMailmanService, IHTTPClient, IUWLDAPService, IMailService
from ceo_common.model import Config, HTTPClient
from ceod.api import create_app
from ceod.model import KerberosService, LDAPService, FileService, User, \
MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService
import ceod.utils as utils
from .MockSMTPServer import MockSMTPServer
from .MockMailmanServer import MockMailmanServer
from .conftest_ceod_api import client # noqa: F401
@pytest.fixture(scope='session', autouse=True)
def _drone_hostname_mock():
    """
    Replace the socket hostname lookups when DRONE_STEP_NAME is set.

    socket.gethostname is swapped for a mock returning the step name, and
    socket.getfqdn for a mock returning the step name followed by
    '.csclub.internal'. Nothing is changed when the variable is absent.
    """
    # Drone doesn't appear to set the hostname of the container,
    # so the step name is used in its place.
    step_name = os.environ.get('DRONE_STEP_NAME')
    if step_name is None:
        return
    socket.gethostname = Mock(return_value=step_name)
    socket.getfqdn = Mock(return_value=step_name + '.csclub.internal')
@pytest.fixture(scope='session')
def cfg(_drone_hostname_mock):
    """
    Build a Config from the 'ceod_test_local.ini' resource of the 'tests'
    package, register it as the IConfig utility and return it.
    """
    resource = importlib.resources.path('tests', 'ceod_test_local.ini')
    with resource as ini_path:
        config_file = os.fspath(ini_path)
    test_config = Config(config_file)
    component.provideUtility(test_config, IConfig)
    return test_config
def delete_test_princs(krb_srv):
    """
    Delete every principal which kadmin lists for the pattern 'test_*'.

    krb_srv must provide an `admin_principal` attribute (passed to
    `kadmin -p`) and a `delprinc(name)` method, which is called once per
    line of kadmin's standard output.

    Raises subprocess.CalledProcessError if kadmin exits with a non-zero
    status.
    """
    kadmin_cmd = [
        'kadmin', '-k', '-p', krb_srv.admin_principal,
        'listprincs', 'test_*',
    ]
    listing = subprocess.run(
        kadmin_cmd, check=True, capture_output=True, text=True,
    )
    for line in listing.stdout.splitlines():
        krb_srv.delprinc(line.strip())
@pytest.fixture(scope='session')
def krb_srv(cfg):
    """
    Create a KerberosService, register it as the IKerberosService utility
    and yield it.

    Principals matching 'test_*' are deleted both before the service is
    yielded and afterwards. The directory named by the
    'ceod_krb5_cache_dir' setting is removed last.
    """
    # TODO: create temporary Kerberos database using kdb5_util.
    # We need to be root to read the keytab
    assert os.geteuid() == 0
    # The admin host uses ceod/admin; any other host uses ceod/<fqdn>.
    on_admin_host = socket.gethostname() == cfg.get('ceod_admin_host')
    principal = 'ceod/admin' if on_admin_host else 'ceod/' + socket.getfqdn()
    cache_dir = cfg.get('ceod_krb5_cache_dir')
    service = KerberosService(principal)
    component.provideUtility(service, IKerberosService)
    delete_test_princs(service)
    yield service
    delete_test_princs(service)
    shutil.rmtree(cache_dir)
def delete_subtree(conn: ldap3.Connection, base_dn: str):
    """
    Delete the entries directly below base_dn, then base_dn itself.

    Only one level is searched (ldap3.LEVEL). An
    LDAPNoSuchObjectResult raised by any of the operations is ignored.
    """
    with contextlib.suppress(ldap3.core.exceptions.LDAPNoSuchObjectResult):
        conn.search(base_dn, '(objectClass=*)', search_scope=ldap3.LEVEL)
        child_dns = [entry.entry_dn for entry in conn.entries]
        for child_dn in child_dns:
            conn.delete(child_dn)
        conn.delete(base_dn)
@pytest.fixture(scope='session')
def ceod_admin_creds(cfg, krb_srv):
    """
    Acquire credentials for ceod/admin and store them
    in the default ccache.

    This runs `kinit -k` for the principal named by the
    'ldap_admin_principal' setting; a non-zero exit status raises
    subprocess.CalledProcessError.
    """
    admin_principal = cfg.get('ldap_admin_principal')
    subprocess.run(['kinit', '-k', admin_principal], check=True)
@pytest.fixture
def g_admin_ctx(cfg, ceod_admin_creds, app):
    """
    Return a context manager factory which stores the principal for
    ceod/admin in flask.g.

    The context manager should be used any time LDAP is modified via the
    LDAPService, and we are not in a request context.
    It should NOT be active when CeodTestClient is making a request, since
    that will override the values in flask.g.
    """
    @contextlib.contextmanager
    def admin_context():
        principal = cfg.get('ldap_admin_principal')
        with app.app_context():
            try:
                flask.g.sasl_user = principal
                yield
            finally:
                # Always drop the principal again when the block ends.
                flask.g.pop('sasl_user')

    return admin_context
@pytest.fixture(scope='session')
def syscom_creds():
    """
    Acquire credentials for a syscom member and store them in a ccache.
    Yields the name of the ccache file.

    The ccache is a NamedTemporaryFile which stays open for as long as
    the fixture is active.
    """
    with tempfile.NamedTemporaryFile() as ccache:
        kinit_cmd = ['kinit', '-c', ccache.name, 'ctdalek']
        # The password is supplied to kinit on standard input.
        subprocess.run(
            kinit_cmd, input='krb5', text=True, stdout=DEVNULL, check=True,
        )
        yield ccache.name
@pytest.fixture
def g_syscom(syscom_creds, app):
    """
    Store the principal for the syscom member in flask.g, and point
    KRB5CCNAME to the file where the TGT is stored.
    Use this fixture if you need syscom credentials for an HTTP request
    to a different process.

    Yields the name of the ccache file. On teardown KRB5CCNAME is put back
    to its previous state: restored to the old value, or removed again if
    it was not set beforehand.
    """
    filename = syscom_creds
    with app.app_context():
        # KRB5CCNAME is not guaranteed to be present in the environment,
        # so read it without raising a KeyError.
        old_krb5ccname = os.environ.get('KRB5CCNAME')
        os.environ['KRB5CCNAME'] = 'FILE:' + filename
        try:
            flask.g.sasl_user = 'ctdalek'
            yield filename
        finally:
            if old_krb5ccname is None:
                os.environ.pop('KRB5CCNAME', None)
            else:
                os.environ['KRB5CCNAME'] = old_krb5ccname
            flask.g.pop('sasl_user')
@pytest.fixture(scope='session')
def ldap_conn(cfg, ceod_admin_creds) -> ldap3.Connection:
    """
    Return an auto-bound ldap3 connection which authenticates with
    SASL/Kerberos as the principal named by 'ldap_admin_principal'.
    """
    # Assume that the same server URL is being used for the CSC
    # and UWLDAP during the tests.
    cfg = component.getUtility(IConfig)
    server_url = cfg.get('ldap_server_url')
    # sanity check
    assert server_url == cfg.get('uwldap_server_url')
    sasl_options = {
        'authentication': ldap3.SASL,
        'sasl_mechanism': ldap3.KERBEROS,
        'user': cfg.get('ldap_admin_principal'),
    }
    return ldap3.Connection(
        server_url, auto_bind=True, raise_exceptions=True, **sasl_options)
@pytest.fixture(scope='session')
def ldap_srv_session(cfg, krb_srv, ldap_conn):
    """
    Recreate the users, groups and sudo base entries as empty
    organizationalUnits, then register and yield an LDAPService.

    The three subtrees are deleted again after the yield.
    """
    base_dns = [
        cfg.get(key)
        for key in ('ldap_users_base', 'ldap_groups_base', 'ldap_sudo_base')
    ]
    for dn in base_dns:
        delete_subtree(ldap_conn, dn)
        ldap_conn.add(dn, 'organizationalUnit')
    service = LDAPService()
    component.provideUtility(service, ILDAPService)
    yield service
    for dn in base_dns:
        delete_subtree(ldap_conn, dn)
@pytest.fixture
def ldap_srv(ldap_srv_session, g_admin_ctx):
    """
    Yield the session-wide LDAPService from inside a g_admin_ctx() block.
    """
    # This is an ugly hack to get around the fact that function-scoped
    # fixtures (g_admin_ctx) can't be used from session-scoped fixtures
    # (ldap_srv_session).
    service = ldap_srv_session
    with g_admin_ctx():
        yield service
@pytest.fixture(scope='session')
def file_srv(cfg):
    """
    Register and yield a FileService.

    The directories named by the 'members_home' and 'clubs_home' settings
    are removed (errors ignored) both before and after the yield.
    """
    service = FileService()
    component.provideUtility(service, IFileService)
    home_dirs = (cfg.get('members_home'), cfg.get('clubs_home'))

    def wipe_home_dirs():
        for path in home_dirs:
            shutil.rmtree(path, ignore_errors=True)

    wipe_home_dirs()
    yield service
    wipe_home_dirs()
@pytest.fixture(scope='session')
def http_client(cfg):
    """
    Create an HTTPClient, register it as the IHTTPClient utility and
    return it.
    """
    client_instance = HTTPClient()
    component.provideUtility(client_instance, IHTTPClient)
    return client_instance
@pytest.fixture(scope='session')
def mock_mailman_server():
    """
    Start a MockMailmanServer, yield it, and stop it afterwards.
    """
    mailman_mock = MockMailmanServer()
    mailman_mock.start()
    yield mailman_mock
    mailman_mock.stop()
@pytest.fixture(scope='session')
def mailman_srv(mock_mailman_server, cfg, http_client):
    """
    Create a MailmanService, register it as the IMailmanService utility
    and return it.
    """
    # TODO: test the RemoteMailmanService as well
    service = MailmanService()
    component.provideUtility(service, IMailmanService)
    return service
@pytest.fixture(scope='session')
def uwldap_srv(cfg, ldap_conn):
    """
    Recreate the 'uwldap_base' entry as an empty organizationalUnit, then
    register and yield a UWLDAPService.

    The subtree is deleted again after the yield.
    """
    uwldap_base = cfg.get('uwldap_base')
    delete_subtree(ldap_conn, uwldap_base)
    ldap_conn.add(uwldap_base, 'organizationalUnit')
    service = UWLDAPService()
    component.provideUtility(service, IUWLDAPService)
    yield service
    delete_subtree(ldap_conn, uwldap_base)
@pytest.fixture(scope='session')
def mock_mail_server():
    """
    Start a MockSMTPServer, yield it, and stop it afterwards.
    """
    smtp_mock = MockSMTPServer()
    smtp_mock.start()
    yield smtp_mock
    smtp_mock.stop()
@pytest.fixture(scope='session')
def mail_srv(cfg, mock_mail_server):
    """
    Create a MailService, register it as the IMailService utility and
    return it.
    """
    service = MailService()
    component.provideUtility(service, IMailService)
    return service
@pytest.fixture(autouse=True, scope='session')
def app(
    cfg,
    krb_srv,
    ldap_srv_session,
    file_srv,
    mailman_srv,
    uwldap_srv,
    mail_srv,
):
    """
    Return the application built by create_app with TESTING enabled.

    The fixture arguments are not used in the body; requesting them makes
    pytest set those fixtures up before the application is created.
    """
    return create_app({'TESTING': True})
@pytest.fixture(scope='session')
def mocks_for_create_user():
    """
    Patch utils.gen_password, pwd.getpwuid and grp.getgrgid while the
    fixture is active.

    gen_password returns 'krb5'; getpwuid and getgrgid raise KeyError.
    """
    # Normally, if getpwuid or getgrgid do *not* raise a KeyError,
    # then LDAPService will skip that UID. Therefore, by raising a
    # KeyError, we are making sure that the UID will *not* be skipped.
    patchers = (
        patch.object(utils, 'gen_password', return_value='krb5'),
        patch.object(pwd, 'getpwuid', side_effect=KeyError()),
        patch.object(grp, 'getgrgid', side_effect=KeyError()),
    )
    with contextlib.ExitStack() as stack:
        for patcher in patchers:
            stack.enter_context(patcher)
        yield
@pytest.fixture
def simple_user():
    """
    Return a User with uid 'test_jdoe', cn 'John Doe', program 'Math'
    and the single term 's2021'.
    """
    attrs = {
        'uid': 'test_jdoe',
        'cn': 'John Doe',
        'program': 'Math',
        'terms': ['s2021'],
    }
    return User(**attrs)
@pytest.fixture
def simple_club():
    """
    Return a User with uid 'test_club1' and cn 'Club One', created with
    is_club=True.
    """
    attrs = {
        'uid': 'test_club1',
        'cn': 'Club One',
        'is_club': True,
    }
    return User(**attrs)
@pytest.fixture
def ldap_user(simple_user, g_admin_ctx):
    """
    Add simple_user to LDAP, yield it, and remove it from LDAP afterwards.

    Both LDAP operations run inside a g_admin_ctx() block.
    """
    user = simple_user
    with g_admin_ctx():
        user.add_to_ldap()
    yield user
    with g_admin_ctx():
        user.remove_from_ldap()
@pytest.fixture
def krb_user(simple_user):
    """
    Add simple_user to Kerberos with the password 'krb5', yield it, and
    remove it from Kerberos afterwards.
    """
    user = simple_user
    user.add_to_kerberos('krb5')
    yield user
    user.remove_from_kerberos()
@pytest.fixture
def simple_group():
    """
    Return a Group with cn 'group1', gid_number 21000 and user_cn
    'Group One'.
    """
    attrs = {
        'cn': 'group1',
        'gid_number': 21000,
        'user_cn': 'Group One',
    }
    return Group(**attrs)
@pytest.fixture
def ldap_group(simple_group, g_admin_ctx):
    """
    Add simple_group to LDAP, yield it, and remove it from LDAP afterwards.

    Both LDAP operations run inside a g_admin_ctx() block.
    """
    group = simple_group
    with g_admin_ctx():
        group.add_to_ldap()
    yield group
    with g_admin_ctx():
        group.remove_from_ldap()
@pytest.fixture
def uwldap_user(cfg, uwldap_srv, ldap_conn):
    """
    Add an entry for 'test_jdoe' below the 'uwldap_base' DN, yield the
    matching UWLDAPRecord, and delete the entry afterwards.
    """
    uwldap_base = cfg.get('uwldap_base')
    record = UWLDAPRecord(
        uid='test_jdoe',
        mail_local_addresses=['test_jdoe@uwaterloo.internal'],
        program='Math',
        cn='John Doe',
        sn='Doe',
        given_name='John',
    )
    entry_dn = 'uid={},{}'.format(record.uid, uwldap_base)
    object_classes = [
        'inetLocalMailRecipient',
        'inetOrgPerson',
        'organizationalPerson',
        'person',
    ]
    # The LDAP attributes are filled in from the record's fields.
    attributes = {
        'mailLocalAddress': record.mail_local_addresses,
        'ou': record.program,
        'cn': record.cn,
        'sn': record.sn,
        'givenName': record.given_name,
    }
    ldap_conn.add(entry_dn, object_classes, attributes)
    yield record
    ldap_conn.delete(entry_dn)