use ldap3 instead of python-ldap

pull/5/head
Max Erenberg 1 year ago
parent 6cdb41d47b
commit d82b5a763b
  1. 18
      ceo_common/interfaces/IGroup.py
  2. 24
      ceo_common/interfaces/ILDAPService.py
  3. 16
      ceo_common/interfaces/IUser.py
  4. 65
      ceod/model/Group.py
  5. 223
      ceod/model/LDAPService.py
  6. 30
      ceod/model/UWLDAPRecord.py
  7. 34
      ceod/model/UWLDAPService.py
  8. 119
      ceod/model/User.py
  9. 2
      requirements.txt
  10. 1
      tests/ceod/model/test_group.py
  11. 1
      tests/ceod/model/test_user.py
  12. 13
      tests/ceod/model/test_uwldap.py
  13. 74
      tests/conftest.py

@ -1,5 +1,3 @@
from typing import Dict, List
from zope.interface import Interface, Attribute
@ -9,7 +7,8 @@ class IGroup(Interface):
cn = Attribute('common name')
gid_number = Attribute('gid number')
members = Attribute('usernames of group members')
dn = Attribute('distinguished name')
ldap3_entry = Attribute('cached ldap3.Entry instance for this group')
def add_to_ldap():
"""Add a new record to LDAP for this group."""
@ -20,18 +19,5 @@ class IGroup(Interface):
def remove_member(username: str):
"""Remove the member from this group in LDAP."""
def serialize_for_ldap() -> Dict[str, List[bytes]]:
"""
Serialize this group into a dict to be passed to
ldap.modlist.addModlist().
"""
# static method
def deserialize_from_ldap(data: Dict[str, List[bytes]]):
"""Deserialize this group from a dict returned by ldap.search_s().
:returns: IGroup
"""
def to_dict():
"""Serialize this group as JSON."""

@ -9,10 +9,16 @@ from .IGroup import IGroup
class ILDAPService(Interface):
"""An interface to the LDAP database."""
def uid_to_dn(self, uid: str) -> str:
"""Get the LDAP DN for the user with this UID."""
def group_cn_to_dn(self, cn: str) -> str:
"""Get the LDAP DN for the group with this CN."""
def get_user(username: str) -> IUser:
"""Retrieve the user with the given username."""
def add_user(user: IUser) -> IUser:
def add_user(user: IUser):
"""
Add the user to the database.
A new UID and GID will be generated and returned in the new user.
@ -24,7 +30,7 @@ class ILDAPService(Interface):
def get_group(cn: str, is_club: bool = False) -> IGroup:
"""Retrieve the group with the given cn (Unix group name)."""
def add_group(group: IGroup) -> IGroup:
def add_group(group: IGroup):
"""
Add the group to the database.
The GID will not be changed and must be valid.
@ -33,11 +39,17 @@ class ILDAPService(Interface):
def remove_group(group: IGroup):
"""Remove this group from the database."""
def modify_user(old_user: IUser, new_user: IUser):
"""Replace old_user with new_user."""
def entry_ctx_for_user(user: IUser):
"""
Get a context manager which yields an ldap3.WritableEntry
for this user.
"""
def modify_group(old_group: IGroup, new_group: IGroup):
"""Replace old_group with new_group."""
def entry_ctx_for_group(group: IGroup):
"""
Get a context manager which yields an ldap3.WritableEntry
for this group.
"""
def add_sudo_role(uid: str):
"""Create a sudo role for the club with this UID."""

@ -19,10 +19,9 @@ class IUser(Interface):
non_member_terms = Attribute('list of terms for which this person was '
'a club rep')
mail_local_addresses = Attribute('email aliases')
dn = Attribute('distinguished name')
# Non-LDAP attributes
forwarding_addresses = Attribute('list of email forwarding addresses')
ldap3_entry = Attribute('cached ldap3.Entry instance for this user')
def get_forwarding_addresses(self) -> List[str]:
"""Get the forwarding addresses for this user."""
@ -78,19 +77,6 @@ class IUser(Interface):
def unsubscribe_from_mailing_list(mailing_list: str):
"""Unsubscribe this user from the mailing list."""
def serialize_for_ldap() -> Dict[str, List[bytes]]:
"""
Serialize this user into a dict to be passed to
ldap.modlist.addModlist().
"""
# static method
def deserialize_from_ldap(data: Dict[str, List[bytes]]):
"""Deserialize this user from a dict returned by ldap.search_s().
:returns: IUser
"""
def to_dict(get_forwarding_addresses: bool = False) -> Dict:
"""
Serialize this user into a dict.

@ -1,28 +1,29 @@
import copy
import json
from typing import List, Dict, Union
from typing import List, Union
import ldap3
from zope import component
from zope.interface import implementer
from .utils import strings_to_bytes, bytes_to_strings, dn_to_uid
from ceo_common.interfaces import ILDAPService, IGroup, IConfig
from .utils import dn_to_uid
from ceo_common.interfaces import ILDAPService, IGroup
@implementer(IGroup)
class Group:
def __init__(
self, cn: str, gid_number: int,
self,
cn: str,
gid_number: int,
members: Union[List[str], None] = None,
ldap3_entry: Union[ldap3.Entry, None] = None,
):
self.cn = cn
self.gid_number = gid_number
# this is a list of the usernames of the members in this group
self.members = members or []
self.ldap3_entry = ldap3_entry
cfg = component.getUtility(IConfig)
self.dn = f'cn={cn},{cfg.get("ldap_groups_base")}'
self.ldap_users_base = cfg.get('ldap_users_base')
self.ldap_srv = component.getUtility(ILDAPService)
def __repr__(self) -> str:
@ -30,7 +31,6 @@ class Group:
def to_dict(self):
return {
'dn': self.dn,
'cn': self.cn,
'gid_number': self.gid_number,
'members': self.members,
@ -42,44 +42,27 @@ class Group:
def remove_from_ldap(self):
self.ldap_srv.remove_group(self)
def serialize_for_ldap(self) -> Dict[str, List[bytes]]:
data = {
'cn': [self.cn],
'gidNumber': [str(self.gid_number)],
'objectClass': [
'top',
'group',
'posixGroup',
],
}
if self.members:
data['uniqueMember'] = [
f'uid={member},{self.ldap_users_base}'
for member in self.members
]
return strings_to_bytes(data)
@staticmethod
def deserialize_from_ldap(data: Dict[str, List[bytes]]) -> IGroup:
data = bytes_to_strings(data)
def deserialize_from_ldap(entry: ldap3.Entry) -> IGroup:
"""Deserialize this group from an LDAP entry."""
attrs = entry.entry_attributes_as_dict
return Group(
cn=data['cn'][0],
gid_number=int(data['gidNumber'][0]),
cn=attrs['cn'][0],
gid_number=attrs['gidNumber'][0],
members=[
dn_to_uid(dn) for dn in data.get('uniqueMember', [])
dn_to_uid(dn) for dn in attrs.get('uniqueMember', [])
],
ldap3_entry=entry,
)
def add_member(self, username: str):
new_group = copy.copy(self)
new_group.members = self.members.copy()
new_group.members.append(username)
self.ldap_srv.modify_group(self, new_group)
self.members = new_group.members
dn = self.ldap_srv.uid_to_dn(username)
with self.ldap_srv.entry_ctx_for_group(self) as entry:
entry.uniqueMember.add(dn)
self.members.append(username)
def remove_member(self, username: str):
new_group = copy.copy(self)
new_group.members = self.members.copy()
new_group.members.remove(username)
self.ldap_srv.modify_group(self, new_group)
self.members = new_group.members
dn = self.ldap_srv.uid_to_dn(username)
with self.ldap_srv.entry_ctx_for_group(self) as entry:
entry.uniqueMember.delete(dn)
self.members.remove(username)

@ -1,73 +1,90 @@
import copy
import contextlib
import grp
import pwd
from typing import Union, List
import ldap
import ldap.modlist
import ldap3
from zope import component
from zope.interface import implementer
from ceo_common.errors import UserNotFoundError, GroupNotFoundError, \
UserAlreadyExistsError, GroupAlreadyExistsError
from ceo_common.interfaces import ILDAPService, IKerberosService, IConfig, \
from ceo_common.interfaces import ILDAPService, IConfig, \
IUser, IGroup, IUWLDAPService
from .User import User
from .Group import Group
from .SudoRole import SudoRole
from .utils import dn_to_uid, bytes_to_strings
@implementer(ILDAPService)
class LDAPService:
def __init__(self):
cfg = component.getUtility(IConfig)
self.ldap_admin_principal = cfg.get('ldap_admin_principal')
self.ldap_server_url = cfg.get('ldap_server_url')
self.ldap_sasl_realm = cfg.get('ldap_sasl_realm')
self.ldap_users_base = cfg.get('ldap_users_base')
self.ldap_groups_base = cfg.get('ldap_groups_base')
self.ldap_sudo_base = cfg.get('ldap_sudo_base')
self.member_min_id = cfg.get('members_min_id')
self.member_max_id = cfg.get('members_max_id')
self.club_min_id = cfg.get('clubs_min_id')
self.club_max_id = cfg.get('clubs_max_id')
self.sasl_user = None
def _get_ldap_conn(self, gssapi_bind: bool = True) -> ldap.ldapobject.LDAPObject:
# TODO: cache the connection
conn = ldap.initialize(self.ldap_server_url)
def set_sasl_user(self, sasl_user: Union[str, None]):
# TODO: store SASL user in flask.g instead
self.sasl_user = sasl_user
def _get_ldap_conn(self, gssapi_bind: bool = True) -> ldap3.Connection:
kwargs = {'auto_bind': True, 'raise_exceptions': True}
if gssapi_bind:
self._gssapi_bind(conn)
kwargs['authentication'] = ldap3.SASL
kwargs['sasl_mechanism'] = ldap3.KERBEROS
kwargs['user'] = self.sasl_user
# TODO: cache the connection for a single request
conn = ldap3.Connection(self.ldap_server_url, **kwargs)
return conn
def _gssapi_bind(self, conn: ldap.ldapobject.LDAPObject):
krb_srv = component.getUtility(IKerberosService)
for i in range(2):
try:
conn.sasl_gssapi_bind_s()
return
except ldap.LOCAL_ERROR as err:
if 'Ticket expired' in err.args[0]['info']:
krb_srv.kinit()
continue
raise err
raise Exception('could not perform GSSAPI bind')
def get_user(self, username: str) -> IUser:
conn = self._get_ldap_conn(False)
def _get_readable_entry_for_user(self, conn: ldap3.Connection, username: str) -> ldap3.Entry:
base = self.uid_to_dn(username)
try:
_, result = conn.search_s(base, ldap.SCOPE_BASE)[0]
return User.deserialize_from_ldap(result)
except ldap.NO_SUCH_OBJECT:
conn.search(
base, '(objectClass=*)', search_scope=ldap3.BASE,
attributes=ldap3.ALL_ATTRIBUTES)
except ldap3.core.exceptions.LDAPNoSuchObjectResult:
raise UserNotFoundError()
return conn.entries[0]
def get_group(self, cn: str) -> IGroup:
conn = self._get_ldap_conn(False)
def _get_readable_entry_for_group(self, conn: ldap3.Connection, cn: str) -> ldap3.Entry:
base = self.group_cn_to_dn(cn)
try:
_, result = conn.search_s(base, ldap.SCOPE_BASE)[0]
return Group.deserialize_from_ldap(result)
except ldap.NO_SUCH_OBJECT:
conn.search(
base, '(objectClass=*)', search_scope=ldap3.BASE,
attributes=ldap3.ALL_ATTRIBUTES)
except ldap3.core.exceptions.LDAPNoSuchObjectResult:
raise GroupNotFoundError()
return conn.entries[0]
def _get_writable_entry_for_user(self, user: IUser) -> ldap3.WritableEntry:
if user.ldap3_entry is None:
conn = self._get_ldap_conn()
user.ldap3_entry = self._get_readable_entry_for_user(conn, user.uid)
return user.ldap3_entry.entry_writable()
def _get_writable_entry_for_group(self, group: IGroup) -> ldap3.WritableEntry:
if group.ldap3_entry is None:
conn = self._get_ldap_conn()
group.ldap3_entry = self._get_readable_entry_for_group(conn, group.cn)
return group.ldap3_entry.entry_writable()
def get_user(self, username: str) -> IUser:
conn = self._get_ldap_conn(False)
entry = self._get_readable_entry_for_user(conn, username)
return User.deserialize_from_ldap(entry)
def get_group(self, cn: str) -> IGroup:
conn = self._get_ldap_conn(False)
entry = self._get_readable_entry_for_group(conn, cn)
return Group.deserialize_from_ldap(entry)
def uid_to_dn(self, uid: str):
return f'uid={uid},{self.ldap_users_base}'
@ -75,7 +92,7 @@ class LDAPService:
def group_cn_to_dn(self, cn: str):
return f'cn={cn},{self.ldap_groups_base}'
def _get_next_uid(self, conn: ldap.ldapobject.LDAPObject, min_id: int, max_id: int) -> int:
def _get_next_uid(self, conn: ldap3.Connection, min_id: int, max_id: int) -> int:
"""Gets the next available UID number between min_id and max_id, inclusive."""
def uid_exists(uid: int) -> bool:
try:
@ -92,10 +109,10 @@ class LDAPService:
return False
def ldap_uid_or_gid_exists(uid: int) -> bool:
results = conn.search_s(
self.ldap_users_base, ldap.SCOPE_ONELEVEL,
f'(|(uidNumber={uid})(gidNumber={uid}))')
return len(results) > 0
return conn.search(
self.ldap_users_base,
f'(|(uidNumber={uid})(gidNumber={uid}))',
size_limit=1)
# TODO: replace this with binary search
for uid in range(min_id, max_id + 1):
@ -106,95 +123,125 @@ class LDAPService:
def add_sudo_role(self, uid: str):
conn = self._get_ldap_conn()
sudo_role = SudoRole(uid)
modlist = ldap.modlist.addModlist(sudo_role.serialize_for_ldap())
conn.add_s(sudo_role.dn, modlist)
obj_def = ldap3.ObjectDef(['sudoRole'], conn)
writer = ldap3.Writer(conn, obj_def)
dn = f'cn=%{uid},{self.ldap_sudo_base}'
entry = writer.new(dn)
entry.cn = '%' + uid
entry.sudoUser = '%' + uid
entry.sudoHost = 'ALL'
entry.sudoCommand = 'ALL'
entry.sudoOption = ['!authenticate']
entry.sudoRunAsUser = uid
writer.commit()
def remove_sudo_role(self, uid: str):
conn = self._get_ldap_conn()
sudo_role = SudoRole(uid)
conn.delete_s(sudo_role.dn)
dn = f'cn=%{uid},{self.ldap_sudo_base}'
conn.delete(dn)
def add_user(self, user: IUser) -> IUser:
def add_user(self, user: IUser):
object_classes = ['account', 'posixAccount', 'shadowAccount']
if user.is_club():
min_id, max_id = self.club_min_id, self.club_max_id
object_classes.append('club')
else:
min_id, max_id = self.member_min_id, self.member_max_id
object_classes.append('member')
if user.mail_local_addresses:
object_classes.append('inetLocalMailRecipient')
conn = self._get_ldap_conn()
obj_def = ldap3.ObjectDef(object_classes, conn)
uid_number = self._get_next_uid(conn, min_id, max_id)
new_user = copy.copy(user)
new_user.uid_number = uid_number
new_user.gid_number = uid_number
user.uid_number = uid_number
user.gid_number = uid_number
writer = ldap3.Writer(conn, obj_def)
entry = writer.new(self.uid_to_dn(user.uid))
entry.cn = user.cn
entry.uidNumber = user.uid_number
entry.gidNumber = user.gid_number
entry.homeDirectory = user.home_directory
if user.login_shell:
entry.loginShell = user.login_shell
if user.program:
entry.program = user.program
if user.terms:
entry.term = user.terms
if user.non_member_terms:
entry.nonMemberTerm = user.non_member_terms
if user.positions:
entry.position = user.positions
if user.mail_local_addresses:
entry.mailLocalAddress = user.mail_local_addresses
if not user.is_club():
entry.userPassword = '{SASL}%s@%s' % (user.uid, self.ldap_sasl_realm)
modlist = ldap.modlist.addModlist(new_user.serialize_for_ldap())
try:
conn.add_s(new_user.dn, modlist)
except ldap.ALREADY_EXISTS:
writer.commit()
except ldap3.core.exceptions.LDAPEntryAlreadyExistsResult:
raise UserAlreadyExistsError()
return new_user
def modify_user(self, old_user: IUser, new_user: IUser):
conn = self._get_ldap_conn()
modlist = ldap.modlist.modifyModlist(
old_user.serialize_for_ldap(),
new_user.serialize_for_ldap(),
)
conn.modify_s(old_user.dn, modlist)
@contextlib.contextmanager
def entry_ctx_for_user(self, user: IUser):
entry = self._get_writable_entry_for_user(user)
yield entry
entry.entry_commit_changes()
def remove_user(self, user: IUser):
conn = self._get_ldap_conn()
conn.delete_s(user.dn)
conn.delete(self.uid_to_dn(user.uid))
def add_group(self, group: IGroup) -> IGroup:
conn = self._get_ldap_conn()
# make sure that the caller initialized the GID number
assert group.gid_number
modlist = ldap.modlist.addModlist(group.serialize_for_ldap())
obj_def = ldap3.ObjectDef(['group', 'posixGroup'], conn)
writer = ldap3.Writer(conn, obj_def)
entry = writer.new(self.group_cn_to_dn(group.cn))
entry.cn = group.cn
entry.gidNumber = group.gid_number
if group.members:
entry.uniqueMember = [self.uid_to_dn(uid) for uid in group.members]
try:
conn.add_s(group.dn, modlist)
except ldap.ALREADY_EXISTS:
writer.commit()
except ldap3.core.exceptions.LDAPEntryAlreadyExistsResult:
raise GroupAlreadyExistsError()
return group
def modify_group(self, old_group: IGroup, new_group: IGroup):
conn = self._get_ldap_conn()
modlist = ldap.modlist.modifyModlist(
old_group.serialize_for_ldap(),
new_group.serialize_for_ldap(),
)
conn.modify_s(old_group.dn, modlist)
@contextlib.contextmanager
def entry_ctx_for_group(self, group: IGroup):
entry = self._get_writable_entry_for_group(group)
yield entry
entry.entry_commit_changes()
def remove_group(self, group: IGroup):
conn = self._get_ldap_conn()
conn.delete_s(group.dn)
conn.delete(self.group_cn_to_dn(group.cn))
def update_programs(
self,
dry_run: bool = False,
members: Union[List[str], None] = None,
uwldap_batch_size: int = 100,
uwldap_batch_size: int = 10,
):
if members:
filter_str = '(|' + ''.join([
f'(uid={uid})' for uid in members
]) + ')'
else:
filter_str = None
filter_str = '(objectClass=*)'
conn = self._get_ldap_conn()
raw_csc_records = conn.search_s(
self.ldap_users_base, ldap.SCOPE_SUBTREE, filter_str,
attrlist=['program'])
uids = [
dn_to_uid(dn) for dn, _ in raw_csc_records
]
csc_programs = [
bytes_to_strings(data).get('program', [None])[0]
for _, data in raw_csc_records
]
conn.search(
self.ldap_users_base, filter_str, attributes=['uid', 'program'])
uids = [entry.uid.value for entry in conn.entries]
csc_programs = [entry.program.value for entry in conn.entries]
uwldap_srv = component.getUtility(IUWLDAPService)
uw_programs = []
# send queries in small batches
# send queries in small batches so that we don't have an
# enormous filter in our query to UWLDAP
for i in range(0, len(csc_programs), uwldap_batch_size):
batch_uids = uids[i:i + uwldap_batch_size]
batch_uw_programs = uwldap_srv.get_programs_for_users(batch_uids)
@ -210,9 +257,7 @@ class LDAPService:
return users_to_change
for uid, old_program, new_program in users_to_change:
old_entry = {'program': [old_program.encode()]}
new_entry = {'program': [new_program.encode()]}
modlist = ldap.modlist.modifyModlist(old_entry, new_entry)
conn.modify_s(self.uid_to_dn(uid), modlist)
changes = {'program': [(ldap3.MODIFY_REPLACE, [new_program])]}
conn.modify(self.uid_to_dn(uid), changes)
return users_to_change

@ -1,11 +1,20 @@
from typing import List, Dict, Union
from typing import List, Union
from .utils import bytes_to_strings
import ldap3
class UWLDAPRecord:
"""Represents a record from the UW LDAP."""
ldap_attributes = [
'uid',
'mailLocalAddress',
'ou', # program
'cn',
'sn',
'givenName',
]
def __init__(
self,
uid: str,
@ -23,19 +32,18 @@ class UWLDAPRecord:
self.given_name = given_name
@staticmethod
def deserialize_from_ldap(data: Dict[str, List[bytes]]):
def deserialize_from_ldap(entry: ldap3.Entry):
"""
Deserializes a dict returned from ldap.search_s() into a
Deserializes a dict returned from LDAP into a
UWLDAPRecord.
"""
data = bytes_to_strings(data)
return UWLDAPRecord(
uid=data['uid'][0],
mail_local_addresses=data['mailLocalAddress'],
program=data.get('ou', [None])[0],
cn=data.get('cn', [None])[0],
sn=data.get('sn', [None])[0],
given_name=data.get('givenName', [None])[0],
uid=entry.uid.value,
mail_local_addresses=entry.mailLocalAddress.values,
program=entry.ou.value,
cn=entry.cn.value,
sn=entry.sn.value,
given_name=entry.givenName.value,
)
def to_dict(self):

@ -1,11 +1,11 @@
from typing import Union, List
import ldap
import ldap3
from zope import component
from zope.interface import implementer
from .UWLDAPRecord import UWLDAPRecord
from .utils import dn_to_uid, bytes_to_strings
from .utils import dn_to_uid
from ceo_common.interfaces import IUWLDAPService, IConfig
@ -16,27 +16,33 @@ class UWLDAPService:
self.uwldap_server_url = cfg.get('uwldap_server_url')
self.uwldap_base = cfg.get('uwldap_base')
def _get_conn(self) -> ldap3.Connection:
return ldap3.Connection(
self.uwldap_server_url, auto_bind=True, read_only=True,
raise_exceptions=True)
def get_user(self, username: str) -> Union[UWLDAPRecord, None]:
conn = ldap.initialize(self.uwldap_server_url)
results = conn.search_s(self.uwldap_base, ldap.SCOPE_SUBTREE, f'uid={username}')
if not results:
conn = self._get_conn()
conn.search(
self.uwldap_base, f'(uid={username})',
attributes=UWLDAPRecord.ldap_attributes, size_limit=1)
if not conn.entries:
return None
_, data = results[0] # discard the dn
return UWLDAPRecord.deserialize_from_ldap(data)
return UWLDAPRecord.deserialize_from_ldap(conn.entries[0])
def get_programs_for_users(self, usernames: List[str]) -> List[Union[str, None]]:
filter_str = '(|' + ''.join([f'(uid={uid})' for uid in usernames]) + ')'
programs = [None] * len(usernames)
user_indices = {uid: i for i, uid in enumerate(usernames)}
conn = ldap.initialize(self.uwldap_server_url)
records = conn.search_s(
self.uwldap_base, ldap.SCOPE_SUBTREE, filter_str, attrlist=['ou'])
for dn, data in records:
uid = dn_to_uid(dn)
conn = self._get_conn()
conn.search(
self.uwldap_base, filter_str, attributes=['ou'],
size_limit=len(usernames))
for entry in conn.entries:
uid = dn_to_uid(entry.entry_dn)
idx = user_indices[uid]
data = bytes_to_strings(data)
program = data.get('ou', [None])[0]
program = entry.ou.value
if program:
programs[idx] = program
return programs

@ -1,12 +1,11 @@
import copy
import json
import os
from typing import List, Dict, Union
import ldap3
from zope import component
from zope.interface import implementer
from .utils import strings_to_bytes, bytes_to_strings
from .validators import is_valid_shell, is_valid_term
from ceo_common.interfaces import ILDAPService, IKerberosService, IFileService, \
IUser, IConfig, IMailmanService
@ -15,7 +14,9 @@ from ceo_common.interfaces import ILDAPService, IKerberosService, IFileService,
@implementer(IUser)
class User:
def __init__(
self, uid: str, cn: str,
self,
uid: str,
cn: str,
program: Union[str, None] = None,
terms: Union[List[str], None] = None,
non_member_terms: Union[List[str], None] = None,
@ -26,6 +27,7 @@ class User:
positions: Union[List[str], None] = None,
mail_local_addresses: Union[List[str], None] = None,
is_club: bool = False,
ldap3_entry: Union[ldap3.Entry, None] = None,
):
if not is_club and not terms and not non_member_terms:
raise Exception('terms and non_member_terms cannot both be empty')
@ -50,16 +52,14 @@ class User:
self.positions = positions or []
self.mail_local_addresses = mail_local_addresses or []
self._is_club = is_club
self.ldap3_entry = ldap3_entry
self.ldap_sasl_realm = cfg.get('ldap_sasl_realm')
self.dn = f'uid={uid},{cfg.get("ldap_users_base")}'
self.ldap_srv = component.getUtility(ILDAPService)
self.krb_srv = component.getUtility(IKerberosService)
self.file_srv = component.getUtility(IFileService)
def to_dict(self, get_forwarding_addresses: bool = False) -> Dict:
data = {
'dn': self.dn,
'cn': self.cn,
'uid': self.uid,
'uid_number': self.uid_number,
@ -89,14 +89,10 @@ class User:
return self._is_club
def add_to_ldap(self):
new_member = self.ldap_srv.add_user(self)
self.uid_number = new_member.uid_number
self.gid_number = new_member.gid_number
self.ldap_srv.add_user(self)
def remove_from_ldap(self):
self.ldap_srv.remove_user(self)
self.uid_number = None
self.gid_number = None
def add_to_kerberos(self, password: str):
self.krb_srv.addprinc(self.uid, password)
@ -119,102 +115,61 @@ class User:
def unsubscribe_from_mailing_list(self, mailing_list: str):
component.getUtility(IMailmanService).unsubscribe(self.uid, mailing_list)
def serialize_for_ldap(self) -> Dict:
data = {
'cn': [self.cn],
'loginShell': [self.login_shell],
'homeDirectory': [self.home_directory],
'uid': [self.uid],
'uidNumber': [str(self.uid_number)],
'gidNumber': [str(self.gid_number)],
'objectClass': [
'top',
'account',
'posixAccount',
'shadowAccount',
],
}
if self.is_club():
data['objectClass'].append('club')
else:
data['objectClass'].append('member')
data['userPassword'] = ['{SASL}%s@%s' % (self.uid, self.ldap_sasl_realm)]
if self.program:
data['program'] = [self.program]
if self.terms:
data['term'] = self.terms
if self.non_member_terms:
data['nonMemberTerm'] = self.non_member_terms
if self.positions:
data['position'] = self.positions
if self.mail_local_addresses:
data['mailLocalAddress'] = self.mail_local_addresses
data['objectClass'].append('inetLocalMailRecipient')
return strings_to_bytes(data)
@staticmethod
def deserialize_from_ldap(data: Dict[str, List[bytes]]) -> IUser:
data = bytes_to_strings(data)
def deserialize_from_ldap(entry: ldap3.Entry) -> IUser:
"""Deserialize this user from an LDAP entry."""
attrs = entry.entry_attributes_as_dict
return User(
uid=data['uid'][0],
cn=data['cn'][0],
program=data.get('program', [None])[0],
terms=data.get('term'),
non_member_terms=data.get('nonMemberTerm'),
login_shell=data['loginShell'][0],
uid_number=int(data['uidNumber'][0]),
gid_number=int(data['gidNumber'][0]),
home_directory=data['homeDirectory'][0],
positions=data.get('position'),
mail_local_addresses=data.get('mailLocalAddress', []),
is_club=('club' in data['objectClass']),
uid=attrs['uid'][0],
cn=attrs['cn'][0],
program=attrs.get('program', [None])[0],
terms=attrs.get('term'),
non_member_terms=attrs.get('nonMemberTerm'),
login_shell=attrs['loginShell'][0],
uid_number=attrs['uidNumber'][0],
gid_number=attrs['gidNumber'][0],
home_directory=attrs['homeDirectory'][0],
positions=attrs.get('position'),
mail_local_addresses=attrs.get('mailLocalAddress'),
is_club=('club' in attrs['objectClass']),
ldap3_entry=entry,
)
def replace_login_shell(self, login_shell: str):
new_user = copy.copy(self)
if not is_valid_shell(login_shell):
raise Exception('%s is not a valid shell' % login_shell)
new_user.login_shell = login_shell
self.ldap_srv.modify_user(self, new_user)
with self.ldap_srv.entry_ctx_for_user(self) as entry:
entry.loginShell = login_shell
self.login_shell = login_shell
def add_terms(self, terms: List[str]):
for term in terms:
if not is_valid_term(term):
raise Exception('%s is not a valid term' % term)
new_user = copy.copy(self)
new_user.terms = self.terms.copy()
new_user.terms.extend(terms)
self.ldap_srv.modify_user(self, new_user)
self.terms = new_user.terms
with self.ldap_srv.entry_ctx_for_user(self) as entry:
entry.term.add(terms)
self.terms.extend(terms)
def add_non_member_terms(self, terms: List[str]):
for term in terms:
if not is_valid_term(term):
raise Exception('%s is not a valid term' % term)
new_user = copy.copy(self)
new_user.non_member_terms = self.non_member_terms.copy()
new_user.non_member_terms.extend(terms)
self.ldap_srv.modify_user(self, new_user)
self.non_member_terms = new_user.non_member_terms
with self.ldap_srv.entry_ctx_for_user(self) as entry:
entry.nonMemberTerm.add(terms)
self.non_member_terms.extend(terms)
def add_position(self, position: str):
new_user = copy.copy(self)
new_user.positions = [*self.positions, position]
self.ldap_srv.modify_user(self, new_user)
self.positions = new_user.positions
with self.ldap_srv.entry_ctx_for_user(self) as entry:
entry.position.add(position)
self.positions.append(position)
def remove_position(self, position: str):
new_user = copy.copy(self)
new_user.positions = self.positions.copy()
new_user.positions.remove(position)
self.ldap_srv.modify_user(self, new_user)
self.positions = new_user.positions
with self.ldap_srv.entry_ctx_for_user(self) as entry:
entry.position.delete(position)
self.positions.remove(position)
def get_forwarding_addresses(self) -> List[str]:
return self.file_srv.get_forwarding_addresses(self)
def set_forwarding_addresses(self, addresses: List[str]):
self.file_srv.set_forwarding_addresses(self, addresses)
forwarding_addresses = property(get_forwarding_addresses, set_forwarding_addresses)

@ -2,7 +2,7 @@ Flask==2.0.1
Flask-Kerberos==1.0.4
gssapi==1.6.14
Jinja2==3.0.1
python-ldap==3.3.1
ldap3==2.9.1
requests==2.26.0
requests-gssapi==1.2.3
zope.component==5.0.1

@ -35,7 +35,6 @@ def test_group_to_dict(simple_group):
group = simple_group
expected = {
'dn': group.dn,
'cn': group.cn,
'gid_number': group.gid_number,
'members': group.members,

@ -136,7 +136,6 @@ def test_user_to_dict(cfg):
'positions': user.positions,
'login_shell': '/bin/bash',
'home_directory': user.home_directory,
'dn': f'uid={user.uid},' + cfg.get('ldap_users_base'),
'is_club': False,
}
assert user.to_dict() == expected

@ -1,4 +1,6 @@
import ldap
import ldap3
from tests.conftest import get_ldap_conn
def test_uwldap_get(uwldap_srv, uwldap_user):
@ -13,14 +15,11 @@ def test_ldap_updateprograms(cfg, ldap_srv, uwldap_srv, ldap_user, uwldap_user):
# sanity check
assert ldap_user.uid == uwldap_user.uid
# modify the user's program in UWLDAP
conn = ldap.initialize(cfg.get('uwldap_server_url'))
conn.sasl_gssapi_bind_s()
conn = get_ldap_conn(cfg.get('uwldap_server_url'))
base_dn = cfg.get('uwldap_base')
dn = f'uid={uwldap_user.uid},{base_dn}'
conn.modify_s(dn, ldap.modlist.modifyModlist(
{'ou': [uwldap_user.program.encode()]},
{'ou': [b'New Program']},
))
changes = {'ou': [(ldap3.MODIFY_REPLACE, ['New Program'])]}
conn.modify(dn, changes)
assert ldap_srv.update_programs(dry_run=True) == [
(uwldap_user.uid, uwldap_user.program, 'New Program'),

@ -2,7 +2,7 @@ import importlib.resources
import os
import shutil
import ldap
import ldap3
import pytest
import socket
from zope import component
@ -45,38 +45,40 @@ def krb_srv(cfg):
shutil.rmtree(cache_dir)
def recursively_delete_subtree(conn: ldap.ldapobject.LDAPObject, base_dn: str):
def delete_subtree(conn: ldap3.Connection, base_dn: str):
try:
records = conn.search_s(base_dn, ldap.SCOPE_ONELEVEL, attrlist=[''])
for dn, _ in records:
conn.delete_s(dn)
conn.delete_s(base_dn)
except ldap.NO_SUCH_OBJECT:
conn.search(base_dn, '(objectClass=*)', search_scope=ldap3.LEVEL)
for entry in conn.entries:
conn.delete(entry.entry_dn)
conn.delete(base_dn)
except ldap3.core.exceptions.LDAPNoSuchObjectResult:
pass
def get_ldap_conn(server_url: str):
return ldap3.Connection(
server_url, auto_bind=True, raise_exceptions=True,
authentication=ldap3.SASL, sasl_mechanism=ldap3.KERBEROS)
@pytest.fixture(scope='session')
def ldap_srv(cfg, krb_srv):
conn = ldap.initialize(cfg.get('ldap_server_url'))
conn.sasl_gssapi_bind_s()
conn = get_ldap_conn(cfg.get('ldap_server_url'))
users_base = cfg.get('ldap_users_base')
groups_base = cfg.get('ldap_groups_base')
recursively_delete_subtree(conn, users_base)
recursively_delete_subtree(conn, groups_base)
delete_subtree(conn, users_base)
delete_subtree(conn, groups_base)
for base_dn in [users_base, groups_base]:
ou = base_dn.split(',', 1)[0].split('=')[1]
conn.add_s(base_dn, ldap.modlist.addModlist({
'objectClass': [b'organizationalUnit'],
'ou': [ou.encode()]
}))
conn.add(base_dn, 'organizationalUnit')
_ldap_srv = LDAPService()
component.provideUtility(_ldap_srv, ILDAPService)
yield _ldap_srv
recursively_delete_subtree(conn, users_base)
recursively_delete_subtree(conn, groups_base)
delete_subtree(conn, users_base)
delete_subtree(conn, groups_base)
@pytest.fixture(scope='session')
@ -118,22 +120,18 @@ def mailman_srv(mock_mailman_server, cfg, http_client):
@pytest.fixture(scope='session')
def uwldap_srv(cfg, ldap_srv):
conn = ldap.initialize(cfg.get('uwldap_server_url'))
conn.sasl_gssapi_bind_s()
conn = get_ldap_conn(cfg.get('uwldap_server_url'))
base_dn = cfg.get('uwldap_base')
ou = base_dn.split(',', 1)[0].split('=')[1]
recursively_delete_subtree(conn, base_dn)
delete_subtree(conn, base_dn)
conn.add_s(base_dn, ldap.modlist.addModlist({
'objectClass': [b'organizationalUnit'],
'ou': [ou.encode()]
}))
conn.add(base_dn, 'organizationalUnit')
_uwldap_srv = UWLDAPService()
component.provideUtility(_uwldap_srv, IUWLDAPService)
yield _uwldap_srv
recursively_delete_subtree(conn, base_dn)
delete_subtree(conn, base_dn)
@pytest.fixture(scope='session')
@ -219,8 +217,7 @@ def ldap_group(simple_group):
@pytest.fixture
def uwldap_user(cfg, uwldap_srv):
conn = ldap.initialize(cfg.get('uwldap_server_url'))
conn.sasl_gssapi_bind_s()
conn = get_ldap_conn(cfg.get('uwldap_server_url'))
base_dn = cfg.get('uwldap_base')
user = UWLDAPRecord(
uid='test_jdoe',
@ -231,20 +228,21 @@ def uwldap_user(cfg, uwldap_srv):
given_name='John',
)
dn = f'uid={user.uid},{base_dn}'
conn.add_s(dn, ldap.modlist.addModlist(strings_to_bytes({
'uid': [user.uid],
'mailLocalAddress': user.mail_local_addresses,
'ou': [user.program],
'cn': [user.cn],
'sn': [user.sn],
'givenName': [user.given_name],
'objectClass': [
conn.add(
dn,
[
'inetLocalMailRecipient',
'inetOrgPerson',
'organizationalPerson',
'person',
'top',
],
})))
{
'mailLocalAddress': user.mail_local_addresses,
'ou': user.program,
'cn': user.cn,
'sn': user.sn,
'givenName': user.given_name,
},
)
yield user
conn.delete_s(dn)
conn.delete(dn)

Loading…
Cancel
Save