diff --git a/ceo_common/model/RemoteMailmanService.py b/ceo_common/model/RemoteMailmanService.py index 5618a91..03b07ee 100644 --- a/ceo_common/model/RemoteMailmanService.py +++ b/ceo_common/model/RemoteMailmanService.py @@ -1,6 +1,8 @@ from zope import component from zope.interface import implementer +from ..errors import UserAlreadySubscribedError, NoSuchListError, \ + UserNotSubscribedError from ..interfaces import IMailmanService, IConfig, IHTTPClient @@ -13,8 +15,16 @@ class RemoteMailmanService: def subscribe(self, address: str, mailing_list: str): resp = self.http_client.post(self.mailman_host, f'/api/mailman/{mailing_list}/{address}') - resp.raise_for_status() + if not resp.ok: + if resp.status_code == 409: + raise UserAlreadySubscribedError() + elif resp.status_code == 404: + raise NoSuchListError() + raise Exception(resp.json()) def unsubscribe(self, address: str, mailing_list: str): resp = self.http_client.delete(self.mailman_host, f'/api/mailman/{mailing_list}/{address}') - resp.raise_for_status() + if not resp.ok: + if resp.status_code == 404: + raise UserNotSubscribedError() + raise Exception(resp.json()) diff --git a/ceod/model/User.py b/ceod/model/User.py index 05ffcc0..edb4d0d 100644 --- a/ceod/model/User.py +++ b/ceod/model/User.py @@ -160,7 +160,7 @@ class User: cn=data['cn'][0], program=data.get('program', [None])[0], terms=data.get('term'), - non_member_terms=data.get('nonUserTerm'), + non_member_terms=data.get('nonMemberTerm'), login_shell=data['loginShell'][0], uid_number=int(data['uidNumber'][0]), gid_number=int(data['gidNumber'][0]), @@ -176,6 +176,7 @@ class User: raise Exception('%s is not a valid shell' % login_shell) new_user.login_shell = login_shell self.ldap_srv.modify_user(self, new_user) + self.login_shell = login_shell def add_terms(self, terms: List[str]): for term in terms: diff --git a/ceod/model/test/conftest.py b/ceod/model/test/conftest.py index 605989c..592da9c 100644 --- a/ceod/model/test/conftest.py +++ b/ceod/model/test/conftest.py @@ -1,80 +1 @@ -import os -import importlib.resources - -import ldap -import pytest -import socket -from zope import component - -from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \ - IFileService -from ceo_common.model import Config -from ceod.model import KerberosService, LDAPService, FileService - - -@pytest.fixture -def cfg(): - with importlib.resources.path('ceo_common.test', 'ceod_test_local.ini') as p: - config_file = p.__fspath__() - _cfg = Config(config_file) - component.provideUtility(_cfg, IConfig) - return _cfg - - -@pytest.fixture -def krb_srv(cfg): - # we need to be root to read the keytab - assert os.geteuid() == 0 - # this dance again... ugh - if socket.gethostname() == cfg.get('ceod_admin_host'): - principal = 'ceod/admin' - else: - principal = 'ceod/' + socket.getfqdn() - cache_file = '/tmp/ceod_test/krb5_cache' - if os.path.isfile(cache_file): - os.unlink(cache_file) - krb = KerberosService(principal, cache_file) - component.provideUtility(krb, IKerberosService) - yield krb - os.unlink(cache_file) - - -def recursively_delete_subtree(conn: ldap.ldapobject.LDAPObject, base_dn: str): - try: - records = conn.search_s(base_dn, ldap.SCOPE_ONELEVEL, attrlist=['']) - for dn, _ in records: - conn.delete_s(dn) - conn.delete_s(base_dn) - except ldap.NO_SUCH_OBJECT: - pass - - -@pytest.fixture -def ldap_srv(cfg, krb_srv): - conn = ldap.initialize(cfg.get('ldap_server_url')) - conn.sasl_gssapi_bind_s() - users_base = cfg.get('ldap_users_base') - groups_base = cfg.get('ldap_groups_base') - - recursively_delete_subtree(conn, users_base) - recursively_delete_subtree(conn, groups_base) - - for base_dn in [users_base, groups_base]: - ou = base_dn.split(',', 1)[0].split('=')[1] - conn.add_s(base_dn, ldap.modlist.addModlist({ - 'objectClass': [b'organizationalUnit'], - 'ou': [ou.encode()] - })) - _ldap_srv = LDAPService() - component.provideUtility(_ldap_srv, ILDAPService) - yield _ldap_srv - - recursively_delete_subtree(conn, users_base) - recursively_delete_subtree(conn, groups_base) - - -@pytest.fixture -def file_srv(cfg): - _file_srv = FileService() - component.provideUtility(_file_srv, IFileService) - return _file_srv +from tests_common.fixtures import * diff --git a/ceod/model/test/test_user.py b/ceod/model/test/test_user.py index fcbf07e..3e2686d 100644 --- a/ceod/model/test/test_user.py +++ b/ceod/model/test/test_user.py @@ -1,22 +1,163 @@ +import os +import subprocess + import pytest -from ceo_common.errors import UserNotFoundError +from ceo_common.errors import UserNotFoundError, UserAlreadyExistsError, \ + UserAlreadySubscribedError, UserNotSubscribedError from ceod.model import User -def test_user_add_to_ldap(cfg, ldap_srv, file_srv): +def test_user_add_to_ldap(cfg, ldap_srv, simple_user): + user = simple_user min_id = cfg.get('members_min_id') - user = User( - uid='jdoe', - cn='John Doe', - program='Math', - terms=['s2021'], - ) user.add_to_ldap() retrieved_user = ldap_srv.get_user(user.uid) + assert retrieved_user.uid == user.uid assert retrieved_user.uid_number >= min_id + with pytest.raises(UserAlreadyExistsError): + user.add_to_ldap() user.remove_from_ldap() with pytest.raises(UserNotFoundError): ldap_srv.get_user(user.uid) + + +def test_club_add_to_ldap(cfg, ldap_srv, simple_club): + club = simple_club + min_id = cfg.get('clubs_min_id') + club.add_to_ldap() + retrieved_club = ldap_srv.get_user(club.uid) + + assert retrieved_club.uid_number >= min_id + club.remove_from_ldap() + + +def getprinc(username, admin_principal, should_exist=True): + proc = subprocess.run([ + 'kadmin', '-k', '-p', admin_principal, + 'getprinc', username, + ], capture_output=True) + if should_exist: + assert proc.returncode == 0 + else: + assert proc.returncode != 0 + + +def test_user_add_to_kerberos(cfg, simple_user): + user = simple_user + admin_principal = cfg.get('ldap_admin_principal') + user.add_to_kerberos('krb5') + getprinc(user.uid, admin_principal, True) + user.remove_from_kerberos() + getprinc(user.uid, admin_principal, False) + + +def test_user_forwarding_addresses(cfg, ldap_user): + user = ldap_user + + user.create_home_dir() + assert os.path.isdir(user.home_directory) + assert os.path.isfile(os.path.join(user.home_directory, '.bashrc')) + + assert user.get_forwarding_addresses() == [] + user.set_forwarding_addresses(['jdoe@example.com']) + assert user.get_forwarding_addresses() == ['jdoe@example.com'] + assert open(os.path.join(user.home_directory, '.forward')).read() \ + == 'jdoe@example.com\n' + + user.set_forwarding_addresses([]) + assert user.get_forwarding_addresses() == [] + assert open(os.path.join(user.home_directory, '.forward')).read() == '' + + user.delete_home_dir() + assert not os.path.isdir(user.home_directory) + + +def test_user_terms(ldap_user, ldap_srv): + user = ldap_user + + user.add_terms(['f2021']) + assert user.terms == ['s2021', 'f2021'] + assert ldap_srv.get_user(user.uid).terms == user.terms + + user.add_non_member_terms(['w2022', 's2022']) + assert user.non_member_terms == ['w2022', 's2022'] + assert ldap_srv.get_user(user.uid).non_member_terms == user.non_member_terms + + +def test_user_positions(ldap_user, ldap_srv): + user = ldap_user + + user.add_position('treasurer') + assert user.positions == ['treasurer'] + assert ldap_srv.get_user(user.uid).positions == user.positions + user.add_position('cro') + assert user.positions == ['treasurer', 'cro'] + assert ldap_srv.get_user(user.uid).positions == user.positions + + user.remove_position('cro') + assert user.positions == ['treasurer'] + assert ldap_srv.get_user(user.uid).positions == user.positions + + +def test_user_change_password(krb_user): + user = krb_user + + # TODO: test the password with kinit or similar + user.change_password('new_password') + + +def test_login_shell(ldap_user, ldap_srv): + user = ldap_user + + user.replace_login_shell('/bin/sh') + assert user.login_shell == '/bin/sh' + assert ldap_srv.get_user(user.uid).login_shell == user.login_shell + + +def test_user_mailing_lists(ldap_user): + user = ldap_user + + user.subscribe_to_mailing_list('csc-general') + with pytest.raises(UserAlreadySubscribedError): + user.subscribe_to_mailing_list('csc-general') + + user.unsubscribe_from_mailing_list('csc-general') + with pytest.raises(UserNotSubscribedError): + user.unsubscribe_from_mailing_list('csc-general') + + +def test_user_to_dict(cfg): + user = User( + uid='test_jsmith', + cn='John Smith', + program='Math', + terms=['s2021'], + uid_number=21000, + gid_number=21000, + positions=['secretary'], + ) + expected = { + 'uid': user.uid, + 'cn': user.cn, + 'program': user.program, + 'terms': user.terms, + 'uid_number': user.uid_number, + 'gid_number': user.gid_number, + 'positions': user.positions, + 'login_shell': '/bin/bash', + 'home_directory': user.home_directory, + 'dn': f'uid={user.uid},' + cfg.get('ldap_users_base'), + 'is_club': False, + } + assert user.to_dict() == expected + + user.mail_local_addresses = ['john.smith@csclub.internal'] + expected['mail_local_addresses'] = user.mail_local_addresses + assert user.to_dict() == expected + + user.create_home_dir() + expected['forwarding_addresses'] = [] + assert user.to_dict(True) == expected diff --git a/tests_common/__init__.py b/tests_common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests_common/fixtures.py b/tests_common/fixtures.py new file mode 100644 index 0000000..3bbdac6 --- /dev/null +++ b/tests_common/fixtures.py @@ -0,0 +1,140 @@ +import importlib.resources +import os +import shutil + +import ldap +import pytest +import socket +from zope import component + +from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \ + IFileService, IMailmanService, IHTTPClient +from ceo_common.model import Config, RemoteMailmanService, HTTPClient +from ceod.model import KerberosService, LDAPService, FileService, User, \ + MailmanService + + +@pytest.fixture(autouse=True, scope='session') +def cfg(): + with importlib.resources.path('ceo_common.test', 'ceod_test_local.ini') as p: + config_file = p.__fspath__() + _cfg = Config(config_file) + component.provideUtility(_cfg, IConfig) + return _cfg + + +@pytest.fixture(autouse=True, scope='session') +def krb_srv(cfg): + # we need to be root to read the keytab + assert os.geteuid() == 0 + # this dance again... ugh + if socket.gethostname() == cfg.get('ceod_admin_host'): + principal = 'ceod/admin' + else: + principal = 'ceod/' + socket.getfqdn() + cache_file = '/tmp/ceod_test/krb5_cache' + if os.path.isfile(cache_file): + os.unlink(cache_file) + krb = KerberosService(principal, cache_file) + component.provideUtility(krb, IKerberosService) + yield krb + os.unlink(cache_file) + + +def recursively_delete_subtree(conn: ldap.ldapobject.LDAPObject, base_dn: str): + try: + records = conn.search_s(base_dn, ldap.SCOPE_ONELEVEL, attrlist=['']) + for dn, _ in records: + conn.delete_s(dn) + conn.delete_s(base_dn) + except ldap.NO_SUCH_OBJECT: + pass + + +@pytest.fixture(autouse=True, scope='session') +def ldap_srv(cfg, krb_srv): + conn = ldap.initialize(cfg.get('ldap_server_url')) + conn.sasl_gssapi_bind_s() + users_base = cfg.get('ldap_users_base') + groups_base = cfg.get('ldap_groups_base') + + recursively_delete_subtree(conn, users_base) + recursively_delete_subtree(conn, groups_base) + + for base_dn in [users_base, groups_base]: + ou = base_dn.split(',', 1)[0].split('=')[1] + conn.add_s(base_dn, ldap.modlist.addModlist({ + 'objectClass': [b'organizationalUnit'], + 'ou': [ou.encode()] + })) + _ldap_srv = LDAPService() + component.provideUtility(_ldap_srv, ILDAPService) + yield _ldap_srv + + recursively_delete_subtree(conn, users_base) + recursively_delete_subtree(conn, groups_base) + + +@pytest.fixture(autouse=True, scope='session') +def file_srv(cfg): + _file_srv = FileService() + component.provideUtility(_file_srv, IFileService) + members_home = cfg.get('members_home') + clubs_home = cfg.get('clubs_home') + + shutil.rmtree(members_home, ignore_errors=True) + shutil.rmtree(clubs_home, ignore_errors=True) + yield _file_srv + shutil.rmtree(members_home, ignore_errors=True) + shutil.rmtree(clubs_home, ignore_errors=True) + + +@pytest.fixture +def simple_user(): + return User( + uid='test_jdoe', + cn='John Doe', + program='Math', + terms=['s2021'], + ) + + +@pytest.fixture +def simple_club(): + return User( + uid='test_club1', + cn='Club One', + is_club=True, + ) + + +@pytest.fixture +def ldap_user(simple_user): + simple_user.add_to_ldap() + yield simple_user + simple_user.remove_from_ldap() + + +@pytest.fixture +def krb_user(simple_user): + simple_user.add_to_kerberos('krb5') + yield simple_user + simple_user.remove_from_kerberos() + + +@pytest.fixture(scope='session') +def http_client(): + client = HTTPClient() + component.provideUtility(client, IHTTPClient) + return + + +@pytest.fixture(autouse=True, scope='session') +def mailman_srv(cfg, http_client): + if socket.gethostname() == cfg.get('ceod_mailman_host'): + # TODO: use a mock server on drone.io + mailman = MailmanService() + else: + mailman = RemoteMailmanService() + component.provideUtility(mailman, IMailmanService) + return mailman