add tests for User class

This commit is contained in:
Max Erenberg 2021-08-04 05:54:21 +00:00
parent 87298e18b3
commit e7bfe36c0b
6 changed files with 304 additions and 91 deletions

View File

@ -1,6 +1,8 @@
from zope import component
from zope.interface import implementer
from ..errors import UserAlreadySubscribedError, NoSuchListError, \
UserNotSubscribedError
from ..interfaces import IMailmanService, IConfig, IHTTPClient
@ -13,8 +15,16 @@ class RemoteMailmanService:
def subscribe(self, address: str, mailing_list: str):
resp = self.http_client.post(self.mailman_host, f'/api/mailman/{mailing_list}/{address}')
resp.raise_for_status()
if not resp.ok:
if resp.status_code == 409:
raise UserAlreadySubscribedError()
elif resp.status_code == 404:
raise NoSuchListError()
raise Exception(resp.json())
def unsubscribe(self, address: str, mailing_list: str):
resp = self.http_client.delete(self.mailman_host, f'/api/mailman/{mailing_list}/{address}')
resp.raise_for_status()
if not resp.ok:
if resp.status_code == 404:
raise UserNotSubscribedError()
raise Exception(resp.json())

View File

@ -160,7 +160,7 @@ class User:
cn=data['cn'][0],
program=data.get('program', [None])[0],
terms=data.get('term'),
non_member_terms=data.get('nonUserTerm'),
non_member_terms=data.get('nonMemberTerm'),
login_shell=data['loginShell'][0],
uid_number=int(data['uidNumber'][0]),
gid_number=int(data['gidNumber'][0]),
@ -176,6 +176,7 @@ class User:
raise Exception('%s is not a valid shell' % login_shell)
new_user.login_shell = login_shell
self.ldap_srv.modify_user(self, new_user)
self.login_shell = login_shell
def add_terms(self, terms: List[str]):
for term in terms:

View File

@ -1,80 +1 @@
import os
import importlib.resources
import ldap
import pytest
import socket
from zope import component
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
IFileService
from ceo_common.model import Config
from ceod.model import KerberosService, LDAPService, FileService
@pytest.fixture
def cfg():
with importlib.resources.path('ceo_common.test', 'ceod_test_local.ini') as p:
config_file = p.__fspath__()
_cfg = Config(config_file)
component.provideUtility(_cfg, IConfig)
return _cfg
@pytest.fixture
def krb_srv(cfg):
# we need to be root to read the keytab
assert os.geteuid() == 0
# this dance again... ugh
if socket.gethostname() == cfg.get('ceod_admin_host'):
principal = 'ceod/admin'
else:
principal = 'ceod/' + socket.getfqdn()
cache_file = '/tmp/ceod_test/krb5_cache'
if os.path.isfile(cache_file):
os.unlink(cache_file)
krb = KerberosService(principal, cache_file)
component.provideUtility(krb, IKerberosService)
yield krb
os.unlink(cache_file)
def recursively_delete_subtree(conn: ldap.ldapobject.LDAPObject, base_dn: str):
try:
records = conn.search_s(base_dn, ldap.SCOPE_ONELEVEL, attrlist=[''])
for dn, _ in records:
conn.delete_s(dn)
conn.delete_s(base_dn)
except ldap.NO_SUCH_OBJECT:
pass
@pytest.fixture
def ldap_srv(cfg, krb_srv):
conn = ldap.initialize(cfg.get('ldap_server_url'))
conn.sasl_gssapi_bind_s()
users_base = cfg.get('ldap_users_base')
groups_base = cfg.get('ldap_groups_base')
recursively_delete_subtree(conn, users_base)
recursively_delete_subtree(conn, groups_base)
for base_dn in [users_base, groups_base]:
ou = base_dn.split(',', 1)[0].split('=')[1]
conn.add_s(base_dn, ldap.modlist.addModlist({
'objectClass': [b'organizationalUnit'],
'ou': [ou.encode()]
}))
_ldap_srv = LDAPService()
component.provideUtility(_ldap_srv, ILDAPService)
yield _ldap_srv
recursively_delete_subtree(conn, users_base)
recursively_delete_subtree(conn, groups_base)
@pytest.fixture
def file_srv(cfg):
_file_srv = FileService()
component.provideUtility(_file_srv, IFileService)
return _file_srv
from tests_common.fixtures import *

View File

@ -1,22 +1,163 @@
import os
import subprocess
import pytest
from ceo_common.errors import UserNotFoundError
from ceo_common.errors import UserNotFoundError, UserAlreadyExistsError, \
UserAlreadySubscribedError, UserNotSubscribedError
from ceod.model import User
def test_user_add_to_ldap(cfg, ldap_srv, file_srv):
def test_user_add_to_ldap(cfg, ldap_srv, simple_user):
user = simple_user
min_id = cfg.get('members_min_id')
user = User(
uid='jdoe',
cn='John Doe',
program='Math',
terms=['s2021'],
)
user.add_to_ldap()
retrieved_user = ldap_srv.get_user(user.uid)
assert retrieved_user.uid == user.uid
assert retrieved_user.uid_number >= min_id
with pytest.raises(UserAlreadyExistsError):
user.add_to_ldap()
user.remove_from_ldap()
with pytest.raises(UserNotFoundError):
ldap_srv.get_user(user.uid)
def test_club_add_to_ldap(cfg, ldap_srv, simple_club):
club = simple_club
min_id = cfg.get('clubs_min_id')
club.add_to_ldap()
retrieved_club = ldap_srv.get_user(club.uid)
assert retrieved_club.uid_number >= min_id
club.remove_from_ldap()
def getprinc(username, admin_principal, should_exist=True):
proc = subprocess.run([
'kadmin', '-k', '-p', admin_principal,
'getprinc', username,
], capture_output=True)
if should_exist:
assert proc.returncode == 0
else:
assert proc.returncode != 0
def test_user_add_to_kerberos(cfg, simple_user):
user = simple_user
admin_principal = cfg.get('ldap_admin_principal')
user.add_to_kerberos('krb5')
getprinc(user.uid, admin_principal, True)
user.remove_from_kerberos()
getprinc(user.uid, admin_principal, False)
def test_user_forwarding_addresses(cfg, ldap_user):
user = ldap_user
user.create_home_dir()
assert os.path.isdir(user.home_directory)
assert os.path.isfile(os.path.join(user.home_directory, '.bashrc'))
assert user.get_forwarding_addresses() == []
user.set_forwarding_addresses(['jdoe@example.com'])
assert user.get_forwarding_addresses() == ['jdoe@example.com']
assert open(os.path.join(user.home_directory, '.forward')).read() \
== 'jdoe@example.com\n'
user.set_forwarding_addresses([])
assert user.get_forwarding_addresses() == []
assert open(os.path.join(user.home_directory, '.forward')).read() == ''
user.delete_home_dir()
assert not os.path.isdir(user.home_directory)
def test_user_terms(ldap_user, ldap_srv):
user = ldap_user
user.add_terms(['f2021'])
assert user.terms == ['s2021', 'f2021']
assert ldap_srv.get_user(user.uid).terms == user.terms
user.add_non_member_terms(['w2022', 's2022'])
assert user.non_member_terms == ['w2022', 's2022']
assert ldap_srv.get_user(user.uid).non_member_terms == user.non_member_terms
def test_user_positions(ldap_user, ldap_srv):
user = ldap_user
user.add_position('treasurer')
assert user.positions == ['treasurer']
assert ldap_srv.get_user(user.uid).positions == user.positions
user.add_position('cro')
assert user.positions == ['treasurer', 'cro']
assert ldap_srv.get_user(user.uid).positions == user.positions
user.remove_position('cro')
assert user.positions == ['treasurer']
assert ldap_srv.get_user(user.uid).positions == user.positions
def test_user_change_password(krb_user):
user = krb_user
# TODO: test the password with kinit or similar
user.change_password('new_password')
def test_login_shell(ldap_user, ldap_srv):
user = ldap_user
user.replace_login_shell('/bin/sh')
assert user.login_shell == '/bin/sh'
assert ldap_srv.get_user(user.uid).login_shell == user.login_shell
def test_user_mailing_lists(ldap_user):
user = ldap_user
user.subscribe_to_mailing_list('csc-general')
with pytest.raises(UserAlreadySubscribedError):
user.subscribe_to_mailing_list('csc-general')
user.unsubscribe_from_mailing_list('csc-general')
with pytest.raises(UserNotSubscribedError):
user.unsubscribe_from_mailing_list('csc-general')
def test_user_to_dict(cfg):
user = User(
uid='test_jsmith',
cn='John Smith',
program='Math',
terms=['s2021'],
uid_number=21000,
gid_number=21000,
positions=['secretary'],
)
expected = {
'uid': user.uid,
'cn': user.cn,
'program': user.program,
'terms': user.terms,
'uid_number': user.uid_number,
'gid_number': user.gid_number,
'positions': user.positions,
'login_shell': '/bin/bash',
'home_directory': user.home_directory,
'dn': f'uid={user.uid},' + cfg.get('ldap_users_base'),
'is_club': False,
}
assert user.to_dict() == expected
user.mail_local_addresses = ['john.smith@csclub.internal']
expected['mail_local_addresses'] = user.mail_local_addresses
assert user.to_dict() == expected
user.create_home_dir()
expected['forwarding_addresses'] = []
assert user.to_dict(True) == expected

0
tests_common/__init__.py Normal file
View File

140
tests_common/fixtures.py Normal file
View File

@ -0,0 +1,140 @@
import importlib.resources
import os
import shutil
import ldap
import pytest
import socket
from zope import component
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
IFileService, IMailmanService, IHTTPClient
from ceo_common.model import Config, RemoteMailmanService, HTTPClient
from ceod.model import KerberosService, LDAPService, FileService, User, \
MailmanService
@pytest.fixture(autouse=True, scope='session')
def cfg():
with importlib.resources.path('ceo_common.test', 'ceod_test_local.ini') as p:
config_file = p.__fspath__()
_cfg = Config(config_file)
component.provideUtility(_cfg, IConfig)
return _cfg
@pytest.fixture(autouse=True, scope='session')
def krb_srv(cfg):
# we need to be root to read the keytab
assert os.geteuid() == 0
# this dance again... ugh
if socket.gethostname() == cfg.get('ceod_admin_host'):
principal = 'ceod/admin'
else:
principal = 'ceod/' + socket.getfqdn()
cache_file = '/tmp/ceod_test/krb5_cache'
if os.path.isfile(cache_file):
os.unlink(cache_file)
krb = KerberosService(principal, cache_file)
component.provideUtility(krb, IKerberosService)
yield krb
os.unlink(cache_file)
def recursively_delete_subtree(conn: ldap.ldapobject.LDAPObject, base_dn: str):
try:
records = conn.search_s(base_dn, ldap.SCOPE_ONELEVEL, attrlist=[''])
for dn, _ in records:
conn.delete_s(dn)
conn.delete_s(base_dn)
except ldap.NO_SUCH_OBJECT:
pass
@pytest.fixture(autouse=True, scope='session')
def ldap_srv(cfg, krb_srv):
conn = ldap.initialize(cfg.get('ldap_server_url'))
conn.sasl_gssapi_bind_s()
users_base = cfg.get('ldap_users_base')
groups_base = cfg.get('ldap_groups_base')
recursively_delete_subtree(conn, users_base)
recursively_delete_subtree(conn, groups_base)
for base_dn in [users_base, groups_base]:
ou = base_dn.split(',', 1)[0].split('=')[1]
conn.add_s(base_dn, ldap.modlist.addModlist({
'objectClass': [b'organizationalUnit'],
'ou': [ou.encode()]
}))
_ldap_srv = LDAPService()
component.provideUtility(_ldap_srv, ILDAPService)
yield _ldap_srv
recursively_delete_subtree(conn, users_base)
recursively_delete_subtree(conn, groups_base)
@pytest.fixture(autouse=True, scope='session')
def file_srv(cfg):
_file_srv = FileService()
component.provideUtility(_file_srv, IFileService)
members_home = cfg.get('members_home')
clubs_home = cfg.get('clubs_home')
shutil.rmtree(members_home, ignore_errors=True)
shutil.rmtree(clubs_home, ignore_errors=True)
yield _file_srv
shutil.rmtree(members_home, ignore_errors=True)
shutil.rmtree(clubs_home, ignore_errors=True)
@pytest.fixture
def simple_user():
return User(
uid='test_jdoe',
cn='John Doe',
program='Math',
terms=['s2021'],
)
@pytest.fixture
def simple_club():
return User(
uid='test_club1',
cn='Club One',
is_club=True,
)
@pytest.fixture
def ldap_user(simple_user):
simple_user.add_to_ldap()
yield simple_user
simple_user.remove_from_ldap()
@pytest.fixture
def krb_user(simple_user):
simple_user.add_to_kerberos('krb5')
yield simple_user
simple_user.remove_from_kerberos()
@pytest.fixture(scope='session')
def http_client():
client = HTTPClient()
component.provideUtility(client, IHTTPClient)
return
@pytest.fixture(autouse=True, scope='session')
def mailman_srv(cfg, http_client):
if socket.gethostname() == cfg.get('ceod_mailman_host'):
# TODO: use a mock server on drone.io
mailman = MailmanService()
else:
mailman = RemoteMailmanService()
component.provideUtility(mailman, IMailmanService)
return mailman