pyceo/ceod/model/Group.py

116 lines
3.8 KiB
Python

import json
from typing import List, Union
import ldap3
from zope import component
from zope.interface import implementer
from .utils import dn_to_uid
from ceo_common.errors import UserAlreadyInGroupError, UserNotInGroupError, \
UserNotFoundError
from ceo_common.interfaces import ILDAPService, IGroup
from ceo_common.logger_factory import logger_factory
logger = logger_factory(__name__)
@implementer(IGroup)
class Group:
    """A group backed by LDAP, with a locally cached list of member usernames.

    All reads and writes against the directory go through the registered
    ILDAPService utility, which is looked up once in the constructor.
    """

    def __init__(
        self,
        cn: str,
        gid_number: int,
        description: Union[str, None] = None,
        members: Union[List[str], None] = None,
        ldap3_entry: Union[ldap3.Entry, None] = None,
        user_cn: Union[str, None] = None,
    ):
        """Create a group object; this does not touch LDAP by itself.

        :param cn: the group's 'cn' value.
        :param gid_number: the group's numeric GID.
        :param description: optional description of the group.
        :param members: optional usernames of the group's members.
        :param ldap3_entry: optional ldap3 entry this group was built from.
        :param user_cn: optional 'cn' of the user associated with the group,
            used by to_dict() as a fallback description.
        """
        self.cn = cn
        self.gid_number = gid_number
        self.description = description
        # this is a list of the usernames of the members in this group.
        # Copy it so that add_member()/remove_member() never mutate a list
        # which is still owned by the caller.
        self.members = list(members) if members else []
        self.ldap3_entry = ldap3_entry
        self.user_cn = user_cn
        self.ldap_srv = component.getUtility(ILDAPService)

    def __repr__(self) -> str:
        """Return the to_dict() form as indented JSON.

        Note that to_dict() calls the LDAP service, so this is not a purely
        local operation.
        """
        return json.dumps(self.to_dict(), indent=2)

    def to_dict(self):
        """Return a dict describing this group, intended for display.

        The result always has 'cn', 'gid_number' and 'members'; 'description'
        is only present when a non-empty one could be determined. The
        description is taken, in order, from self.description, self.user_cn,
        or the 'cn' of the user whose username equals this group's cn.
        """
        data = {
            'cn': self.cn,
            'gid_number': self.gid_number,
        }
        description = None
        if self.description:
            description = self.description
        elif self.user_cn:
            # for clubs, the human-readable description is stored in the
            # 'cn' attribute of the associated user
            description = self.user_cn
        else:
            try:
                # TODO: only fetch the CN to save bandwidth
                user = self.ldap_srv.get_user(self.cn)
            except UserNotFoundError:
                # some groups, like syscom, don't have an associated user
                pass
            else:
                description = user.cn
                # remember the value so the next call skips the lookup
                self.user_cn = user.cn
        if description:
            data['description'] = description
        # to_dict() is usually called for display purposes, so get some more
        # information to display
        data['members'] = self.ldap_srv.get_display_info_for_users(self.members)
        return data

    def add_to_ldap(self):
        """Ask the LDAP service to add this group."""
        self.ldap_srv.add_group(self)

    def remove_from_ldap(self):
        """Ask the LDAP service to remove this group."""
        self.ldap_srv.remove_group(self)

    @staticmethod
    def deserialize_from_ldap(entry: ldap3.Entry) -> IGroup:
        """Deserialize this group from an LDAP entry.

        'cn' and 'gidNumber' are required attributes; 'description' and
        'uniqueMember' are optional. Member DNs are converted to usernames
        with dn_to_uid().
        """
        attrs = entry.entry_attributes_as_dict
        return Group(
            cn=attrs['cn'][0],
            gid_number=attrs['gidNumber'][0],
            description=attrs.get('description', [None])[0],
            members=[
                dn_to_uid(dn) for dn in attrs.get('uniqueMember', [])
            ],
            ldap3_entry=entry,
        )

    def add_member(self, username: str):
        """Add a user to this group in LDAP, then to the local member list.

        :param username: username of the member to add.
        :raises UserAlreadyInGroupError: if LDAP reports that the value
            already exists.
        """
        dn = self.ldap_srv.uid_to_dn(username)
        try:
            with self.ldap_srv.entry_ctx_for_group(self) as entry:
                entry.uniqueMember.add(dn)
        except ldap3.core.exceptions.LDAPAttributeOrValueExistsResult as err:
            logger.warning(err)
            raise UserAlreadyInGroupError() from err
        # only update the local list once the LDAP change went through
        self.members.append(username)

    def remove_member(self, username: str):
        """Remove a user from this group in LDAP, then from the local list.

        :param username: username of the member to remove.
        :raises UserNotInGroupError: if ldap3 raises LDAPCursorError while
            deleting the member's DN from the entry.
        """
        dn = self.ldap_srv.uid_to_dn(username)
        try:
            with self.ldap_srv.entry_ctx_for_group(self) as entry:
                entry.uniqueMember.delete(dn)
        except ldap3.core.exceptions.LDAPCursorError as err:
            logger.warning(err)
            raise UserNotInGroupError() from err
        # only update the local list once the LDAP change went through
        self.members.remove(username)

    def set_members(self, usernames: List[str]):
        """Replace the group's members, in LDAP and locally.

        :param usernames: usernames of the new members.
        """
        DNs = [
            self.ldap_srv.uid_to_dn(username) for username in usernames
        ]
        with self.ldap_srv.entry_ctx_for_group(self) as entry:
            entry.uniqueMember = DNs
        # Store a copy: keeping the caller's list would let later
        # add_member()/remove_member() calls modify it behind their back.
        self.members = list(usernames)