import copy import json import os from typing import List, Dict, Union from zope import component from zope.interface import implementer from .utils import strings_to_bytes, bytes_to_strings from .validators import is_valid_shell, is_valid_term from ceo_common.interfaces import ILDAPService, IKerberosService, IFileService, \ IUser, IConfig, IMailmanService @implementer(IUser) class User: def __init__( self, uid: str, cn: str, program: Union[str, None] = None, terms: Union[List[str], None] = None, non_member_terms: Union[List[str], None] = None, login_shell: str = '/bin/bash', uid_number: Union[int, None] = None, gid_number: Union[int, None] = None, home_directory: Union[str, None] = None, positions: Union[List[str], None] = None, mail_local_addresses: Union[List[str], None] = None, is_club: bool = False, ): if not is_club and not terms and not non_member_terms: raise Exception('terms and non_member_terms cannot both be empty') cfg = component.getUtility(IConfig) self.uid = uid self.cn = cn self.program = program self.terms = terms or [] self.non_member_terms = non_member_terms or [] self.login_shell = login_shell self.uid_number = uid_number self.gid_number = gid_number if home_directory is None: if is_club: home_parent = cfg.get('members_home') else: home_parent = cfg.get('clubs_home') self.home_directory = os.path.join(home_parent, uid) else: self.home_directory = home_directory self.positions = positions or [] self.mail_local_addresses = mail_local_addresses or [] self._is_club = is_club self.ldap_sasl_realm = cfg.get('ldap_sasl_realm') self.dn = f'uid={uid},{cfg.get("ldap_users_base")}' self.ldap_srv = component.getUtility(ILDAPService) self.krb_srv = component.getUtility(IKerberosService) self.file_srv = component.getUtility(IFileService) def to_dict(self, get_forwarding_addresses: bool = False) -> Dict: data = { 'dn': self.dn, 'cn': self.cn, 'uid': self.uid, 'uid_number': self.uid_number, 'gid_number': self.gid_number, 'login_shell': self.login_shell, 'home_directory': self.home_directory, 'is_club': self.is_club(), } if self.program: data['program'] = self.program if self.terms: data['terms'] = self.terms if self.non_member_terms: data['non_member_terms'] = self.non_member_terms if self.positions: data['positions'] = self.positions if self.mail_local_addresses: data['mail_local_addresses'] = self.mail_local_addresses if get_forwarding_addresses: data['forwarding_addresses'] = self.get_forwarding_addresses() return data def __repr__(self) -> str: return json.dumps(self.to_dict(), indent=2) def is_club(self) -> bool: return self._is_club def add_to_ldap(self): new_member = self.ldap_srv.add_user(self) self.uid_number = new_member.uid_number self.gid_number = new_member.gid_number def remove_from_ldap(self): self.ldap_srv.remove_user(self) self.uid_number = None self.gid_number = None def add_to_kerberos(self, password: str): self.krb_srv.addprinc(self.uid, password) def remove_from_kerberos(self): self.krb_srv.delprinc(self.uid) def change_password(self, password: str): self.krb_srv.change_password(self.uid, password) def create_home_dir(self): self.file_srv.create_home_dir(self) def delete_home_dir(self): self.file_srv.delete_home_dir(self) def subscribe_to_mailing_list(self, mailing_list: str): component.getUtility(IMailmanService).subscribe(self.uid, mailing_list) def unsubscribe_from_mailing_list(self, mailing_list: str): component.getUtility(IMailmanService).unsubscribe(self.uid, mailing_list) def serialize_for_ldap(self) -> Dict: data = { 'cn': [self.cn], 'loginShell': [self.login_shell], 'homeDirectory': [self.home_directory], 'uid': [self.uid], 'uidNumber': [str(self.uid_number)], 'gidNumber': [str(self.gid_number)], 'objectClass': [ 'top', 'account', 'posixAccount', 'shadowAccount', ], } if self.is_club(): data['objectClass'].append('club') else: data['objectClass'].append('member') data['userPassword'] = ['{SASL}%s@%s' % (self.uid, self.ldap_sasl_realm)] if self.program: data['program'] = [self.program] if self.terms: data['term'] = self.terms if self.non_member_terms: data['nonMemberTerm'] = self.non_member_terms if self.positions: data['position'] = self.positions if self.mail_local_addresses: data['mailLocalAddress'] = self.mail_local_addresses data['objectClass'].append('inetLocalMailRecipient') return strings_to_bytes(data) @staticmethod def deserialize_from_ldap(data: Dict[str, List[bytes]]) -> IUser: data = bytes_to_strings(data) return User( uid=data['uid'][0], cn=data['cn'][0], program=data.get('program', [None])[0], terms=data.get('term'), non_member_terms=data.get('nonUserTerm'), login_shell=data['loginShell'][0], uid_number=int(data['uidNumber'][0]), gid_number=int(data['gidNumber'][0]), home_directory=data['homeDirectory'][0], positions=data.get('position'), mail_local_addresses=data.get('mailLocalAddress', []), is_club=('club' in data['objectClass']), ) def replace_login_shell(self, login_shell: str): new_user = copy.copy(self) if not is_valid_shell(login_shell): raise Exception('%s is not a valid shell' % login_shell) new_user.login_shell = login_shell self.ldap_srv.modify_user(self, new_user) def add_terms(self, terms: List[str]): for term in terms: if not is_valid_term(term): raise Exception('%s is not a valid term' % term) new_user = copy.copy(self) new_user.terms = self.terms.copy() new_user.terms.extend(terms) self.ldap_srv.modify_user(self, new_user) self.terms = new_user.terms def add_non_member_terms(self, terms: List[str]): for term in terms: if not is_valid_term(term): raise Exception('%s is not a valid term' % term) new_user = copy.copy(self) new_user.non_member_terms = self.non_member_terms.copy() new_user.non_member_terms.extend(terms) self.ldap_srv.modify_user(self, new_user) self.non_member_terms = new_user.non_member_terms def add_position(self, position: str): new_user = copy.copy(self) new_user.positions = [*self.positions, position] self.ldap_srv.modify_user(self, new_user) self.positions = new_user.positions def remove_position(self, position: str): new_user = copy.copy(self) new_user.positions = self.positions.copy() new_user.positions.remove(position) self.ldap_srv.modify_user(self, new_user) self.positions = new_user.positions def get_forwarding_addresses(self) -> List[str]: return self.file_srv.get_forwarding_addresses(self) def set_forwarding_addresses(self, addresses: List[str]): self.file_srv.set_forwarding_addresses(self, addresses) forwarding_addresses = property(get_forwarding_addresses, set_forwarding_addresses)