import json
import os
from typing import List, Dict, Union

import ldap3
from zope import component
from zope.interface import implementer

from .validators import is_valid_shell, is_valid_term
from ceo_common.interfaces import ILDAPService, IKerberosService, IFileService, \
    IUser, IConfig, IMailmanService


@implementer(IUser)
class User:
    """In-memory representation of a user (member or club) account.

    Instances hold the account attributes and delegate persistence to
    utilities looked up through ``zope.component``: an ``ILDAPService``
    and an ``IKerberosService`` (resolved once in ``__init__``), and an
    ``IFileService`` / ``IMailmanService`` (resolved on demand).
    """

    def __init__(
        self,
        uid: str,
        cn: str,
        program: Union[str, None] = None,
        terms: Union[List[str], None] = None,
        non_member_terms: Union[List[str], None] = None,
        login_shell: str = '/bin/bash',
        uid_number: Union[int, None] = None,
        gid_number: Union[int, None] = None,
        home_directory: Union[str, None] = None,
        positions: Union[List[str], None] = None,
        mail_local_addresses: Union[List[str], None] = None,
        is_club: bool = False,
        ldap3_entry: Union[ldap3.Entry, None] = None,
    ):
        """Build a user record.

        :param uid: account name; also used as the last component of the
            default home directory.
        :param cn: common (display) name.
        :param program: optional program; ``to_dict`` reports 'Unknown'
            when it is unset.
        :param terms: member terms. Required (together with or instead of
            ``non_member_terms``) unless ``is_club`` is true.
        :param non_member_terms: non-member terms.
        :param login_shell: login shell path.
        :param uid_number: numeric UID, if already known.
        :param gid_number: numeric GID, if already known.
        :param home_directory: explicit home directory. When ``None`` it
            is derived from the ``clubs_home`` (clubs) or ``members_home``
            (everyone else) config value joined with ``uid``.
        :param positions: positions held by the user.
        :param mail_local_addresses: local mail addresses.
        :param is_club: whether this account is a club.
        :param ldap3_entry: the LDAP entry this user was loaded from, if any.
        :raises Exception: if the user is not a club and both ``terms`` and
            ``non_member_terms`` are empty or missing.
        """
        if not is_club and not terms and not non_member_terms:
            raise Exception('terms and non_member_terms cannot both be empty')
        cfg = component.getUtility(IConfig)

        self.uid = uid
        self.cn = cn
        self.program = program
        # Copy the term lists: add_terms()/add_non_member_terms() extend
        # them in place, which must not leak into the caller's own lists.
        self.terms = list(terms) if terms else []
        self.non_member_terms = list(non_member_terms) if non_member_terms else []
        self.login_shell = login_shell
        self.uid_number = uid_number
        self.gid_number = gid_number
        if home_directory is None:
            # Clubs live under 'clubs_home', regular members under
            # 'members_home'.
            if is_club:
                home_parent = cfg.get('clubs_home')
            else:
                home_parent = cfg.get('members_home')
            self.home_directory = os.path.join(home_parent, uid)
        else:
            self.home_directory = home_directory
        self.positions = positions or []
        self.mail_local_addresses = mail_local_addresses or []
        self._is_club = is_club
        self.ldap3_entry = ldap3_entry

        self.ldap_srv = component.getUtility(ILDAPService)
        self.krb_srv = component.getUtility(IKerberosService)

    def to_dict(self, get_forwarding_addresses: bool = False) -> Dict:
        """Return the user's attributes as a plain dictionary.

        List-valued attributes (``terms``, ``non_member_terms``,
        ``positions``, ``mail_local_addresses``) are included only when
        non-empty.

        :param get_forwarding_addresses: when true, also look up the
            forwarding addresses through the file service and include them
            under ``forwarding_addresses``.
        """
        data = {
            'cn': self.cn,
            'uid': self.uid,
            'uid_number': self.uid_number,
            'gid_number': self.gid_number,
            'login_shell': self.login_shell,
            'home_directory': self.home_directory,
            'is_club': self.is_club(),
            'program': self.program or 'Unknown',
        }
        if self.terms:
            data['terms'] = self.terms
        if self.non_member_terms:
            data['non_member_terms'] = self.non_member_terms
        if self.positions:
            data['positions'] = self.positions
        if self.mail_local_addresses:
            data['mail_local_addresses'] = self.mail_local_addresses
        if get_forwarding_addresses:
            data['forwarding_addresses'] = self.get_forwarding_addresses()
        return data

    def __repr__(self) -> str:
        """Return ``to_dict()`` rendered as indented JSON."""
        return json.dumps(self.to_dict(), indent=2)

    def is_club(self) -> bool:
        """Return whether this account was created as a club."""
        return self._is_club

    def add_to_ldap(self):
        """Ask the LDAP service to add this user."""
        self.ldap_srv.add_user(self)

    def remove_from_ldap(self):
        """Ask the LDAP service to remove this user."""
        self.ldap_srv.remove_user(self)

    def add_to_kerberos(self, password: str):
        """Ask the Kerberos service to add a principal for ``uid``."""
        self.krb_srv.addprinc(self.uid, password)

    def remove_from_kerberos(self):
        """Ask the Kerberos service to delete the principal for ``uid``."""
        self.krb_srv.delprinc(self.uid)

    def change_password(self, password: str):
        """Ask the Kerberos service to change the password for ``uid``."""
        self.krb_srv.change_password(self.uid, password)

    def create_home_dir(self):
        """Ask the file service to create this user's home directory."""
        file_srv = component.getUtility(IFileService)
        file_srv.create_home_dir(self)

    def delete_home_dir(self):
        """Ask the file service to delete this user's home directory."""
        file_srv = component.getUtility(IFileService)
        file_srv.delete_home_dir(self)

    def subscribe_to_mailing_list(self, mailing_list: str):
        """Subscribe ``uid`` to ``mailing_list`` via the Mailman service."""
        component.getUtility(IMailmanService).subscribe(self.uid, mailing_list)

    def unsubscribe_from_mailing_list(self, mailing_list: str):
        """Unsubscribe ``uid`` from ``mailing_list`` via the Mailman service."""
        component.getUtility(IMailmanService).unsubscribe(self.uid, mailing_list)

    @staticmethod
    def deserialize_from_ldap(entry: ldap3.Entry) -> IUser:
        """Deserialize a user from an LDAP entry.

        Single-valued attributes are taken from the first element of the
        corresponding attribute list; ``program``, ``term``,
        ``nonMemberTerm``, ``position`` and ``mailLocalAddress`` are
        optional. The account is treated as a club when ``'club'`` appears
        in its ``objectClass`` values.
        """
        attrs = entry.entry_attributes_as_dict
        return User(
            uid=attrs['uid'][0],
            cn=attrs['cn'][0],
            program=attrs.get('program', [None])[0],
            terms=attrs.get('term'),
            non_member_terms=attrs.get('nonMemberTerm'),
            login_shell=attrs['loginShell'][0],
            uid_number=attrs['uidNumber'][0],
            gid_number=attrs['gidNumber'][0],
            home_directory=attrs['homeDirectory'][0],
            positions=attrs.get('position'),
            mail_local_addresses=attrs.get('mailLocalAddress'),
            is_club=('club' in attrs['objectClass']),
            ldap3_entry=entry,
        )

    def replace_login_shell(self, login_shell: str):
        """Set a new login shell in LDAP and on this object.

        :raises Exception: if ``is_valid_shell`` rejects ``login_shell``.
        """
        if not is_valid_shell(login_shell):
            raise Exception('%s is not a valid shell' % login_shell)
        # NOTE(review): entry_ctx_for_user presumably commits the modified
        # entry on exit -- confirm against the ILDAPService implementation.
        with self.ldap_srv.entry_ctx_for_user(self) as entry:
            entry.loginShell = login_shell
        # Only touch the in-memory copy once the LDAP context has exited
        # without raising.
        self.login_shell = login_shell

    def add_terms(self, terms: List[str]):
        """Append member terms in LDAP and on this object.

        All terms are validated before anything is written.

        :raises Exception: if ``is_valid_term`` rejects any of ``terms``.
        """
        for term in terms:
            if not is_valid_term(term):
                raise Exception('%s is not a valid term' % term)
        with self.ldap_srv.entry_ctx_for_user(self) as entry:
            entry.term.add(terms)
        # Only touch the in-memory copy once the LDAP context has exited
        # without raising.
        self.terms.extend(terms)

    def add_non_member_terms(self, terms: List[str]):
        """Append non-member terms in LDAP and on this object.

        All terms are validated before anything is written.

        :raises Exception: if ``is_valid_term`` rejects any of ``terms``.
        """
        for term in terms:
            if not is_valid_term(term):
                raise Exception('%s is not a valid term' % term)
        with self.ldap_srv.entry_ctx_for_user(self) as entry:
            entry.nonMemberTerm.add(terms)
        # Only touch the in-memory copy once the LDAP context has exited
        # without raising.
        self.non_member_terms.extend(terms)

    def set_positions(self, positions: List[str]):
        """Replace the user's positions in LDAP and on this object."""
        with self.ldap_srv.entry_ctx_for_user(self) as entry:
            entry.position = positions
        # Only touch the in-memory copy once the LDAP context has exited
        # without raising.
        self.positions = positions

    def get_forwarding_addresses(self) -> List[str]:
        """Return this user's forwarding addresses from the file service."""
        file_srv = component.getUtility(IFileService)
        return file_srv.get_forwarding_addresses(self)

    def set_forwarding_addresses(self, addresses: List[str]):
        """Store ``addresses`` as this user's forwarding addresses via the file service."""
        file_srv = component.getUtility(IFileService)
        file_srv.set_forwarding_addresses(self, addresses)