pyceo/tests/conftest.py

407 lines
11 KiB
Python
Raw Normal View History

2021-08-19 12:14:41 -04:00
import contextlib
2021-08-18 19:48:17 -04:00
import grp
2021-08-04 01:54:21 -04:00
import importlib.resources
2021-08-23 19:01:24 -04:00
from multiprocessing import Process
2021-08-04 01:54:21 -04:00
import os
2021-08-18 19:48:17 -04:00
import pwd
2021-08-04 01:54:21 -04:00
import shutil
2021-08-17 21:59:24 -04:00
import subprocess
from subprocess import DEVNULL
2021-08-23 19:01:24 -04:00
import sys
import time
2021-08-21 02:27:33 -04:00
from unittest.mock import patch, Mock
2021-08-04 01:54:21 -04:00
2021-08-17 21:59:24 -04:00
import flask
2021-08-28 01:51:48 -04:00
import gssapi
2021-08-15 01:04:49 -04:00
import ldap3
2021-08-04 01:54:21 -04:00
import pytest
2021-08-23 19:01:24 -04:00
import requests
2021-08-04 01:54:21 -04:00
import socket
from zope import component
2021-08-28 01:51:48 -04:00
from .utils import gssapi_token_ctx, ccache_cleanup # noqa: F401
2021-08-04 01:54:21 -04:00
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
IFileService, IMailmanService, IHTTPClient, IUWLDAPService, IMailService, \
IDatabaseService
2021-08-19 18:08:48 -04:00
from ceo_common.model import Config, HTTPClient
2021-08-13 20:11:56 -04:00
from ceod.api import create_app
from ceod.db import MySQLService, PostgreSQLService
2021-08-04 01:54:21 -04:00
from ceod.model import KerberosService, LDAPService, FileService, User, \
2021-08-13 20:11:56 -04:00
MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService
2021-08-18 19:48:17 -04:00
import ceod.utils as utils
2021-08-13 20:11:56 -04:00
from .MockSMTPServer import MockSMTPServer
from .MockMailmanServer import MockMailmanServer
2021-08-19 18:08:48 -04:00
from .conftest_ceod_api import client # noqa: F401
2021-08-23 19:01:24 -04:00
from .conftest_ceo import cli_setup # noqa: F401
2021-08-04 01:54:21 -04:00
2021-08-21 02:27:33 -04:00
@pytest.fixture(scope='session', autouse=True)
def _drone_hostname_mock():
    """
    Replace socket.gethostname/getfqdn with mocks when running under Drone.

    Drone doesn't appear to set the hostname of the container, so when the
    DRONE_STEP_NAME environment variable is present its value is used as
    the hostname, and '<hostname>.csclub.internal' as the FQDN.
    Outside of Drone this fixture does nothing.
    """
    step_name = os.environ.get('DRONE_STEP_NAME')
    if step_name is None:
        return
    socket.gethostname = Mock(return_value=step_name)
    socket.getfqdn = Mock(return_value=f'{step_name}.csclub.internal')
2021-08-13 20:11:56 -04:00
@pytest.fixture(scope='session')
def cfg(_drone_hostname_mock):
    """
    Load tests/ceod_test_local.ini into a Config and register it as the
    global IConfig utility.

    Depends on _drone_hostname_mock so that the hostname mocks (if any) are
    installed before the configuration is created.
    """
    ini_resource = importlib.resources.path('tests', 'ceod_test_local.ini')
    with ini_resource as ini_path:
        # Build the Config while the resource path is guaranteed to exist.
        config = Config(str(ini_path))
    site_manager = component.getGlobalSiteManager()
    site_manager.registerUtility(config, IConfig)
    return config
2021-08-22 00:36:19 -04:00
def delete_test_princs(krb_srv):
    """
    Delete every Kerberos principal whose name matches 'test_*'.

    The principals are listed by running `kadmin listprincs` as
    krb_srv.admin_principal (authenticating with the keytab), and each one
    is then removed via krb_srv.delprinc().

    :param krb_srv: object exposing `admin_principal` and `delprinc(name)`
    :raises subprocess.CalledProcessError: if kadmin exits with a
        non-zero status
    """
    proc = subprocess.run(
        ['kadmin', '-k', '-p', krb_srv.admin_principal,
         'listprincs', 'test_*'],
        text=True, capture_output=True, check=True,
    )
    for line in proc.stdout.splitlines():
        princ = line.strip()
        # Only act on lines which actually look like a matching principal.
        # This skips blank lines and any informational text which kadmin
        # may write to stdout (e.g. an authentication banner), so that we
        # never ask the KDC to delete something we did not query for.
        if not princ.startswith('test_'):
            continue
        krb_srv.delprinc(princ)
2021-08-13 20:11:56 -04:00
@pytest.fixture(scope='session')
def krb_srv(cfg):
    """
    Create a KerberosService, register it as the global IKerberosService
    utility and yield it.

    All 'test_*' principals are deleted both before the service is handed
    out and again during teardown.
    """
    # TODO: create temporary Kerberos database using kdb5_util.
    # We need to be root to read the keytab
    assert os.geteuid() == 0
    # The admin host uses the 'ceod/admin' principal; every other host uses
    # a principal derived from its own FQDN.
    on_admin_host = socket.gethostname() == cfg.get('ceod_admin_host')
    principal = 'ceod/admin' if on_admin_host else 'ceod/' + socket.getfqdn()
    service = KerberosService(principal)
    component.getGlobalSiteManager().registerUtility(service, IKerberosService)

    delete_test_princs(service)
    yield service
    delete_test_princs(service)
2021-08-04 01:54:21 -04:00
2021-08-15 01:04:49 -04:00
def delete_subtree(conn: ldap3.Connection, base_dn: str):
    """
    Delete the direct children of base_dn, then base_dn itself.

    Only one level below base_dn is searched (ldap3.LEVEL), so deeper
    descendants are not removed by this function.
    A LDAPNoSuchObjectResult raised by any of the operations (e.g. because
    base_dn does not exist) is silently ignored.
    """
    try:
        conn.search(base_dn, '(objectClass=*)', search_scope=ldap3.LEVEL)
        child_dns = [entry.entry_dn for entry in conn.entries]
        for child_dn in child_dns:
            conn.delete(child_dn)
        conn.delete(base_dn)
    except ldap3.core.exceptions.LDAPNoSuchObjectResult:
        return
2021-08-19 12:14:41 -04:00
@pytest.fixture
def g_admin_ctx(app):
    """
    Return a context manager factory which stores the credentials for
    ceod/admin in flask.g, and overrides KRB5CCNAME.

    The returned context manager should be used any time LDAP is modified
    via the LDAPService, and we are not in a request context.
    It should NOT be active when CeodTestClient is making a request, since
    that will override the values in flask.g.
    """
    @contextlib.contextmanager
    def admin_ctx():
        with gssapi_token_ctx('ceod/admin') as token:
            with app.app_context():
                try:
                    flask.g.auth_user = 'ceod/admin'
                    flask.g.client_token = token
                    yield
                finally:
                    # Remove what we stored, in reverse order.
                    flask.g.pop('client_token')
                    flask.g.pop('auth_user')

    return admin_ctx
2021-08-19 18:08:48 -04:00
@pytest.fixture
def g_syscom(app):
    """
    Store the credentials for ctdalek in flask.g and override KRB5CCNAME.

    Use this fixture if you need syscom credentials for an HTTP request
    to a different process. The values are removed from flask.g again
    when the test finishes.
    """
    # NOTE(review): the g_admin_ctx fixture stores the principal under
    # flask.g.auth_user, whereas this one uses flask.g.sasl_user - confirm
    # that the difference is intentional.
    with gssapi_token_ctx('ctdalek') as token:
        with app.app_context():
            try:
                flask.g.sasl_user = 'ctdalek'
                flask.g.client_token = token
                yield
            finally:
                flask.g.pop('client_token')
                flask.g.pop('sasl_user')
2021-08-17 21:59:24 -04:00
@pytest.fixture(scope='session')
def ldap_conn(cfg) -> ldap3.Connection:
    """
    Open an LDAP connection, bound via SASL/Kerberos using credentials
    obtained for ceod/admin.

    The tests assume that the CSC LDAP server and UWLDAP share one server
    URL, so the same connection can be used for both.
    """
    cfg = component.getUtility(IConfig)
    server_url = cfg.get('ldap_server_url')
    uwldap_server_url = cfg.get('uwldap_server_url')
    # sanity check
    assert server_url == uwldap_server_url

    with gssapi_token_ctx('ceod/admin') as token:
        admin_creds = gssapi.Credentials(token=token)
        connection_options = dict(
            auto_bind=True,
            raise_exceptions=True,
            authentication=ldap3.SASL,
            sasl_mechanism=ldap3.KERBEROS,
            sasl_credentials=(None, None, admin_creds),
        )
        return ldap3.Connection(server_url, **connection_options)
2021-08-15 01:04:49 -04:00
2021-08-13 20:11:56 -04:00
@pytest.fixture(scope='session')
def ldap_srv_session(cfg, krb_srv, ldap_conn):
    """
    Create an LDAPService, register it as the global ILDAPService utility
    and yield it.

    Before the service is created, the users, groups and sudo bases are
    wiped and re-created as empty organizationalUnit entries. They are
    wiped again during teardown.
    """
    base_dns = [
        cfg.get(key)
        for key in ('ldap_users_base', 'ldap_groups_base', 'ldap_sudo_base')
    ]

    for dn in base_dns:
        delete_subtree(ldap_conn, dn)
        ldap_conn.add(dn, 'organizationalUnit')

    service = LDAPService()
    component.getGlobalSiteManager().registerUtility(service, ILDAPService)

    yield service

    for dn in base_dns:
        delete_subtree(ldap_conn, dn)
2021-08-04 01:54:21 -04:00
2021-08-19 12:14:41 -04:00
@pytest.fixture
def ldap_srv(ldap_srv_session, g_admin_ctx):
    """
    Yield the session-wide LDAP service while the ceod/admin context from
    g_admin_ctx is active.
    """
    # This is an ugly hack to get around the fact that function-scoped
    # fixtures (g_admin_ctx) can't be used from session-scoped fixtures
    # (ldap_srv_session).
    admin_ctx = g_admin_ctx()
    with admin_ctx:
        yield ldap_srv_session
2021-08-13 20:11:56 -04:00
@pytest.fixture(scope='session')
def file_srv(cfg):
    """
    Create a FileService, register it as the global IFileService utility
    and yield it.

    The configured members and clubs home directories are removed (errors
    ignored) both before the service is handed out and during teardown.
    """
    service = FileService()
    component.getGlobalSiteManager().registerUtility(service, IFileService)
    home_dirs = (cfg.get('members_home'), cfg.get('clubs_home'))

    def remove_home_dirs():
        for directory in home_dirs:
            shutil.rmtree(directory, ignore_errors=True)

    remove_home_dirs()
    yield service
    remove_home_dirs()
2021-08-13 20:11:56 -04:00
@pytest.fixture(scope='session')
def http_client(cfg):
    """
    Create an HTTPClient, register it as the global IHTTPClient utility
    and return it.
    """
    client_instance = HTTPClient()
    site_manager = component.getGlobalSiteManager()
    site_manager.registerUtility(client_instance, IHTTPClient)
    return client_instance
2021-08-13 20:11:56 -04:00
@pytest.fixture(scope='session')
def mock_mailman_server():
    """
    Start a MockMailmanServer for the test session and stop it on teardown.
    """
    mailman_server = MockMailmanServer()
    mailman_server.start()
    yield mailman_server
    mailman_server.stop()
@pytest.fixture(scope='session')
def mailman_srv(mock_mailman_server, cfg, http_client):
    """
    Create a MailmanService, register it as the global IMailmanService
    utility and return it.

    The mock Mailman server, config and HTTP client fixtures are requested
    so that they are set up first.
    """
    # TODO: test the RemoteMailmanService as well
    service = MailmanService()
    site_manager = component.getGlobalSiteManager()
    site_manager.registerUtility(service, IMailmanService)
    return service
@pytest.fixture(scope='session')
def uwldap_srv(cfg, ldap_conn):
    """
    Create a UWLDAPService, register it as the global IUWLDAPService
    utility and yield it.

    The UWLDAP base is wiped and re-created as an empty organizationalUnit
    beforehand, and wiped again during teardown.
    """
    uwldap_base = cfg.get('uwldap_base')
    delete_subtree(ldap_conn, uwldap_base)
    ldap_conn.add(uwldap_base, 'organizationalUnit')

    service = UWLDAPService()
    component.getGlobalSiteManager().registerUtility(service, IUWLDAPService)

    yield service

    delete_subtree(ldap_conn, uwldap_base)
2021-08-13 20:11:56 -04:00
@pytest.fixture(scope='session')
def mock_mail_server():
    """
    Start a MockSMTPServer for the test session and stop it on teardown.
    """
    smtp_server = MockSMTPServer()
    smtp_server.start()
    yield smtp_server
    smtp_server.stop()
@pytest.fixture(scope='session')
def mail_srv(cfg, mock_mail_server):
    """
    Create a MailService, register it as the global IMailService utility
    and return it.
    """
    service = MailService()
    site_manager = component.getGlobalSiteManager()
    site_manager.registerUtility(service, IMailService)
    return service
@pytest.fixture(scope='session')
def mysql_srv(cfg):
    """
    Create a MySQLService, register it as the IDatabaseService utility
    named 'mysql' and return it.
    """
    service = MySQLService()
    site_manager = component.getGlobalSiteManager()
    site_manager.registerUtility(service, IDatabaseService, 'mysql')
    return service
@pytest.fixture(scope='session')
def postgresql_srv(cfg):
    """
    Create a PostgreSQLService, register it as the IDatabaseService utility
    named 'postgresql' and return it.
    """
    service = PostgreSQLService()
    site_manager = component.getGlobalSiteManager()
    site_manager.registerUtility(service, IDatabaseService, 'postgresql')
    return service
2021-08-13 20:11:56 -04:00
@pytest.fixture(autouse=True, scope='session')
def app(
    cfg,
    krb_srv,
    ldap_srv_session,
    file_srv,
    mailman_srv,
    uwldap_srv,
    mail_srv,
    mysql_srv,
    postgresql_srv,
):
    """
    Create the ceod Flask application in testing mode.

    All of the service fixtures are requested as parameters so that they
    have been set up before the application is created; their values are
    not otherwise used here.
    """
    flask_config = {'TESTING': True}
    return create_app(flask_config)
2021-08-18 19:48:17 -04:00
@pytest.fixture(scope='session')
def mocks_for_create_user():
    """
    Patch password generation and the passwd/group lookups for the
    duration of the test session.

    utils.gen_password always returns 'krb5', while pwd.getpwuid and
    grp.getgrgid always raise KeyError.
    """
    with patch.object(utils, 'gen_password') as password_mock:
        with patch.object(pwd, 'getpwuid') as pw_lookup_mock:
            with patch.object(grp, 'getgrgid') as gr_lookup_mock:
                password_mock.return_value = 'krb5'
                # Normally, if getpwuid or getgrgid do *not* raise a
                # KeyError, then LDAPService will skip that UID. Therefore,
                # by raising a KeyError, we are making sure that the UID
                # will *not* be skipped.
                pw_lookup_mock.side_effect = KeyError()
                gr_lookup_mock.side_effect = KeyError()
                yield
2021-08-04 01:54:21 -04:00
@pytest.fixture
def simple_user():
    """Return a fresh User object for the member 'test_jdoe'."""
    user = User(
        uid='test_jdoe',
        cn='John Doe',
        program='Math',
        terms=['s2021'],
    )
    return user
@pytest.fixture
def simple_club():
    """Return a fresh User object for the club 'test_club1'."""
    club = User(
        uid='test_club1',
        cn='Club One',
        is_club=True,
    )
    return club
@pytest.fixture
def ldap_user(simple_user, g_admin_ctx):
    """
    Yield simple_user after adding it to LDAP; remove it again on teardown.

    The admin context is only held while LDAP is being modified, not while
    the test itself runs.
    """
    user = simple_user
    with g_admin_ctx():
        user.add_to_ldap()
    yield user
    with g_admin_ctx():
        user.remove_from_ldap()
2021-08-04 01:54:21 -04:00
@pytest.fixture
def krb_user(simple_user):
    """
    Yield simple_user after creating a Kerberos principal for it with the
    password 'krb5'; remove the principal again on teardown.
    """
    # We don't want to use add_to_kerberos() here because that expires the
    # user's password, which we don't want for testing
    addprinc_cmd = [
        'kadmin', '-k', '-p', 'ceod/admin',
        'addprinc', '-pw', 'krb5', simple_user.uid,
    ]
    subprocess.run(addprinc_cmd, stdout=DEVNULL, check=True)
    yield simple_user
    simple_user.remove_from_kerberos()
2021-08-04 02:33:50 -04:00
@pytest.fixture
def simple_group():
    """Return a fresh Group object named 'group1' with GID 21000."""
    group = Group(
        cn='group1',
        gid_number=21000,
        user_cn='Group One',
    )
    return group
@pytest.fixture
def ldap_group(simple_group, g_admin_ctx):
    """
    Yield simple_group after adding it to LDAP; remove it again on teardown.

    The admin context is only held while LDAP is being modified, not while
    the test itself runs.
    """
    group = simple_group
    with g_admin_ctx():
        group.add_to_ldap()
    yield group
    with g_admin_ctx():
        group.remove_from_ldap()
2021-08-04 16:59:36 -04:00
@pytest.fixture
def uwldap_user(cfg, uwldap_srv, ldap_conn):
    """
    Yield a UWLDAPRecord for 'test_jdoe' after adding a matching entry
    under the UWLDAP base; delete the entry again on teardown.

    uwldap_srv is requested so that the UWLDAP base has been set up before
    the entry is added.
    """
    record = UWLDAPRecord(
        uid='test_jdoe',
        mail_local_addresses=['test_jdoe@uwaterloo.internal'],
        program='Math',
        cn='John Doe',
        sn='Doe',
        given_name='John',
    )
    uwldap_base = cfg.get('uwldap_base')
    entry_dn = f'uid={record.uid},{uwldap_base}'
    object_classes = [
        'inetLocalMailRecipient',
        'inetOrgPerson',
        'organizationalPerson',
        'person',
    ]
    attributes = {
        'mailLocalAddress': record.mail_local_addresses,
        'ou': record.program,
        'cn': record.cn,
        'sn': record.sn,
        'givenName': record.given_name,
    }
    ldap_conn.add(entry_dn, object_classes, attributes)
    yield record
    ldap_conn.delete(entry_dn)
2021-08-23 19:01:24 -04:00
@pytest.fixture(scope='module')
def app_process(cfg, app, http_client):
    """
    Run the ceod application in a separate process for the duration of a
    test module.

    The server is started on the configured 'ceod_port', and '/ping' is
    polled (up to 5 attempts, one second apart) until a connection can be
    made. The process is terminated and joined on teardown, including when
    startup fails.

    :raises AssertionError: if the server cannot be reached after all
        attempts
    """
    port = cfg.get('ceod_port')
    hostname = socket.gethostname()

    def server_start():
        # Silence the output of the server in the child process.
        sys.stdout = open('/dev/null', 'w')
        sys.stderr = sys.stdout
        app.run(debug=False, host='0.0.0.0', port=port)

    proc = Process(target=server_start)
    proc.start()

    try:
        for _ in range(5):
            try:
                http_client.get(hostname, '/ping')
            except requests.exceptions.ConnectionError:
                time.sleep(1)
                continue
            break
        else:
            # The loop ran out of attempts without ever reaching `break`.
            # (Checking the loop index after the loop cannot detect this,
            # since range(5) never produces 5.)
            raise AssertionError('Timed out')
        yield
    finally:
        proc.terminate()
        proc.join()