use LDAP instead of NSS for authz #73
|
@ -33,6 +33,11 @@ class ILDAPService(Interface):
|
||||||
A new UID and GID will be generated and returned in the new user.
|
A new UID and GID will be generated and returned in the new user.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def get_groups_for_user(username: str) -> List[str]:
|
||||||
|
"""
|
||||||
|
Get a list of the groups to which the user belongs.
|
||||||
|
"""
|
||||||
|
|
||||||
def remove_user(user: IUser):
|
def remove_user(user: IUser):
|
||||||
"""Remove this user from the database."""
|
"""Remove this user from the database."""
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
from flask import Blueprint, g, request
|
from flask import Blueprint, request
|
||||||
from flask.json import jsonify
|
from flask.json import jsonify
|
||||||
from zope import component
|
from zope import component
|
||||||
|
|
||||||
from .utils import authz_restrict_to_staff, authz_restrict_to_syscom, \
|
from .utils import authz_restrict_to_staff, authz_restrict_to_syscom, \
|
||||||
user_is_in_group, requires_authentication_no_realm, \
|
user_is_in_group, requires_authentication_no_realm, requires_admin_creds, \
|
||||||
create_streaming_response, development_only, is_truthy
|
create_streaming_response, development_only, is_truthy
|
||||||
from ceo_common.errors import BadRequest, UserAlreadySubscribedError, UserNotSubscribedError
|
from ceo_common.errors import BadRequest, UserAlreadySubscribedError, UserNotSubscribedError
|
||||||
from ceo_common.interfaces import ILDAPService, IConfig, IMailService
|
from ceo_common.interfaces import ILDAPService, IConfig, IMailService
|
||||||
|
@ -20,8 +20,12 @@ logger = logger_factory(__name__)
|
||||||
|
|
||||||
|
|
||||||
@bp.route('/', methods=['POST'], strict_slashes=False)
|
@bp.route('/', methods=['POST'], strict_slashes=False)
|
||||||
|
@requires_admin_creds
|
||||||
@authz_restrict_to_staff
|
@authz_restrict_to_staff
|
||||||
def create_user():
|
def create_user():
|
||||||
|
# We need to use the admin creds here because office members may not
|
||||||
|
# directly create new LDAP records.
|
||||||
|
|
||||||
body = request.get_json(force=True)
|
body = request.get_json(force=True)
|
||||||
terms = body.get('terms')
|
terms = body.get('terms')
|
||||||
non_member_terms = body.get('non_member_terms')
|
non_member_terms = body.get('non_member_terms')
|
||||||
|
@ -31,10 +35,6 @@ def create_user():
|
||||||
if not body.get(attr):
|
if not body.get(attr):
|
||||||
raise BadRequest(f"Attribute '{attr}' is missing or empty")
|
raise BadRequest(f"Attribute '{attr}' is missing or empty")
|
||||||
|
|
||||||
# We need to use the admin creds here because office members may not
|
|
||||||
# directly create new LDAP records.
|
|
||||||
g.need_admin_creds = True
|
|
||||||
|
|
||||||
if terms:
|
if terms:
|
||||||
logger.info(f"Creating member {body['uid']} for terms {terms}")
|
logger.info(f"Creating member {body['uid']} for terms {terms}")
|
||||||
else:
|
else:
|
||||||
|
@ -86,19 +86,19 @@ def patch_user(auth_user: str, username: str):
|
||||||
|
|
||||||
|
|
||||||
@bp.route('/<username>/renew', methods=['POST'])
|
@bp.route('/<username>/renew', methods=['POST'])
|
||||||
|
@requires_admin_creds
|
||||||
@authz_restrict_to_staff
|
@authz_restrict_to_staff
|
||||||
def renew_user(username: str):
|
def renew_user(username: str):
|
||||||
|
# We need to use the admin creds here because office members should
|
||||||
|
# not be able to directly modify the shadowExpire field; this could
|
||||||
|
# prevent syscom members from logging into the machines.
|
||||||
|
|
||||||
body = request.get_json(force=True)
|
body = request.get_json(force=True)
|
||||||
terms = body.get('terms')
|
terms = body.get('terms')
|
||||||
non_member_terms = body.get('non_member_terms')
|
non_member_terms = body.get('non_member_terms')
|
||||||
if (terms and non_member_terms) or not (terms or non_member_terms):
|
if (terms and non_member_terms) or not (terms or non_member_terms):
|
||||||
raise BadRequest('Must specify either terms or non-member terms')
|
raise BadRequest('Must specify either terms or non-member terms')
|
||||||
|
|
||||||
# We need to use the admin creds here because office members should
|
|
||||||
# not be able to directly modify the shadowExpire field; this could
|
|
||||||
# prevent syscom members from logging into the machines.
|
|
||||||
g.need_admin_creds = True
|
|
||||||
|
|
||||||
ldap_srv = component.getUtility(ILDAPService)
|
ldap_srv = component.getUtility(ILDAPService)
|
||||||
cfg = component.getUtility(IConfig)
|
cfg = component.getUtility(IConfig)
|
||||||
user = ldap_srv.get_user(username)
|
user = ldap_srv.get_user(username)
|
||||||
|
|
|
@ -1,12 +1,9 @@
|
||||||
import functools
|
import functools
|
||||||
import grp
|
|
||||||
import json
|
import json
|
||||||
import os
|
|
||||||
import pwd
|
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Callable, List
|
from typing import Callable, List
|
||||||
|
|
||||||
from flask import current_app, stream_with_context
|
from flask import current_app, g, stream_with_context
|
||||||
from zope import component
|
from zope import component
|
||||||
|
|
||||||
from .spnego import requires_authentication
|
from .spnego import requires_authentication
|
||||||
|
@ -40,9 +37,24 @@ def requires_authentication_no_realm(f: Callable) -> Callable:
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
def user_is_in_group(user: str, group: str) -> bool:
|
def requires_admin_creds(f: Callable) -> Callable:
|
||||||
"""Returns True if `user` is in `group`, False otherwise."""
|
"""
|
||||||
return user in grp.getgrnam(group).gr_mem
|
Forces the next LDAP connection to use the admin Kerberos credentials.
|
||||||
|
This must be used BEFORE any of the authz decorators, since those
|
||||||
|
may require an LDAP connection, which will get cached for later use.
|
||||||
|
"""
|
||||||
|
@functools.wraps(f)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
g.need_admin_creds = True
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def user_is_in_group(username: str, group_name: str) -> bool:
|
||||||
|
"""Returns True if `username` is in `group_name`, False otherwise."""
|
||||||
|
ldap_srv = component.getUtility(ILDAPService)
|
||||||
|
group = ldap_srv.get_group(group_name)
|
||||||
|
return username in group.members
|
||||||
|
|
||||||
|
|
||||||
def authz_restrict_to_groups(f: Callable, allowed_groups: List[str]) -> Callable:
|
def authz_restrict_to_groups(f: Callable, allowed_groups: List[str]) -> Callable:
|
||||||
|
@ -51,8 +63,6 @@ def authz_restrict_to_groups(f: Callable, allowed_groups: List[str]) -> Callable
|
||||||
specified groups.
|
specified groups.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
allowed_group_ids = [grp.getgrnam(g).gr_gid for g in allowed_groups]
|
|
||||||
|
|
||||||
@requires_authentication_no_realm
|
@requires_authentication_no_realm
|
||||||
@functools.wraps(f)
|
@functools.wraps(f)
|
||||||
def wrapper(_username: str, *args, **kwargs):
|
def wrapper(_username: str, *args, **kwargs):
|
||||||
|
@ -62,15 +72,14 @@ def authz_restrict_to_groups(f: Callable, allowed_groups: List[str]) -> Callable
|
||||||
if username.startswith('ceod/'):
|
if username.startswith('ceod/'):
|
||||||
# ceod services are always allowed to make internal calls
|
# ceod services are always allowed to make internal calls
|
||||||
return f(*args, **kwargs)
|
return f(*args, **kwargs)
|
||||||
for gid in os.getgrouplist(username, pwd.getpwnam(username).pw_gid):
|
ldap_srv = component.getUtility(ILDAPService)
|
||||||
if gid in allowed_group_ids:
|
for group_name in ldap_srv.get_groups_for_user(username):
|
||||||
|
if group_name in allowed_groups:
|
||||||
return f(*args, **kwargs)
|
return f(*args, **kwargs)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"User '{username}' denied since they are not in one of {allowed_groups}"
|
f"User '{username}' denied since they are not in one of {allowed_groups}"
|
||||||
)
|
)
|
||||||
return {
|
return {'error': f'You must be in one of {allowed_groups}'}, 403
|
||||||
'error': f'You must be in one of {allowed_groups}'
|
|
||||||
}, 403
|
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
|
@ -98,6 +98,13 @@ class LDAPService:
|
||||||
entry = self._get_readable_entry_for_group(conn, cn)
|
entry = self._get_readable_entry_for_group(conn, cn)
|
||||||
return Group.deserialize_from_ldap(entry)
|
return Group.deserialize_from_ldap(entry)
|
||||||
|
|
||||||
|
def get_groups_for_user(self, username: str) -> List[str]:
|
||||||
|
conn = self._get_ldap_conn()
|
||||||
|
conn.search(self.ldap_groups_base,
|
||||||
|
f'(uniqueMember={self.uid_to_dn(username)})',
|
||||||
|
attributes=['cn'])
|
||||||
|
return [entry.cn.value for entry in conn.entries]
|
||||||
|
|
||||||
def get_display_info_for_users(self, usernames: List[str]) -> List[Dict[str, str]]:
|
def get_display_info_for_users(self, usernames: List[str]) -> List[Dict[str, str]]:
|
||||||
if not usernames:
|
if not usernames:
|
||||||
return []
|
return []
|
||||||
|
|
|
@ -117,8 +117,7 @@ def test_groups_multiple_members(cli_setup, new_user_gen):
|
||||||
def test_groups_with_auxiliary_groups_and_mailing_lists(cli_setup, ldap_user):
|
def test_groups_with_auxiliary_groups_and_mailing_lists(cli_setup, ldap_user):
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
# make sure auxiliary groups + mailing lists exist in ceod_test_local.ini
|
# make sure auxiliary groups + mailing lists exist in ceod_test_local.ini
|
||||||
create_group('syscom', 'Systems Committee')
|
create_group('adm', 'Administrators')
|
||||||
create_group('office', 'Office')
|
|
||||||
create_group('staff', 'Staff')
|
create_group('staff', 'Staff')
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
|
@ -131,7 +130,7 @@ def test_groups_with_auxiliary_groups_and_mailing_lists(cli_setup, ldap_user):
|
||||||
"Add user to auxiliary groups... Done\n"
|
"Add user to auxiliary groups... Done\n"
|
||||||
"Subscribe user to auxiliary mailing lists... Done\n"
|
"Subscribe user to auxiliary mailing lists... Done\n"
|
||||||
"Transaction successfully completed.\n"
|
"Transaction successfully completed.\n"
|
||||||
f"Added {ldap_user.uid} to syscom, office, staff\n"
|
f"Added {ldap_user.uid} to syscom, adm, staff\n"
|
||||||
f"Subscribed {ldap_user.uid} to syscom@csclub.internal, syscom-alerts@csclub.internal\n"
|
f"Subscribed {ldap_user.uid} to syscom@csclub.internal, syscom-alerts@csclub.internal\n"
|
||||||
)
|
)
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
|
@ -147,7 +146,7 @@ def test_groups_with_auxiliary_groups_and_mailing_lists(cli_setup, ldap_user):
|
||||||
"Remove user from auxiliary groups... Done\n"
|
"Remove user from auxiliary groups... Done\n"
|
||||||
"Unsubscribe user from auxiliary mailing lists... Done\n"
|
"Unsubscribe user from auxiliary mailing lists... Done\n"
|
||||||
"Transaction successfully completed.\n"
|
"Transaction successfully completed.\n"
|
||||||
f"Removed {ldap_user.uid} from syscom, office, staff\n"
|
f"Removed {ldap_user.uid} from syscom, adm, staff\n"
|
||||||
f"Unsubscribed {ldap_user.uid} from syscom@csclub.internal, syscom-alerts@csclub.internal\n"
|
f"Unsubscribed {ldap_user.uid} from syscom@csclub.internal, syscom-alerts@csclub.internal\n"
|
||||||
)
|
)
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
|
@ -167,6 +166,5 @@ def test_groups_with_auxiliary_groups_and_mailing_lists(cli_setup, ldap_user):
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
assert 'Unsubscribed' not in result.output
|
assert 'Unsubscribed' not in result.output
|
||||||
|
|
||||||
delete_group('syscom')
|
delete_group('adm')
|
||||||
delete_group('office')
|
|
||||||
delete_group('staff')
|
delete_group('staff')
|
||||||
|
|
|
@ -4,6 +4,7 @@ import shutil
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from click.testing import CliRunner
|
from click.testing import CliRunner
|
||||||
|
import ldap3
|
||||||
|
|
||||||
from ceo.cli import cli
|
from ceo.cli import cli
|
||||||
from ceo_common.model import Term
|
from ceo_common.model import Term
|
||||||
|
@ -146,7 +147,7 @@ def test_members_pwreset(cli_setup, ldap_user, krb_user):
|
||||||
assert expected_pat.match(result.output) is not None
|
assert expected_pat.match(result.output) is not None
|
||||||
|
|
||||||
|
|
||||||
def test_members_expire(cli_setup, app_process, ldap_user, syscom_group):
|
def test_members_expire(cli_setup, app_process, ldap_user):
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -172,12 +173,17 @@ def test_members_expire(cli_setup, app_process, ldap_user, syscom_group):
|
||||||
restore_datetime_in_app_process(app_process)
|
restore_datetime_in_app_process(app_process)
|
||||||
|
|
||||||
|
|
||||||
def test_members_remindexpire(cli_setup, app_process, ldap_user):
|
def test_members_remindexpire(cfg, cli_setup, app_process, ldap_conn, ldap_user):
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
term = Term(ldap_user.terms[0])
|
term = Term(ldap_user.terms[0])
|
||||||
|
test_date = (term + 1).to_datetime()
|
||||||
|
# Add a term to ctdalek so that he doesn't show up in the results
|
||||||
|
base_dn = cfg.get('ldap_users_base')
|
||||||
|
ldap_conn.modify(
|
||||||
|
f'uid=ctdalek,{base_dn}',
|
||||||
|
{'term': [(ldap3.MODIFY_ADD, [str(term + 1)])]})
|
||||||
|
|
||||||
try:
|
try:
|
||||||
test_date = (term + 1).to_datetime()
|
|
||||||
set_datetime_in_app_process(app_process, test_date)
|
set_datetime_in_app_process(app_process, test_date)
|
||||||
result = runner.invoke(cli, ['members', 'remindexpire', '--dry-run'])
|
result = runner.invoke(cli, ['members', 'remindexpire', '--dry-run'])
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
|
@ -200,3 +206,7 @@ def test_members_remindexpire(cli_setup, app_process, ldap_user):
|
||||||
assert result.output == "No members are pending expiration.\n"
|
assert result.output == "No members are pending expiration.\n"
|
||||||
finally:
|
finally:
|
||||||
restore_datetime_in_app_process(app_process)
|
restore_datetime_in_app_process(app_process)
|
||||||
|
|
||||||
|
ldap_conn.modify(
|
||||||
|
f'uid=ctdalek,{base_dn}',
|
||||||
|
{'term': [(ldap3.MODIFY_DELETE, [str(term + 1)])]})
|
||||||
|
|
|
@ -125,7 +125,8 @@ def test_api_group_auxiliary(cfg, client, ldap_user, g_admin_ctx):
|
||||||
group_names = ['syscom'] + aux_groups
|
group_names = ['syscom'] + aux_groups
|
||||||
groups = []
|
groups = []
|
||||||
with g_admin_ctx():
|
with g_admin_ctx():
|
||||||
for group_name in group_names:
|
# the syscom group should already exist, since we need it for auth
|
||||||
|
for group_name in aux_groups:
|
||||||
group = Group(
|
group = Group(
|
||||||
cn=group_name,
|
cn=group_name,
|
||||||
gid_number=min_uid,
|
gid_number=min_uid,
|
||||||
|
@ -161,4 +162,5 @@ def test_api_group_auxiliary(cfg, client, ldap_user, g_admin_ctx):
|
||||||
|
|
||||||
with g_admin_ctx():
|
with g_admin_ctx():
|
||||||
for group in groups:
|
for group in groups:
|
||||||
group.remove_from_ldap()
|
if group.cn != 'syscom':
|
||||||
|
group.remove_from_ldap()
|
||||||
|
|
|
@ -241,7 +241,7 @@ def test_authz_check(client, create_user_result):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('term_attr', ['terms', 'non_member_terms'])
|
@pytest.mark.parametrize('term_attr', ['terms', 'non_member_terms'])
|
||||||
def test_expire(client, new_user, term_attr, syscom_group, ldap_conn):
|
def test_expire(client, new_user, term_attr, ldap_conn):
|
||||||
assert new_user.shadowExpire is None
|
assert new_user.shadowExpire is None
|
||||||
current_term = Term.current()
|
current_term = Term.current()
|
||||||
start_of_current_term = current_term.to_datetime()
|
start_of_current_term = current_term.to_datetime()
|
||||||
|
@ -281,6 +281,7 @@ def test_expire(client, new_user, term_attr, syscom_group, ldap_conn):
|
||||||
|
|
||||||
status, data = client.post('/api/members/expire?dry_run=yes')
|
status, data = client.post('/api/members/expire?dry_run=yes')
|
||||||
assert status == 200
|
assert status == 200
|
||||||
|
print(data)
|
||||||
assert (data == [uid]) == should_expire
|
assert (data == [uid]) == should_expire
|
||||||
|
|
||||||
_, user = client.get(f'/api/members/{uid}')
|
_, user = client.get(f'/api/members/{uid}')
|
||||||
|
@ -306,15 +307,13 @@ def test_expire(client, new_user, term_attr, syscom_group, ldap_conn):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('in_syscom', [True, False])
|
@pytest.mark.parametrize('in_syscom', [True, False])
|
||||||
def test_expire_syscom_member(client, new_user, syscom_group, g_admin_ctx, ldap_conn, in_syscom):
|
def test_expire_syscom_member(client, new_user, ldap_conn, in_syscom):
|
||||||
uid = new_user.uid
|
uid = new_user.uid
|
||||||
start_of_current_term = Term.current().to_datetime()
|
start_of_current_term = Term.current().to_datetime()
|
||||||
if in_syscom:
|
if in_syscom:
|
||||||
group_dn = new_user.ldap_srv.group_cn_to_dn('syscom')
|
group_dn = new_user.ldap_srv.group_cn_to_dn('syscom')
|
||||||
user_dn = new_user.ldap_srv.uid_to_dn(uid)
|
user_dn = new_user.ldap_srv.uid_to_dn(uid)
|
||||||
changes = {
|
changes = {'uniqueMember': [(ldap3.MODIFY_ADD, [user_dn])]}
|
||||||
'uniqueMember': [(ldap3.MODIFY_ADD, [user_dn])]
|
|
||||||
}
|
|
||||||
ldap_conn.modify(group_dn, changes)
|
ldap_conn.modify(group_dn, changes)
|
||||||
with patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
|
with patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
|
||||||
datetime_mock.return_value = start_of_current_term + datetime.timedelta(days=160)
|
datetime_mock.return_value = start_of_current_term + datetime.timedelta(days=160)
|
||||||
|
@ -324,9 +323,12 @@ def test_expire_syscom_member(client, new_user, syscom_group, g_admin_ctx, ldap_
|
||||||
assert data == []
|
assert data == []
|
||||||
else:
|
else:
|
||||||
assert data == [uid]
|
assert data == [uid]
|
||||||
|
if in_syscom:
|
||||||
|
changes = {'uniqueMember': [(ldap3.MODIFY_DELETE, [user_dn])]}
|
||||||
|
ldap_conn.modify(group_dn, changes)
|
||||||
|
|
||||||
|
|
||||||
def test_office_member(cfg, client):
|
def test_office_member(cfg, client, office_user):
|
||||||
admin_principal = cfg.get('ldap_admin_principal')
|
admin_principal = cfg.get('ldap_admin_principal')
|
||||||
ccache_file = cfg.get('ldap_admin_principal_ccache')
|
ccache_file = cfg.get('ldap_admin_principal_ccache')
|
||||||
if os.path.isfile(ccache_file):
|
if os.path.isfile(ccache_file):
|
||||||
|
@ -367,11 +369,17 @@ def test_office_member(cfg, client):
|
||||||
assert status == 200
|
assert status == 200
|
||||||
|
|
||||||
|
|
||||||
def test_membership_renewal_reminder(client, mock_mail_server):
|
def test_membership_renewal_reminder(cfg, client, mock_mail_server, ldap_conn):
|
||||||
uids = ['test3', 'test4']
|
uids = ['test3', 'test4']
|
||||||
# fast-forward by one term so that we don't clash with the other users
|
# fast-forward by one term so that we don't clash with the other users
|
||||||
# created by other tests
|
# created by other tests
|
||||||
term = Term.current() + 1
|
term = Term.current() + 1
|
||||||
|
# Add some terms to ctdalek so that he doesn't show up in the API calls below
|
||||||
|
base_dn = cfg.get('ldap_users_base')
|
||||||
|
ldap_conn.modify(
|
||||||
|
f'uid=ctdalek,{base_dn}',
|
||||||
|
{'term': [(ldap3.MODIFY_ADD, [str(term), str(term + 1), str(term + 2)])]})
|
||||||
|
|
||||||
with patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
|
with patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
|
||||||
datetime_mock.return_value = term.to_datetime()
|
datetime_mock.return_value = term.to_datetime()
|
||||||
for uid in uids:
|
for uid in uids:
|
||||||
|
@ -433,3 +441,7 @@ def test_membership_renewal_reminder(client, mock_mail_server):
|
||||||
status, _ = client.delete(f'/api/members/{uid}')
|
status, _ = client.delete(f'/api/members/{uid}')
|
||||||
assert status == 200
|
assert status == 200
|
||||||
mock_mail_server.messages.clear()
|
mock_mail_server.messages.clear()
|
||||||
|
|
||||||
|
ldap_conn.modify(
|
||||||
|
f'uid=ctdalek,{base_dn}',
|
||||||
|
{'term': [(ldap3.MODIFY_DELETE, [str(term), str(term + 1), str(term + 2)])]})
|
||||||
|
|
|
@ -27,7 +27,8 @@ server_url = ldap://auth1.csclub.internal
|
||||||
base = ou=TestUWLDAP,dc=csclub,dc=internal
|
base = ou=TestUWLDAP,dc=csclub,dc=internal
|
||||||
|
|
||||||
[members]
|
[members]
|
||||||
min_id = 20001
|
# 20000 is ctdalek, 20001 is office1
|
||||||
|
min_id = 20002
|
||||||
max_id = 29999
|
max_id = 29999
|
||||||
home = /tmp/test_users
|
home = /tmp/test_users
|
||||||
skel = /users/skel
|
skel = /users/skel
|
||||||
|
@ -49,7 +50,7 @@ api_password = mailman3
|
||||||
new_member_list = csc-general
|
new_member_list = csc-general
|
||||||
|
|
||||||
[auxiliary groups]
|
[auxiliary groups]
|
||||||
syscom = office,staff
|
syscom = adm,staff
|
||||||
|
|
||||||
[auxiliary mailing lists]
|
[auxiliary mailing lists]
|
||||||
syscom = syscom,syscom-alerts
|
syscom = syscom,syscom-alerts
|
||||||
|
|
|
@ -159,6 +159,70 @@ def ldap_conn(cfg) -> ldap3.Connection:
|
||||||
return conn
|
return conn
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def office_user(cfg, ldap_conn):
|
||||||
|
users_base = cfg.get('ldap_users_base')
|
||||||
|
home_parent = cfg.get('members_home')
|
||||||
|
ldap_conn.add(
|
||||||
|
f'uid=office1,{users_base}',
|
||||||
|
['top', 'account', 'posixAccount', 'shadowAccount', 'member'],
|
||||||
|
{
|
||||||
|
'uid': 'office1',
|
||||||
|
'uidNumber': '20001',
|
||||||
|
'gidNumber': '20001',
|
||||||
|
'homeDirectory': f'{home_parent}/office1',
|
||||||
|
'loginShell': '/bin/bash',
|
||||||
|
'cn': 'Office One',
|
||||||
|
'givenName': 'Office',
|
||||||
|
'sn': 'One',
|
||||||
|
'program': 'Math',
|
||||||
|
'term': [str(Term.current())],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
yield
|
||||||
|
ldap_conn.delete(f'uid=office1,{users_base}')
|
||||||
|
|
||||||
|
|
||||||
|
def add_stock_groups_and_users(cfg: IConfig, conn: ldap3.Connection):
|
||||||
|
users_base = cfg.get('ldap_users_base')
|
||||||
|
groups_base = cfg.get('ldap_groups_base')
|
||||||
|
home_parent = cfg.get('members_home')
|
||||||
|
conn.add(
|
||||||
|
f'cn=syscom,{groups_base}',
|
||||||
|
['top', 'group', 'posixGroup'],
|
||||||
|
{
|
||||||
|
'cn': 'syscom',
|
||||||
|
'gidNumber': '10001',
|
||||||
|
'uniqueMember': [f'uid=ctdalek,{users_base}']
|
||||||
|
}
|
||||||
|
)
|
||||||
|
conn.add(
|
||||||
|
f'uid=ctdalek,{users_base}',
|
||||||
|
['top', 'account', 'posixAccount', 'shadowAccount', 'member'],
|
||||||
|
{
|
||||||
|
'uid': 'ctdalek',
|
||||||
|
'uidNumber': '20000',
|
||||||
|
'gidNumber': '20000',
|
||||||
|
'homeDirectory': f'{home_parent}/ctdalek',
|
||||||
|
'loginShell': '/bin/bash',
|
||||||
|
'cn': 'Calum T. Dalek',
|
||||||
|
'givenName': 'Calum',
|
||||||
|
'sn': 'Dalek',
|
||||||
|
'program': 'Math',
|
||||||
|
'term': [str(Term.current())],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
conn.add(
|
||||||
|
f'cn=office,{groups_base}',
|
||||||
|
['top', 'group', 'posixGroup'],
|
||||||
|
{
|
||||||
|
'cn': 'office',
|
||||||
|
'gidNumber': '10003',
|
||||||
|
'uniqueMember': [f'uid=office1,{users_base}']
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(scope='session')
|
||||||
def ldap_srv_session(cfg, krb_srv, ldap_conn):
|
def ldap_srv_session(cfg, krb_srv, ldap_conn):
|
||||||
conn = ldap_conn
|
conn = ldap_conn
|
||||||
|
@ -169,6 +233,8 @@ def ldap_srv_session(cfg, krb_srv, ldap_conn):
|
||||||
for base_dn in [users_base, groups_base, sudo_base]:
|
for base_dn in [users_base, groups_base, sudo_base]:
|
||||||
delete_subtree(conn, base_dn)
|
delete_subtree(conn, base_dn)
|
||||||
conn.add(base_dn, 'organizationalUnit')
|
conn.add(base_dn, 'organizationalUnit')
|
||||||
|
# Needed for API authentication
|
||||||
|
add_stock_groups_and_users(cfg, conn)
|
||||||
|
|
||||||
_ldap_srv = LDAPService()
|
_ldap_srv = LDAPService()
|
||||||
component.getGlobalSiteManager().registerUtility(_ldap_srv, ILDAPService)
|
component.getGlobalSiteManager().registerUtility(_ldap_srv, ILDAPService)
|
||||||
|
@ -496,20 +562,6 @@ def ldap_group(simple_group, g_admin_ctx):
|
||||||
simple_group.remove_from_ldap()
|
simple_group.remove_from_ldap()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def syscom_group(g_admin_ctx):
|
|
||||||
group = Group(
|
|
||||||
cn='syscom',
|
|
||||||
gid_number=10001,
|
|
||||||
user_cn='Systems Committee'
|
|
||||||
)
|
|
||||||
with g_admin_ctx():
|
|
||||||
group.add_to_ldap()
|
|
||||||
yield group
|
|
||||||
with g_admin_ctx():
|
|
||||||
group.remove_from_ldap()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def uwldap_user(cfg, uwldap_srv, ldap_conn):
|
def uwldap_user(cfg, uwldap_srv, ldap_conn):
|
||||||
conn = ldap_conn
|
conn = ldap_conn
|
||||||
|
|
Loading…
Reference in New Issue