forked from public/pyceo
81 lines
2.2 KiB
Python
81 lines
2.2 KiB
Python
import os
|
|
import importlib.resources
|
|
|
|
import ldap
|
|
import pytest
|
|
import socket
|
|
from zope import component
|
|
|
|
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
|
|
IFileService
|
|
from ceo_common.model import Config
|
|
from ceod.model import KerberosService, LDAPService, FileService
|
|
|
|
|
|
@pytest.fixture
|
|
def cfg():
|
|
with importlib.resources.path('ceo_common.test', 'ceod_test_local.ini') as p:
|
|
config_file = p.__fspath__()
|
|
_cfg = Config(config_file)
|
|
component.provideUtility(_cfg, IConfig)
|
|
return _cfg
|
|
|
|
|
|
@pytest.fixture
|
|
def krb_srv(cfg):
|
|
# we need to be root to read the keytab
|
|
assert os.geteuid() == 0
|
|
# this dance again... ugh
|
|
if socket.gethostname() == cfg.get('ceod_admin_host'):
|
|
principal = 'ceod/admin'
|
|
else:
|
|
principal = 'ceod/' + socket.getfqdn()
|
|
cache_file = '/tmp/ceod_test/krb5_cache'
|
|
if os.path.isfile(cache_file):
|
|
os.unlink(cache_file)
|
|
krb = KerberosService(principal, cache_file)
|
|
component.provideUtility(krb, IKerberosService)
|
|
yield krb
|
|
os.unlink(cache_file)
|
|
|
|
|
|
def recursively_delete_subtree(conn: ldap.ldapobject.LDAPObject, base_dn: str):
|
|
try:
|
|
records = conn.search_s(base_dn, ldap.SCOPE_ONELEVEL, attrlist=[''])
|
|
for dn, _ in records:
|
|
conn.delete_s(dn)
|
|
conn.delete_s(base_dn)
|
|
except ldap.NO_SUCH_OBJECT:
|
|
pass
|
|
|
|
|
|
@pytest.fixture
|
|
def ldap_srv(cfg, krb_srv):
|
|
conn = ldap.initialize(cfg.get('ldap_server_url'))
|
|
conn.sasl_gssapi_bind_s()
|
|
users_base = cfg.get('ldap_users_base')
|
|
groups_base = cfg.get('ldap_groups_base')
|
|
|
|
recursively_delete_subtree(conn, users_base)
|
|
recursively_delete_subtree(conn, groups_base)
|
|
|
|
for base_dn in [users_base, groups_base]:
|
|
ou = base_dn.split(',', 1)[0].split('=')[1]
|
|
conn.add_s(base_dn, ldap.modlist.addModlist({
|
|
'objectClass': [b'organizationalUnit'],
|
|
'ou': [ou.encode()]
|
|
}))
|
|
_ldap_srv = LDAPService()
|
|
component.provideUtility(_ldap_srv, ILDAPService)
|
|
yield _ldap_srv
|
|
|
|
recursively_delete_subtree(conn, users_base)
|
|
recursively_delete_subtree(conn, groups_base)
|
|
|
|
|
|
@pytest.fixture
|
|
def file_srv(cfg):
|
|
_file_srv = FileService()
|
|
component.provideUtility(_file_srv, IFileService)
|
|
return _file_srv
|