Implement Groups API (#6)
This PR implements the /api/groups endpoints. Closes #2. Reviewed-on: #6 Co-authored-by: Max Erenberg <merenber@localhost> Co-committed-by: Max Erenberg <merenber@localhost>
This commit is contained in:
parent
cc0bc4a638
commit
ecf089c261
|
@ -26,6 +26,16 @@ class GroupAlreadyExistsError(Exception):
|
|||
super().__init__('group already exists')
|
||||
|
||||
|
||||
class UserAlreadyInGroupError(Exception):
|
||||
def __init__(self):
|
||||
super().__init__('user is already in group')
|
||||
|
||||
|
||||
class UserNotInGroupError(Exception):
|
||||
def __init__(self):
|
||||
super().__init__('user is not in group')
|
||||
|
||||
|
||||
class UserAlreadySubscribedError(Exception):
|
||||
def __init__(self):
|
||||
super().__init__('user is already subscribed')
|
||||
|
|
|
@ -6,9 +6,11 @@ class IGroup(Interface):
|
|||
|
||||
cn = Attribute('common name')
|
||||
gid_number = Attribute('gid number')
|
||||
description = Attribute('optional description')
|
||||
members = Attribute('usernames of group members')
|
||||
|
||||
ldap3_entry = Attribute('cached ldap3.Entry instance for this group')
|
||||
user_cn = Attribute('cached CN of the user associated with this group')
|
||||
|
||||
def add_to_ldap():
|
||||
"""Add a new record to LDAP for this group."""
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import List, Union
|
||||
from typing import List, Dict, Union
|
||||
|
||||
from zope.interface import Interface
|
||||
|
||||
|
@ -18,6 +18,12 @@ class ILDAPService(Interface):
|
|||
def get_user(username: str) -> IUser:
|
||||
"""Retrieve the user with the given username."""
|
||||
|
||||
def get_display_info_for_users(usernames: List[str]) -> List[Dict[str, str]]:
|
||||
"""
|
||||
Retrieve a subset of the LDAP attributes for the given users.
|
||||
Useful for displaying a list of users in a compact way.
|
||||
"""
|
||||
|
||||
def add_user(user: IUser):
|
||||
"""
|
||||
Add the user to the database.
|
||||
|
|
|
@ -89,7 +89,8 @@ ffibuilder.set_source(
|
|||
"""
|
||||
#include <krb5/krb5.h>
|
||||
""",
|
||||
libraries=['krb5']
|
||||
libraries=['krb5'],
|
||||
extra_link_args=['-fsanitize=address', '-static-libasan'],
|
||||
)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
import socket
|
||||
|
||||
from flask import g
|
||||
import gssapi
|
||||
from gssapi.raw.exceptions import ExpiredCredentialsError
|
||||
import requests
|
||||
from requests_gssapi import HTTPSPNEGOAuth
|
||||
from zope import component
|
||||
from zope.interface import implementer
|
||||
|
||||
from ceo_common.interfaces import IConfig, IKerberosService, IHTTPClient
|
||||
from ceo_common.interfaces import IConfig, IHTTPClient
|
||||
|
||||
|
||||
@implementer(IHTTPClient)
|
||||
|
|
|
@ -34,11 +34,14 @@ def create_app(flask_config={}):
|
|||
from ceod.api import members
|
||||
app.register_blueprint(members.bp, url_prefix='/api/members')
|
||||
|
||||
# Only offer mailman API if this host is running Mailman
|
||||
if hostname == cfg.get('ceod_mailman_host'):
|
||||
# Only offer mailman API if this host is running Mailman
|
||||
from ceod.api import mailman
|
||||
app.register_blueprint(mailman.bp, url_prefix='/api/mailman')
|
||||
|
||||
from ceod.api import groups
|
||||
app.register_blueprint(groups.bp, url_prefix='/api/groups')
|
||||
|
||||
from ceod.api import uwldap
|
||||
app.register_blueprint(uwldap.bp, url_prefix='/api/uwldap')
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ import traceback
|
|||
from flask.app import Flask
|
||||
from werkzeug.exceptions import HTTPException
|
||||
|
||||
from ceo_common.errors import UserNotFoundError, GroupNotFoundError
|
||||
from ceo_common.logger_factory import logger_factory
|
||||
|
||||
__all__ = ['register_error_handlers']
|
||||
|
@ -19,5 +20,9 @@ def generic_error_handler(err: Exception):
|
|||
# pass through HTTP errors
|
||||
if isinstance(err, HTTPException):
|
||||
return err
|
||||
logger.error(traceback.format_exc())
|
||||
return {'error': type(err).__name__ + ': ' + str(err)}, 500
|
||||
if isinstance(err, UserNotFoundError) or isinstance(err, GroupNotFoundError):
|
||||
status_code = 404
|
||||
else:
|
||||
status_code = 500
|
||||
logger.error(traceback.format_exc())
|
||||
return {'error': type(err).__name__ + ': ' + str(err)}, status_code
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
from flask import Blueprint, request
|
||||
from zope import component
|
||||
|
||||
from .utils import authz_restrict_to_syscom, is_truthy, \
|
||||
create_streaming_response, development_only
|
||||
from ceo_common.interfaces import ILDAPService
|
||||
from ceod.transactions.groups import (
|
||||
AddGroupTransaction,
|
||||
AddMemberToGroupTransaction,
|
||||
RemoveMemberFromGroupTransaction,
|
||||
DeleteGroupTransaction,
|
||||
)
|
||||
|
||||
bp = Blueprint('groups', __name__)
|
||||
|
||||
|
||||
@bp.route('/', methods=['POST'], strict_slashes=False)
|
||||
@authz_restrict_to_syscom
|
||||
def create_group():
|
||||
body = request.get_json(force=True)
|
||||
txn = AddGroupTransaction(
|
||||
cn=body['cn'],
|
||||
description=body['description'],
|
||||
)
|
||||
return create_streaming_response(txn)
|
||||
|
||||
|
||||
@bp.route('/<group_name>')
|
||||
def get_group(group_name):
|
||||
ldap_srv = component.getUtility(ILDAPService)
|
||||
group = ldap_srv.get_group(group_name)
|
||||
return group.to_dict()
|
||||
|
||||
|
||||
@bp.route('/<group_name>/members/<username>', methods=['POST'])
|
||||
@authz_restrict_to_syscom
|
||||
def add_member_to_group(group_name, username):
|
||||
subscribe_to_lists = is_truthy(
|
||||
request.args.get('subscribe_to_lists', 'true')
|
||||
)
|
||||
txn = AddMemberToGroupTransaction(
|
||||
username=username,
|
||||
group_name=group_name,
|
||||
subscribe_to_lists=subscribe_to_lists,
|
||||
)
|
||||
return create_streaming_response(txn)
|
||||
|
||||
|
||||
@bp.route('/<group_name>/members/<username>', methods=['DELETE'])
|
||||
@authz_restrict_to_syscom
|
||||
def remove_member_from_group(group_name, username):
|
||||
unsubscribe_from_lists = is_truthy(
|
||||
request.args.get('unsubscribe_from_lists', 'true')
|
||||
)
|
||||
txn = RemoveMemberFromGroupTransaction(
|
||||
username=username,
|
||||
group_name=group_name,
|
||||
unsubscribe_from_lists=unsubscribe_from_lists,
|
||||
)
|
||||
return create_streaming_response(txn)
|
||||
|
||||
|
||||
@bp.route('/<group_name>', methods=['DELETE'])
|
||||
@authz_restrict_to_syscom
|
||||
@development_only
|
||||
def delete_group(group_name):
|
||||
txn = DeleteGroupTransaction(group_name)
|
||||
return create_streaming_response(txn)
|
|
@ -4,7 +4,7 @@ from zope import component
|
|||
from .utils import authz_restrict_to_staff, authz_restrict_to_syscom, \
|
||||
user_is_in_group, requires_authentication_no_realm, \
|
||||
create_streaming_response, development_only
|
||||
from ceo_common.errors import UserNotFoundError, BadRequest
|
||||
from ceo_common.errors import BadRequest
|
||||
from ceo_common.interfaces import ILDAPService
|
||||
from ceod.transactions.members import (
|
||||
AddMemberTransaction,
|
||||
|
@ -36,15 +36,13 @@ def create_user():
|
|||
def get_user(auth_user: str, username: str):
|
||||
get_forwarding_addresses = False
|
||||
if auth_user == username or user_is_in_group(auth_user, 'syscom'):
|
||||
# Only syscom members, or the user themselves, may see the user's
|
||||
# forwarding addresses, since this requires reading a file in the
|
||||
# user's home directory
|
||||
get_forwarding_addresses = True
|
||||
ldap_srv = component.getUtility(ILDAPService)
|
||||
try:
|
||||
user = ldap_srv.get_user(username)
|
||||
return user.to_dict(get_forwarding_addresses)
|
||||
except UserNotFoundError:
|
||||
return {
|
||||
'error': 'user not found'
|
||||
}, 404
|
||||
user = ldap_srv.get_user(username)
|
||||
return user.to_dict(get_forwarding_addresses)
|
||||
|
||||
|
||||
@bp.route('/<username>', methods=['PATCH'])
|
||||
|
|
|
@ -109,9 +109,14 @@ def create_streaming_response(txn: AbstractTransaction):
|
|||
def development_only(f: Callable) -> Callable:
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
if current_app.config.get('ENV') == 'development':
|
||||
if current_app.config.get('ENV') == 'development' or \
|
||||
current_app.config.get('TESTING'):
|
||||
return f(*args, **kwargs)
|
||||
return {
|
||||
'error': 'This endpoint may only be called in development'
|
||||
}, 403
|
||||
return wrapper
|
||||
|
||||
|
||||
def is_truthy(s: str) -> bool:
|
||||
return s.lower() in ['yes', 'true']
|
||||
|
|
|
@ -6,7 +6,12 @@ from zope import component
|
|||
from zope.interface import implementer
|
||||
|
||||
from .utils import dn_to_uid
|
||||
from ceo_common.errors import UserAlreadyInGroupError, UserNotInGroupError, \
|
||||
UserNotFoundError
|
||||
from ceo_common.interfaces import ILDAPService, IGroup
|
||||
from ceo_common.logger_factory import logger_factory
|
||||
|
||||
logger = logger_factory(__name__)
|
||||
|
||||
|
||||
@implementer(IGroup)
|
||||
|
@ -15,14 +20,18 @@ class Group:
|
|||
self,
|
||||
cn: str,
|
||||
gid_number: int,
|
||||
description: Union[str, None] = None,
|
||||
members: Union[List[str], None] = None,
|
||||
ldap3_entry: Union[ldap3.Entry, None] = None,
|
||||
user_cn: Union[str, None] = None,
|
||||
):
|
||||
self.cn = cn
|
||||
self.gid_number = gid_number
|
||||
self.description = description
|
||||
# this is a list of the usernames of the members in this group
|
||||
self.members = members or []
|
||||
self.ldap3_entry = ldap3_entry
|
||||
self.user_cn = user_cn
|
||||
|
||||
self.ldap_srv = component.getUtility(ILDAPService)
|
||||
|
||||
|
@ -30,11 +39,32 @@ class Group:
|
|||
return json.dumps(self.to_dict(), indent=2)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
data = {
|
||||
'cn': self.cn,
|
||||
'gid_number': self.gid_number,
|
||||
'members': self.members,
|
||||
}
|
||||
description = None
|
||||
if self.description:
|
||||
description = self.description
|
||||
elif self.user_cn:
|
||||
# for clubs, the human-readable description is stored in the
|
||||
# 'cn' attribute of the associated user
|
||||
description = self.user_cn
|
||||
else:
|
||||
try:
|
||||
# TODO: only fetch the CN to save bandwidth
|
||||
user = self.ldap_srv.get_user(self.cn)
|
||||
description = user.cn
|
||||
self.user_cn = user.cn
|
||||
except UserNotFoundError:
|
||||
# some groups, like syscom, don't have an associated user
|
||||
pass
|
||||
if description:
|
||||
data['description'] = description
|
||||
# to_dict() is usually called for display purposes, so get some more
|
||||
# information to display
|
||||
data['members'] = self.ldap_srv.get_display_info_for_users(self.members)
|
||||
return data
|
||||
|
||||
def add_to_ldap(self):
|
||||
self.ldap_srv.add_group(self)
|
||||
|
@ -49,6 +79,7 @@ class Group:
|
|||
return Group(
|
||||
cn=attrs['cn'][0],
|
||||
gid_number=attrs['gidNumber'][0],
|
||||
description=attrs.get('description', [None])[0],
|
||||
members=[
|
||||
dn_to_uid(dn) for dn in attrs.get('uniqueMember', [])
|
||||
],
|
||||
|
@ -57,12 +88,20 @@ class Group:
|
|||
|
||||
def add_member(self, username: str):
|
||||
dn = self.ldap_srv.uid_to_dn(username)
|
||||
with self.ldap_srv.entry_ctx_for_group(self) as entry:
|
||||
entry.uniqueMember.add(dn)
|
||||
try:
|
||||
with self.ldap_srv.entry_ctx_for_group(self) as entry:
|
||||
entry.uniqueMember.add(dn)
|
||||
except ldap3.core.exceptions.LDAPAttributeOrValueExistsResult as err:
|
||||
logger.warning(err)
|
||||
raise UserAlreadyInGroupError()
|
||||
self.members.append(username)
|
||||
|
||||
def remove_member(self, username: str):
|
||||
dn = self.ldap_srv.uid_to_dn(username)
|
||||
with self.ldap_srv.entry_ctx_for_group(self) as entry:
|
||||
entry.uniqueMember.delete(dn)
|
||||
try:
|
||||
with self.ldap_srv.entry_ctx_for_group(self) as entry:
|
||||
entry.uniqueMember.delete(dn)
|
||||
except ldap3.core.exceptions.LDAPCursorError as err:
|
||||
logger.warning(err)
|
||||
raise UserNotInGroupError()
|
||||
self.members.remove(username)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import contextlib
|
||||
import grp
|
||||
import pwd
|
||||
from typing import Union, List
|
||||
from typing import Union, Dict, List
|
||||
|
||||
from flask import g
|
||||
import ldap3
|
||||
|
@ -31,13 +31,16 @@ class LDAPService:
|
|||
self.club_max_id = cfg.get('clubs_max_id')
|
||||
|
||||
def _get_ldap_conn(self) -> ldap3.Connection:
|
||||
if 'ldap_conn' in g:
|
||||
return g.ldap_conn
|
||||
kwargs = {'auto_bind': True, 'raise_exceptions': True}
|
||||
if 'sasl_user' in g:
|
||||
kwargs['authentication'] = ldap3.SASL
|
||||
kwargs['sasl_mechanism'] = ldap3.KERBEROS
|
||||
kwargs['user'] = g.sasl_user
|
||||
# TODO: cache the connection for a single request
|
||||
conn = ldap3.Connection(self.ldap_server_url, **kwargs)
|
||||
# cache the connection for a single request
|
||||
g.ldap_conn = conn
|
||||
return conn
|
||||
|
||||
def _get_readable_entry_for_user(self, conn: ldap3.Connection, username: str) -> ldap3.Entry:
|
||||
|
@ -82,6 +85,22 @@ class LDAPService:
|
|||
entry = self._get_readable_entry_for_group(conn, cn)
|
||||
return Group.deserialize_from_ldap(entry)
|
||||
|
||||
def get_display_info_for_users(self, usernames: List[str]) -> List[Dict[str, str]]:
|
||||
if not usernames:
|
||||
return []
|
||||
conn = self._get_ldap_conn()
|
||||
filter = '(|' + ''.join([f'(uid={uid})' for uid in usernames]) + ')'
|
||||
attributes = ['uid', 'cn', 'program']
|
||||
conn.search(self.ldap_users_base, filter, attributes=attributes)
|
||||
return [
|
||||
{
|
||||
'uid': entry.uid.value,
|
||||
'cn': entry.cn.value,
|
||||
'program': entry.program.value,
|
||||
}
|
||||
for entry in conn.entries
|
||||
]
|
||||
|
||||
def uid_to_dn(self, uid: str):
|
||||
return f'uid={uid},{self.ldap_users_base}'
|
||||
|
||||
|
@ -200,6 +219,8 @@ class LDAPService:
|
|||
entry.gidNumber = group.gid_number
|
||||
if group.members:
|
||||
entry.uniqueMember = [self.uid_to_dn(uid) for uid in group.members]
|
||||
if group.description:
|
||||
entry.description = group.description
|
||||
|
||||
try:
|
||||
writer.commit()
|
||||
|
@ -223,14 +244,12 @@ class LDAPService:
|
|||
uwldap_batch_size: int = 10,
|
||||
):
|
||||
if members:
|
||||
filter_str = '(|' + ''.join([
|
||||
f'(uid={uid})' for uid in members
|
||||
]) + ')'
|
||||
filter = '(|' + ''.join([f'(uid={uid})' for uid in members]) + ')'
|
||||
else:
|
||||
filter_str = '(objectClass=*)'
|
||||
filter = '(objectClass=*)'
|
||||
conn = self._get_ldap_conn()
|
||||
conn.search(
|
||||
self.ldap_users_base, filter_str, attributes=['uid', 'program'])
|
||||
self.ldap_users_base, filter, attributes=['uid', 'program'])
|
||||
uids = [entry.uid.value for entry in conn.entries]
|
||||
csc_programs = [entry.program.value for entry in conn.entries]
|
||||
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
from zope import component
|
||||
|
||||
from ..AbstractTransaction import AbstractTransaction
|
||||
from ceo_common.interfaces import ILDAPService
|
||||
from ceod.model import Group, User
|
||||
|
||||
|
||||
class AddGroupTransaction(AbstractTransaction):
|
||||
"""Transaction to create a new group."""
|
||||
|
||||
operations = [
|
||||
'add_user_to_ldap',
|
||||
'add_group_to_ldap',
|
||||
'add_sudo_role',
|
||||
'create_home_dir',
|
||||
]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cn: str,
|
||||
description: str,
|
||||
):
|
||||
super().__init__()
|
||||
self.cn = cn
|
||||
self.description = description
|
||||
self.user = None
|
||||
self.group = None
|
||||
|
||||
def child_execute_iter(self):
|
||||
ldap_srv = component.getUtility(ILDAPService)
|
||||
|
||||
# The user's UID is the group's CN.
|
||||
# The user's CN is the group's description.
|
||||
user = User(
|
||||
uid=self.cn,
|
||||
cn=self.description,
|
||||
is_club=True,
|
||||
)
|
||||
self.user = user
|
||||
user.add_to_ldap()
|
||||
yield 'add_user_to_ldap'
|
||||
|
||||
# For now, we only store the description in the user CN, and not in
|
||||
# the group record. We can change this later if desired.
|
||||
group = Group(
|
||||
cn=self.cn,
|
||||
gid_number=user.gid_number,
|
||||
user_cn=self.description,
|
||||
)
|
||||
self.group = group
|
||||
group.add_to_ldap()
|
||||
yield 'add_group_to_ldap'
|
||||
|
||||
ldap_srv.add_sudo_role(self.cn)
|
||||
yield 'add_sudo_role'
|
||||
|
||||
user.create_home_dir()
|
||||
yield 'create_home_dir'
|
||||
|
||||
self.finish(group.to_dict())
|
||||
|
||||
def rollback(self):
|
||||
ldap_srv = component.getUtility(ILDAPService)
|
||||
|
||||
if 'create_home_dir' in self.finished_operations:
|
||||
self.user.delete_home_dir()
|
||||
if 'add_sudo_role' in self.finished_operations:
|
||||
ldap_srv.remove_sudo_role(self.cn)
|
||||
if 'add_group_to_ldap' in self.finished_operations:
|
||||
self.group.remove_from_ldap()
|
||||
if 'add_user_to_ldap' in self.finished_operations:
|
||||
self.user.remove_from_ldap()
|
|
@ -0,0 +1,89 @@
|
|||
import traceback
|
||||
|
||||
from zope import component
|
||||
|
||||
from ..AbstractTransaction import AbstractTransaction
|
||||
from ceo_common.errors import UserAlreadyInGroupError, UserAlreadySubscribedError
|
||||
from ceo_common.interfaces import ILDAPService, IConfig
|
||||
from ceo_common.logger_factory import logger_factory
|
||||
|
||||
logger = logger_factory(__name__)
|
||||
|
||||
|
||||
class AddMemberToGroupTransaction(AbstractTransaction):
|
||||
"""A transaction to add a member to a group."""
|
||||
|
||||
operations = [
|
||||
'add_user_to_group',
|
||||
'add_user_to_auxiliary_groups',
|
||||
'subscribe_user_to_auxiliary_mailing_lists',
|
||||
]
|
||||
|
||||
def __init__(self, username: str, group_name: str, subscribe_to_lists: bool):
|
||||
super().__init__()
|
||||
self.username = username
|
||||
self.group_name = group_name
|
||||
self.subscribe_to_lists = subscribe_to_lists
|
||||
# a list of auxiliary Groups to which the user will be added
|
||||
self.aux_groups = []
|
||||
# a list of mailing list names to which the user will be subscribed
|
||||
self.aux_lists = []
|
||||
self.user = None
|
||||
self.group = None
|
||||
|
||||
def child_execute_iter(self):
|
||||
cfg = component.getUtility(IConfig)
|
||||
ldap_srv = component.getUtility(ILDAPService)
|
||||
user = ldap_srv.get_user(self.username)
|
||||
self.user = user
|
||||
group = ldap_srv.get_group(self.group_name)
|
||||
self.group = group
|
||||
|
||||
group.add_member(self.username)
|
||||
yield 'add_user_to_group'
|
||||
|
||||
try:
|
||||
aux_groups = cfg.get('auxiliary groups_' + self.group_name)
|
||||
for aux_group_name in aux_groups:
|
||||
try:
|
||||
aux_group = ldap_srv.get_group(aux_group_name)
|
||||
aux_group.add_member(self.username)
|
||||
self.aux_groups.append(aux_group)
|
||||
except UserAlreadyInGroupError:
|
||||
pass
|
||||
if self.aux_groups:
|
||||
yield 'add_user_to_auxiliary_groups'
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if self.subscribe_to_lists:
|
||||
try:
|
||||
aux_lists = cfg.get('auxiliary mailing lists_' + self.group_name)
|
||||
for aux_list in aux_lists:
|
||||
try:
|
||||
user.subscribe_to_mailing_list(aux_list)
|
||||
self.aux_lists.append(aux_list)
|
||||
except UserAlreadySubscribedError:
|
||||
pass
|
||||
if self.aux_lists:
|
||||
yield 'subscribe_user_to_auxiliary_mailing_lists'
|
||||
except KeyError:
|
||||
pass
|
||||
except Exception:
|
||||
logger.error(traceback.format_exc())
|
||||
yield 'failed_to_subscribe_user_to_auxiliary_mailing_lists'
|
||||
|
||||
result = {
|
||||
'added_to_groups': [self.group_name] + [
|
||||
group.cn for group in self.aux_groups
|
||||
],
|
||||
}
|
||||
if self.aux_lists:
|
||||
result['subscribed_to_lists'] = self.aux_lists
|
||||
self.finish(result)
|
||||
|
||||
def rollback(self):
|
||||
for aux_group in self.aux_groups:
|
||||
aux_group.remove_member(self.username)
|
||||
if 'add_user_to_group' in self.finished_operations:
|
||||
self.group.remove_member(self.username)
|
|
@ -0,0 +1,44 @@
|
|||
from zope import component
|
||||
|
||||
from ..AbstractTransaction import AbstractTransaction
|
||||
from ceo_common.interfaces import ILDAPService
|
||||
from ceo_common.logger_factory import logger_factory
|
||||
|
||||
logger = logger_factory(__name__)
|
||||
|
||||
|
||||
class DeleteGroupTransaction(AbstractTransaction):
|
||||
"""
|
||||
A transaction to permanently delete a group.
|
||||
This should only be called in development mode.
|
||||
"""
|
||||
|
||||
operations = [
|
||||
'remove_sudo_role',
|
||||
'delete_home_dir',
|
||||
'remove_user_from_ldap',
|
||||
'remove_group_from_ldap',
|
||||
]
|
||||
|
||||
def __init__(self, group_name):
|
||||
super().__init__()
|
||||
self.group_name = group_name
|
||||
|
||||
def child_execute_iter(self):
|
||||
ldap_srv = component.getUtility(ILDAPService)
|
||||
user = ldap_srv.get_user(self.group_name)
|
||||
group = ldap_srv.get_group(self.group_name)
|
||||
|
||||
ldap_srv.remove_sudo_role(group.cn)
|
||||
yield 'remove_sudo_role'
|
||||
|
||||
user.delete_home_dir()
|
||||
yield 'delete_home_dir'
|
||||
|
||||
user.remove_from_ldap()
|
||||
yield 'remove_user_from_ldap'
|
||||
|
||||
group.remove_from_ldap()
|
||||
yield 'remove_group_from_ldap'
|
||||
|
||||
self.finish('OK')
|
|
@ -0,0 +1,89 @@
|
|||
import traceback
|
||||
|
||||
from zope import component
|
||||
|
||||
from ..AbstractTransaction import AbstractTransaction
|
||||
from ceo_common.errors import UserNotInGroupError, UserNotSubscribedError
|
||||
from ceo_common.interfaces import ILDAPService, IConfig
|
||||
from ceo_common.logger_factory import logger_factory
|
||||
|
||||
logger = logger_factory(__name__)
|
||||
|
||||
|
||||
class RemoveMemberFromGroupTransaction(AbstractTransaction):
|
||||
"""A transaction to remove a member from a group."""
|
||||
|
||||
operations = [
|
||||
'remove_user_from_group',
|
||||
'remove_user_from_auxiliary_groups',
|
||||
'unsubscribe_user_from_auxiliary_mailing_lists',
|
||||
]
|
||||
|
||||
def __init__(self, username: str, group_name: str, unsubscribe_from_lists: bool = True):
|
||||
super().__init__()
|
||||
self.username = username
|
||||
self.group_name = group_name
|
||||
self.unsubscribe_from_lists = unsubscribe_from_lists
|
||||
# a list of auxiliary Groups from which the user will be removed
|
||||
self.aux_groups = []
|
||||
# a list of mailing list names from which the user will be unsubscribed
|
||||
self.aux_lists = []
|
||||
self.user = None
|
||||
self.group = None
|
||||
|
||||
def child_execute_iter(self):
|
||||
cfg = component.getUtility(IConfig)
|
||||
ldap_srv = component.getUtility(ILDAPService)
|
||||
user = ldap_srv.get_user(self.username)
|
||||
self.user = user
|
||||
group = ldap_srv.get_group(self.group_name)
|
||||
self.group = group
|
||||
|
||||
group.remove_member(self.username)
|
||||
yield 'remove_user_from_group'
|
||||
|
||||
try:
|
||||
aux_groups = cfg.get('auxiliary groups_' + self.group_name)
|
||||
for aux_group_name in aux_groups:
|
||||
try:
|
||||
aux_group = ldap_srv.get_group(aux_group_name)
|
||||
aux_group.remove_member(self.username)
|
||||
self.aux_groups.append(aux_group)
|
||||
except UserNotInGroupError:
|
||||
pass
|
||||
if self.aux_groups:
|
||||
yield 'remove_user_from_auxiliary_groups'
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if self.unsubscribe_from_lists:
|
||||
try:
|
||||
aux_lists = cfg.get('auxiliary mailing lists_' + self.group_name)
|
||||
for aux_list in aux_lists:
|
||||
try:
|
||||
user.unsubscribe_from_mailing_list(aux_list)
|
||||
self.aux_lists.append(aux_list)
|
||||
except UserNotSubscribedError:
|
||||
pass
|
||||
if self.aux_lists:
|
||||
yield 'unsubscribe_user_from_auxiliary_mailing_lists'
|
||||
except KeyError:
|
||||
pass
|
||||
except Exception:
|
||||
logger.error(traceback.format_exc())
|
||||
yield 'failed_to_unsubscribe_user_from_auxiliary_mailing_lists'
|
||||
|
||||
result = {
|
||||
'removed_from_groups': [self.group_name] + [
|
||||
group.cn for group in self.aux_groups
|
||||
],
|
||||
}
|
||||
if self.aux_lists:
|
||||
result['unsubscribed_from_lists'] = self.aux_lists
|
||||
self.finish(result)
|
||||
|
||||
def rollback(self):
|
||||
for aux_group in self.aux_groups:
|
||||
aux_group.add_member(self.username)
|
||||
if 'remove_user_from_group' in self.finished_operations:
|
||||
self.group.add_member(self.username)
|
|
@ -0,0 +1,4 @@
|
|||
from .AddGroupTransaction import AddGroupTransaction
|
||||
from .AddMemberToGroupTransaction import AddMemberToGroupTransaction
|
||||
from .RemoveMemberFromGroupTransaction import RemoveMemberFromGroupTransaction
|
||||
from .DeleteGroupTransaction import DeleteGroupTransaction
|
|
@ -42,70 +42,70 @@ class AddMemberTransaction(AbstractTransaction):
|
|||
self.terms = terms
|
||||
self.non_member_terms = non_member_terms
|
||||
self.forwarding_addresses = forwarding_addresses
|
||||
self.member = None
|
||||
self.user = None
|
||||
self.group = None
|
||||
self.new_member_list = cfg.get('mailman3_new_member_list')
|
||||
self.mail_srv = component.getUtility(IMailService)
|
||||
|
||||
def child_execute_iter(self):
|
||||
member = User(
|
||||
user = User(
|
||||
uid=self.uid,
|
||||
cn=self.cn,
|
||||
program=self.program,
|
||||
terms=self.terms,
|
||||
non_member_terms=self.non_member_terms,
|
||||
)
|
||||
self.member = member
|
||||
member.add_to_ldap()
|
||||
self.user = user
|
||||
user.add_to_ldap()
|
||||
yield 'add_user_to_ldap'
|
||||
|
||||
group = Group(
|
||||
cn=member.uid,
|
||||
gid_number=member.gid_number,
|
||||
cn=user.uid,
|
||||
gid_number=user.gid_number,
|
||||
)
|
||||
self.group = group
|
||||
group.add_to_ldap()
|
||||
yield 'add_group_to_ldap'
|
||||
|
||||
password = utils.gen_password()
|
||||
member.add_to_kerberos(password)
|
||||
user.add_to_kerberos(password)
|
||||
yield 'add_user_to_kerberos'
|
||||
|
||||
member.create_home_dir()
|
||||
user.create_home_dir()
|
||||
yield 'create_home_dir'
|
||||
|
||||
if self.forwarding_addresses:
|
||||
member.set_forwarding_addresses(self.forwarding_addresses)
|
||||
user.set_forwarding_addresses(self.forwarding_addresses)
|
||||
yield 'set_forwarding_addresses'
|
||||
|
||||
# The following operations can't/shouldn't be rolled back because the
|
||||
# user has already seen the email
|
||||
|
||||
try:
|
||||
self.mail_srv.send_welcome_message_to(member)
|
||||
self.mail_srv.send_welcome_message_to(user)
|
||||
yield 'send_welcome_message'
|
||||
except Exception as err:
|
||||
logger.warning('send_welcome_message failed:\n' + traceback.format_exc())
|
||||
yield 'failed_to_send_welcome_message\n' + str(err)
|
||||
|
||||
try:
|
||||
member.subscribe_to_mailing_list(self.new_member_list)
|
||||
user.subscribe_to_mailing_list(self.new_member_list)
|
||||
yield 'subscribe_to_mailing_list'
|
||||
except Exception as err:
|
||||
logger.warning('subscribe_to_mailing_list failed:\n' + traceback.format_exc())
|
||||
yield 'failed_to_subscribe_to_mailing_list\n' + str(err)
|
||||
|
||||
user_json = member.to_dict(True)
|
||||
user_json = user.to_dict(True)
|
||||
# insert the password into the JSON so that the client can see it
|
||||
user_json['password'] = password
|
||||
self.finish(user_json)
|
||||
|
||||
def rollback(self):
|
||||
if 'create_home_dir' in self.finished_operations:
|
||||
self.member.delete_home_dir()
|
||||
self.user.delete_home_dir()
|
||||
if 'add_user_to_kerberos' in self.finished_operations:
|
||||
self.member.remove_from_kerberos()
|
||||
self.user.remove_from_kerberos()
|
||||
if 'add_group_to_ldap' in self.finished_operations:
|
||||
self.group.remove_from_ldap()
|
||||
if 'add_user_to_ldap' in self.finished_operations:
|
||||
self.member.remove_from_ldap()
|
||||
self.user.remove_from_ldap()
|
||||
|
|
|
@ -1,14 +1,19 @@
|
|||
import traceback
|
||||
|
||||
from zope import component
|
||||
|
||||
from ..AbstractTransaction import AbstractTransaction
|
||||
from ceo_common.errors import UserNotSubscribedError
|
||||
from ceo_common.interfaces import ILDAPService, IConfig
|
||||
from ceo_common.logger_factory import logger_factory
|
||||
|
||||
logger = logger_factory(__name__)
|
||||
|
||||
|
||||
class DeleteMemberTransaction(AbstractTransaction):
|
||||
"""
|
||||
Transaction to permanently delete a member and their resources.
|
||||
This should only be used during testing.
|
||||
This should only be used during development.
|
||||
"""
|
||||
|
||||
operations = [
|
||||
|
@ -43,5 +48,8 @@ class DeleteMemberTransaction(AbstractTransaction):
|
|||
yield 'unsubscribe_from_mailing_list'
|
||||
except UserNotSubscribedError:
|
||||
pass
|
||||
except Exception:
|
||||
logger.error(traceback.format_exc())
|
||||
yield 'failed_to_unsubscribe_from_mailing_list'
|
||||
|
||||
self.finish('OK')
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
import ldap3
|
||||
import pytest
|
||||
|
||||
from ceod.model import Group
|
||||
|
||||
|
||||
def test_api_group_not_found(client):
|
||||
status, data = client.get('/api/groups/no_such_group')
|
||||
assert status == 404
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def create_group_resp(client):
|
||||
status, data = client.post('/api/groups', json={
|
||||
'cn': 'test_group1',
|
||||
'description': 'Test Group One',
|
||||
})
|
||||
assert status == 200
|
||||
assert data[-1]['status'] == 'completed'
|
||||
yield status, data
|
||||
status, data = client.delete('/api/groups/test_group1')
|
||||
assert status == 200
|
||||
assert data[-1]['status'] == 'completed'
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def create_group_result(create_group_resp):
|
||||
# convenience method
|
||||
_, data = create_group_resp
|
||||
return data[-1]['result']
|
||||
|
||||
|
||||
def test_api_create_group(cfg, create_group_resp, ldap_conn):
|
||||
_, data = create_group_resp
|
||||
min_uid = cfg.get('clubs_min_id')
|
||||
users_base = cfg.get('ldap_users_base')
|
||||
sudo_base = cfg.get('ldap_sudo_base')
|
||||
|
||||
expected = [
|
||||
{"status": "in progress", "operation": "add_user_to_ldap"},
|
||||
{"status": "in progress", "operation": "add_group_to_ldap"},
|
||||
{"status": "in progress", "operation": "add_sudo_role"},
|
||||
{"status": "in progress", "operation": "create_home_dir"},
|
||||
{"status": "completed", "result": {
|
||||
"cn": "test_group1",
|
||||
"gid_number": min_uid,
|
||||
"description": "Test Group One",
|
||||
"members": [],
|
||||
}},
|
||||
]
|
||||
assert data == expected
|
||||
|
||||
# verify that a user was also created
|
||||
ldap_conn.search(
|
||||
f'uid=test_group1,{users_base}', '(objectClass=*)',
|
||||
search_scope=ldap3.BASE)
|
||||
assert len(ldap_conn.entries) == 1
|
||||
|
||||
# verify that a sudo role was also created
|
||||
ldap_conn.search(
|
||||
f'cn=%test_group1,{sudo_base}', '(objectClass=*)',
|
||||
search_scope=ldap3.BASE)
|
||||
assert len(ldap_conn.entries) == 1
|
||||
|
||||
|
||||
def test_api_get_group(cfg, client, create_group_result):
|
||||
old_data = create_group_result
|
||||
cn = old_data['cn']
|
||||
|
||||
status, data = client.get(f'/api/groups/{cn}')
|
||||
assert status == 200
|
||||
assert data == old_data
|
||||
|
||||
|
||||
def test_api_add_member_to_group(client, create_group_result, ldap_user):
|
||||
uid = ldap_user.uid
|
||||
cn = create_group_result['cn']
|
||||
|
||||
status, data = client.post(f'/api/groups/{cn}/members/{uid}')
|
||||
assert status == 200
|
||||
expected = [
|
||||
{"status": "in progress", "operation": "add_user_to_group"},
|
||||
{"status": "completed", "result": {"added_to_groups": [cn]}},
|
||||
]
|
||||
assert data == expected
|
||||
|
||||
_, data = client.get(f'/api/groups/{cn}')
|
||||
expected = {
|
||||
"cn": cn,
|
||||
"gid_number": create_group_result['gid_number'],
|
||||
"description": create_group_result['description'],
|
||||
"members": [
|
||||
{
|
||||
"cn": ldap_user.cn,
|
||||
"program": ldap_user.program,
|
||||
"uid": ldap_user.uid,
|
||||
}
|
||||
],
|
||||
}
|
||||
assert data == expected
|
||||
|
||||
status, data = client.delete(f'/api/groups/{cn}/members/{uid}')
|
||||
assert status == 200
|
||||
expected = [
|
||||
{"status": "in progress", "operation": "remove_user_from_group"},
|
||||
{"status": "completed", "result": {"removed_from_groups": [cn]}},
|
||||
]
|
||||
assert data == expected
|
||||
|
||||
_, data = client.get(f'/api/groups/{cn}')
|
||||
assert data['members'] == []
|
||||
|
||||
|
||||
def test_api_group_auxiliary(cfg, client, ldap_user, g_admin_ctx):
|
||||
# Make sure that syscom has auxiliary mailing lists and groups
|
||||
# defined in ceod_test_local.ini.
|
||||
# Also make sure that the auxiliary mailing lists are defined in
|
||||
# MockMailmanServer.py.
|
||||
aux_groups = cfg.get('auxiliary groups_syscom')
|
||||
aux_lists = cfg.get('auxiliary mailing lists_syscom')
|
||||
min_uid = cfg.get('clubs_min_id')
|
||||
# add one to account for the 'Test Group One' group, above
|
||||
min_uid += 1
|
||||
|
||||
group_names = ['syscom'] + aux_groups
|
||||
groups = []
|
||||
with g_admin_ctx():
|
||||
for group_name in group_names:
|
||||
group = Group(
|
||||
cn=group_name,
|
||||
gid_number=min_uid,
|
||||
)
|
||||
group.add_to_ldap()
|
||||
groups.append(group)
|
||||
min_uid += 1
|
||||
|
||||
uid = ldap_user.uid
|
||||
_, data = client.post(f'/api/groups/syscom/members/{uid}')
|
||||
expected = [
|
||||
{"status": "in progress", "operation": "add_user_to_group"},
|
||||
{"status": "in progress", "operation": "add_user_to_auxiliary_groups"},
|
||||
{"status": "in progress", "operation": "subscribe_user_to_auxiliary_mailing_lists"},
|
||||
{"status": "completed", "result": {
|
||||
"added_to_groups": group_names,
|
||||
"subscribed_to_lists": aux_lists,
|
||||
}},
|
||||
]
|
||||
assert data == expected
|
||||
|
||||
_, data = client.delete(f'/api/groups/syscom/members/{uid}')
|
||||
expected = [
|
||||
{"status": "in progress", "operation": "remove_user_from_group"},
|
||||
{"status": "in progress", "operation": "remove_user_from_auxiliary_groups"},
|
||||
{"status": "in progress", "operation": "unsubscribe_user_from_auxiliary_mailing_lists"},
|
||||
{"status": "completed", "result": {
|
||||
"removed_from_groups": group_names,
|
||||
"unsubscribed_from_lists": aux_lists,
|
||||
}},
|
||||
]
|
||||
assert data == expected
|
||||
|
||||
with g_admin_ctx():
|
||||
for group in groups:
|
||||
group.remove_from_ldap()
|
|
@ -1,5 +1,3 @@
|
|||
import pwd
|
||||
import grp
|
||||
from unittest.mock import patch
|
||||
|
||||
import ldap3
|
||||
|
@ -11,24 +9,9 @@ import ceod.utils as utils
|
|||
def test_api_user_not_found(client):
|
||||
status, data = client.get('/api/members/no_such_user')
|
||||
assert status == 404
|
||||
assert data['error'] == 'user not found'
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def mocks_for_create_user():
|
||||
with patch.object(utils, 'gen_password') as gen_password_mock, \
|
||||
patch.object(pwd, 'getpwuid') as getpwuid_mock, \
|
||||
patch.object(grp, 'getgrgid') as getgrgid_mock:
|
||||
gen_password_mock.return_value = 'krb5'
|
||||
# Normally, if getpwuid or getgrgid do *not* raise a KeyError,
|
||||
# then LDAPService will skip that UID. Therefore, by raising a
|
||||
# KeyError, we are making sure that the UID will *not* be skipped.
|
||||
getpwuid_mock.side_effect = KeyError()
|
||||
getgrgid_mock.side_effect = KeyError()
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
@pytest.fixture(scope='module')
|
||||
def create_user_resp(client, mocks_for_create_user):
|
||||
status, data = client.post('/api/members', json={
|
||||
'uid': 'test_1',
|
||||
|
@ -44,7 +27,7 @@ def create_user_resp(client, mocks_for_create_user):
|
|||
assert data[-1]['status'] == 'completed'
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
@pytest.fixture(scope='module')
|
||||
def create_user_result(create_user_resp):
|
||||
# convenience method
|
||||
_, data = create_user_resp
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import pytest
|
||||
|
||||
from ceo_common.errors import GroupNotFoundError
|
||||
from ceo_common.errors import GroupNotFoundError, UserAlreadyInGroupError, \
|
||||
UserNotInGroupError
|
||||
|
||||
|
||||
def test_group_add_to_ldap(simple_group, ldap_srv):
|
||||
|
@ -22,6 +23,9 @@ def test_group_members(ldap_group, ldap_srv):
|
|||
assert group.members == ['member1']
|
||||
assert ldap_srv.get_group(group.cn).members == group.members
|
||||
|
||||
with pytest.raises(UserAlreadyInGroupError):
|
||||
group.add_member('member1')
|
||||
|
||||
group.add_member('member2')
|
||||
assert group.members == ['member1', 'member2']
|
||||
assert ldap_srv.get_group(group.cn).members == group.members
|
||||
|
@ -30,13 +34,28 @@ def test_group_members(ldap_group, ldap_srv):
|
|||
assert group.members == ['member2']
|
||||
assert ldap_srv.get_group(group.cn).members == group.members
|
||||
|
||||
with pytest.raises(UserNotInGroupError):
|
||||
group.remove_member('member1')
|
||||
|
||||
def test_group_to_dict(simple_group):
|
||||
group = simple_group
|
||||
|
||||
expected = {
|
||||
'cn': group.cn,
|
||||
'gid_number': group.gid_number,
|
||||
'members': group.members,
|
||||
}
|
||||
assert group.to_dict() == expected
|
||||
def test_group_to_dict(ldap_group, ldap_user, g_admin_ctx):
|
||||
group = ldap_group
|
||||
|
||||
with g_admin_ctx():
|
||||
# we need LDAP credentials because to_dict() might make calls
|
||||
# to LDAP
|
||||
expected = {
|
||||
'cn': group.cn,
|
||||
'description': group.user_cn,
|
||||
'gid_number': group.gid_number,
|
||||
'members': [],
|
||||
}
|
||||
assert group.to_dict() == expected
|
||||
|
||||
group.add_member(ldap_user.uid)
|
||||
expected['members'].append({
|
||||
'uid': ldap_user.uid,
|
||||
'cn': ldap_user.cn,
|
||||
'program': ldap_user.program,
|
||||
})
|
||||
assert group.to_dict() == expected
|
||||
|
|
|
@ -33,6 +33,33 @@ def test_club_add_to_ldap(cfg, ldap_srv, simple_club):
|
|||
club.remove_from_ldap()
|
||||
|
||||
|
||||
def test_get_display_info_for_users(cfg, ldap_srv):
|
||||
user1 = User(
|
||||
uid='test_1',
|
||||
cn='Test One',
|
||||
program='Math',
|
||||
terms=['s2021'],
|
||||
)
|
||||
user2 = User(
|
||||
uid='test_2',
|
||||
cn='Test Two',
|
||||
program='Science',
|
||||
non_member_terms=['s2021'],
|
||||
)
|
||||
user1.add_to_ldap()
|
||||
user2.add_to_ldap()
|
||||
try:
|
||||
expected = [
|
||||
{'uid': user1.uid, 'cn': user1.cn, 'program': user1.program},
|
||||
{'uid': user2.uid, 'cn': user2.cn, 'program': user2.program},
|
||||
]
|
||||
retrieved = ldap_srv.get_display_info_for_users([user1.uid, user2.uid])
|
||||
assert expected == retrieved
|
||||
finally:
|
||||
user1.remove_from_ldap()
|
||||
user2.remove_from_ldap()
|
||||
|
||||
|
||||
def getprinc(username, admin_principal, should_exist=True):
|
||||
proc = subprocess.run([
|
||||
'kadmin', '-k', '-p', admin_principal,
|
||||
|
|
|
@ -50,5 +50,5 @@ syscom = office,staff,adm,src
|
|||
office = cdrom,audio,video,www
|
||||
|
||||
[auxiliary mailing lists]
|
||||
syscom = syscom,syscom-alerts,syscom-moderators,mirror
|
||||
exec = exec,exec-moderators
|
||||
syscom = syscom,syscom-alerts
|
||||
exec = exec
|
||||
|
|
|
@ -42,3 +42,10 @@ api_base_url = http://localhost:8002
|
|||
api_username = restadmin
|
||||
api_password = mailman3
|
||||
new_member_list = csc-general
|
||||
|
||||
[auxiliary groups]
|
||||
syscom = office,staff
|
||||
|
||||
[auxiliary mailing lists]
|
||||
syscom = syscom,syscom-alerts
|
||||
exec = exec
|
||||
|
|
|
@ -1,9 +1,12 @@
|
|||
import contextlib
|
||||
import grp
|
||||
import importlib.resources
|
||||
import os
|
||||
import pwd
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from unittest.mock import patch
|
||||
|
||||
import flask
|
||||
import ldap3
|
||||
|
@ -18,9 +21,10 @@ from ceod.api import create_app
|
|||
from ceod.model import KerberosService, LDAPService, FileService, User, \
|
||||
MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService
|
||||
from ceod.model.utils import strings_to_bytes
|
||||
import ceod.utils as utils
|
||||
from .MockSMTPServer import MockSMTPServer
|
||||
from .MockMailmanServer import MockMailmanServer
|
||||
from .conftest_ceod_api import *
|
||||
from .conftest_ceod_api import client
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
|
@ -111,20 +115,20 @@ def ldap_srv_session(cfg, krb_srv, ldap_conn):
|
|||
conn = ldap_conn
|
||||
users_base = cfg.get('ldap_users_base')
|
||||
groups_base = cfg.get('ldap_groups_base')
|
||||
sudo_base = cfg.get('ldap_sudo_base')
|
||||
|
||||
delete_subtree(conn, users_base)
|
||||
delete_subtree(conn, groups_base)
|
||||
|
||||
for base_dn in [users_base, groups_base]:
|
||||
for base_dn in [users_base, groups_base, sudo_base]:
|
||||
delete_subtree(conn, base_dn)
|
||||
ou = base_dn.split(',', 1)[0].split('=')[1]
|
||||
conn.add(base_dn, 'organizationalUnit')
|
||||
|
||||
_ldap_srv = LDAPService()
|
||||
component.provideUtility(_ldap_srv, ILDAPService)
|
||||
|
||||
yield _ldap_srv
|
||||
|
||||
delete_subtree(conn, users_base)
|
||||
delete_subtree(conn, groups_base)
|
||||
for base_dn in [users_base, groups_base, sudo_base]:
|
||||
delete_subtree(conn, base_dn)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -217,6 +221,20 @@ def app(
|
|||
return app
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def mocks_for_create_user():
|
||||
with patch.object(utils, 'gen_password') as gen_password_mock, \
|
||||
patch.object(pwd, 'getpwuid') as getpwuid_mock, \
|
||||
patch.object(grp, 'getgrgid') as getgrgid_mock:
|
||||
gen_password_mock.return_value = 'krb5'
|
||||
# Normally, if getpwuid or getgrgid do *not* raise a KeyError,
|
||||
# then LDAPService will skip that UID. Therefore, by raising a
|
||||
# KeyError, we are making sure that the UID will *not* be skipped.
|
||||
getpwuid_mock.side_effect = KeyError()
|
||||
getgrgid_mock.side_effect = KeyError()
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def simple_user():
|
||||
return User(
|
||||
|
@ -257,6 +275,7 @@ def simple_group():
|
|||
return Group(
|
||||
cn='group1',
|
||||
gid_number=21000,
|
||||
user_cn='Group One',
|
||||
)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue