forked from public/pyceo
86 lines
2.5 KiB
Python
86 lines
2.5 KiB
Python
import copy
|
|
import json
|
|
from typing import List, Dict, Union
|
|
|
|
from zope import component
|
|
from zope.interface import implementer
|
|
|
|
from .utils import strings_to_bytes, bytes_to_strings, dn_to_uid
|
|
from ceo_common.interfaces import ILDAPService, IGroup, IConfig
|
|
|
|
|
|
@implementer(IGroup)
class Group:
    """An LDAP group record: a common name, a numeric GID and member usernames.

    The instance looks up the IConfig and ILDAPService utilities at
    construction time and delegates all directory writes to the LDAP service.
    """

    def __init__(
        self, cn: str, gid_number: int,
        members: Union[List[str], None] = None,
    ):
        """Store the group's fields and resolve config/LDAP utilities.

        :param cn: common name of the group; also used to build ``self.dn``.
        :param gid_number: numeric group ID.
        :param members: usernames belonging to the group; a falsy value
            (``None`` or an empty list) is replaced by a new empty list.
        """
        self.cn = cn
        self.gid_number = gid_number
        # usernames (not DNs) of the members in this group
        self.members = members if members else []

        config = component.getUtility(IConfig)
        groups_base = config.get("ldap_groups_base")
        self.dn = f'cn={cn},{groups_base}'
        self.ldap_users_base = config.get('ldap_users_base')
        self.ldap_srv = component.getUtility(ILDAPService)

    def __repr__(self) -> str:
        """Return the ``to_dict()`` form pretty-printed as JSON."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, indent=2)

    def to_dict(self):
        """Return the group's dn, cn, gid_number and members as a dict."""
        summary = {}
        summary['dn'] = self.dn
        summary['cn'] = self.cn
        summary['gid_number'] = self.gid_number
        summary['members'] = self.members
        return summary

    def add_to_ldap(self):
        """Ask the LDAP service to add this group."""
        self.ldap_srv.add_group(self)

    def remove_from_ldap(self):
        """Ask the LDAP service to remove this group."""
        self.ldap_srv.remove_group(self)

    def serialize_for_ldap(self) -> Dict[str, List[bytes]]:
        """Build the attribute mapping for this group and pass it through
        ``strings_to_bytes``.

        ``uniqueMember`` is only included when the group has members; each
        entry is a DN of the form ``uid=<username>,<ldap_users_base>``.
        """
        object_classes = ['top', 'group', 'posixGroup']
        attrs = {
            'cn': [self.cn],
            'gidNumber': [str(self.gid_number)],
            'objectClass': object_classes,
        }
        if self.members:
            users_base = self.ldap_users_base
            attrs['uniqueMember'] = [
                f'uid={member},{users_base}' for member in self.members
            ]
        return strings_to_bytes(attrs)

    @staticmethod
    def deserialize_from_ldap(data: Dict[str, List[bytes]]) -> IGroup:
        """Construct a Group from an LDAP attribute mapping.

        The mapping is first passed through ``bytes_to_strings``. The first
        ``cn`` and ``gidNumber`` values are used, and every ``uniqueMember``
        DN (if any) is converted to a username with ``dn_to_uid``.
        """
        attrs = bytes_to_strings(data)
        common_name = attrs['cn'][0]
        gid = int(attrs['gidNumber'][0])
        usernames = [dn_to_uid(dn) for dn in attrs.get('uniqueMember', [])]
        return Group(cn=common_name, gid_number=gid, members=usernames)

    def add_member(self, username: str):
        """Append ``username`` to the group in LDAP, then locally.

        A shallow copy of this group carrying the extended member list is
        handed to ``ldap_srv.modify_group`` together with the current state;
        ``self.members`` is only replaced once that call has returned.
        """
        updated = copy.copy(self)
        extended = self.members.copy()
        extended.append(username)
        updated.members = extended
        self.ldap_srv.modify_group(self, updated)
        self.members = updated.members

    def remove_member(self, username: str):
        """Remove ``username`` from the group in LDAP, then locally.

        ``list.remove`` raises ``ValueError`` when ``username`` is not a
        current member, in which case the LDAP service is never called.
        ``self.members`` is only replaced once ``modify_group`` has returned.
        """
        updated = copy.copy(self)
        remaining = self.members.copy()
        updated.members = remaining
        remaining.remove(username)
        self.ldap_srv.modify_group(self, updated)
        self.members = updated.members
|