pyceo/ceod/model/User.py

232 lines
8.4 KiB
Python

import json
import os
from typing import List, Dict, Union
import ldap3
from zope import component
from zope.interface import implementer
from .utils import should_be_club_rep
from .validators import is_valid_shell, is_valid_term
from ceo_common.interfaces import ILDAPService, IKerberosService, IFileService, \
IUser, IConfig, IMailmanService
from ceo_common.model import Term
@implementer(IUser)
class User:
    """An account record (a regular user or a club) mirrored in LDAP.

    The object keeps a local copy of the account attributes and delegates
    persistence to services looked up through the zope component registry:
    ``ILDAPService`` and ``IKerberosService`` are resolved once in
    ``__init__``; ``IFileService`` and ``IMailmanService`` are resolved
    lazily inside the methods that need them.
    """

    def __init__(
        self,
        uid: str,
        cn: str,
        given_name: Union[str, None] = None,
        sn: Union[str, None] = None,
        program: Union[str, None] = None,
        terms: Union[List[str], None] = None,
        non_member_terms: Union[List[str], None] = None,
        login_shell: Union[str, None] = '/bin/bash',
        uid_number: Union[int, None] = None,
        gid_number: Union[int, None] = None,
        home_directory: Union[str, None] = None,
        positions: Union[List[str], None] = None,
        mail_local_addresses: Union[List[str], None] = None,
        is_club_rep: Union[bool, None] = None,
        is_club: bool = False,
        is_member_or_club_rep: Union[bool, None] = None,
        ldap3_entry: Union[ldap3.Entry, None] = None,
        shadowExpire: Union[int, None] = None,
    ):
        """Store the account attributes and resolve the backing services.

        Args:
            uid: Account identifier; also the last path component of the
                default home directory.
            cn: Common name of the account.
            given_name: Optional given name.
            sn: Optional surname.
            program: Optional program; ``to_dict`` reports ``'Unknown'``
                when it is empty.
            terms: Member terms; ``None`` is stored as an empty list.
            non_member_terms: Non-member terms; ``None`` is stored as an
                empty list.
            login_shell: Login shell for the account.
            uid_number: Numeric UID, if already assigned.
            gid_number: Numeric GID, if already assigned.
            home_directory: Explicit home directory. When ``None`` it is
                derived from the ``clubs_home`` (clubs) or ``members_home``
                (everyone else) configuration value joined with ``uid``.
            positions: Positions held; ``None`` is stored as an empty list.
            mail_local_addresses: Local mail addresses; ``None`` is stored
                as an empty list.
            is_club_rep: Explicit club-rep flag. When ``None`` it is
                ``False`` for clubs and otherwise computed by
                ``should_be_club_rep(terms, non_member_terms)``.
            is_club: Whether this account is a club rather than a person.
            is_member_or_club_rep: Explicit flag. When ``None`` it is true
                iff ``terms`` or ``non_member_terms`` was passed as
                something other than ``None``.
            ldap3_entry: The LDAP entry this object was loaded from, if any.
            shadowExpire: Value of the ``shadowExpire`` attribute, or
                ``None`` when it is not set.
        """
        cfg = component.getUtility(IConfig)

        self.uid = uid
        self.cn = cn
        self.given_name = given_name
        self.sn = sn
        self.program = program
        self.terms = terms or []
        self.non_member_terms = non_member_terms or []
        self.login_shell = login_shell
        self.uid_number = uid_number
        self.gid_number = gid_number

        if home_directory is None:
            # Clubs live under 'clubs_home', everyone else under
            # 'members_home'. (These two keys were previously swapped.)
            home_key = 'clubs_home' if is_club else 'members_home'
            home_directory = os.path.join(cfg.get(home_key), uid)
        self.home_directory = home_directory

        self.positions = positions or []
        self.mail_local_addresses = mail_local_addresses or []
        self._is_club = is_club

        if is_club_rep is None:
            if is_club:
                # not a real user
                is_club_rep = False
            else:
                # The raw arguments (possibly None) are passed on purpose,
                # not the normalised lists stored on self.
                is_club_rep = should_be_club_rep(terms, non_member_terms)
        self.is_club_rep = is_club_rep

        if is_member_or_club_rep is None:
            # Based on whether the arguments were supplied at all, so an
            # explicitly passed empty list still counts.
            is_member_or_club_rep = (
                terms is not None or non_member_terms is not None
            )
        self._is_member_or_club_rep = is_member_or_club_rep

        self.ldap3_entry = ldap3_entry
        self.shadowExpire = shadowExpire

        self.ldap_srv = component.getUtility(ILDAPService)
        self.krb_srv = component.getUtility(IKerberosService)
        self.base_domain = cfg.get('base_domain')

    def to_dict(self, get_forwarding_addresses: bool = False) -> Dict:
        """Return a JSON-serialisable dictionary describing this account.

        Optional attributes are included only when they are truthy; ``sn``
        and ``given_name`` are included only when both are set.

        Args:
            get_forwarding_addresses: When true, also query the file
                service and include the result under
                ``'forwarding_addresses'``.
        """
        data = {
            'cn': self.cn,
            'uid': self.uid,
            'uid_number': self.uid_number,
            'gid_number': self.gid_number,
            'home_directory': self.home_directory,
            'is_club': self.is_club(),
            'is_club_rep': self.is_club_rep,
            'program': self.program or 'Unknown',
            'shadowExpire': self.shadowExpire,
        }
        if self.sn and self.given_name:
            data['sn'] = self.sn
            data['given_name'] = self.given_name
        if self.login_shell:
            data['login_shell'] = self.login_shell
        if self.terms:
            data['terms'] = self.terms
        if self.non_member_terms:
            data['non_member_terms'] = self.non_member_terms
        if self.positions:
            data['positions'] = self.positions
        if self.mail_local_addresses:
            data['mail_local_addresses'] = self.mail_local_addresses
        if get_forwarding_addresses:
            data['forwarding_addresses'] = self.get_forwarding_addresses()
        return data

    def __repr__(self) -> str:
        """Return ``to_dict()`` rendered as indented JSON."""
        return json.dumps(self.to_dict(), indent=2)

    def is_club(self) -> bool:
        """Return whether this account is a club."""
        return self._is_club

    def is_member_or_club_rep(self) -> bool:
        """Return the member-or-club-rep flag fixed at construction."""
        return self._is_member_or_club_rep

    def is_member(self):
        """Return whether this is a member/club-rep account that is not a club rep."""
        return self.is_member_or_club_rep() and not self.is_club_rep

    def add_to_ldap(self):
        """Add this account through the LDAP service.

        If no local mail address is set, ``<uid>@<base_domain>`` is assigned
        first so that the new record always carries one.
        """
        if not self.mail_local_addresses:
            self.mail_local_addresses = [f'{self.uid}@{self.base_domain}']
        self.ldap_srv.add_user(self)

    def remove_from_ldap(self):
        """Remove this account through the LDAP service."""
        self.ldap_srv.remove_user(self)

    def add_to_kerberos(self, password: str):
        """Ask the Kerberos service to add a principal for ``uid``."""
        self.krb_srv.addprinc(self.uid, password)

    def remove_from_kerberos(self):
        """Ask the Kerberos service to delete the principal for ``uid``."""
        self.krb_srv.delprinc(self.uid)

    def change_password(self, password: str):
        """Ask the Kerberos service to change the password for ``uid``."""
        self.krb_srv.change_password(self.uid, password)

    def create_home_dir(self):
        """Ask the file service to create this account's home directory."""
        file_srv = component.getUtility(IFileService)
        file_srv.create_home_dir(self)

    def delete_home_dir(self):
        """Ask the file service to delete this account's home directory."""
        file_srv = component.getUtility(IFileService)
        file_srv.delete_home_dir(self)

    def subscribe_to_mailing_list(self, mailing_list: str):
        """Subscribe ``uid`` to ``mailing_list`` via the Mailman service."""
        component.getUtility(IMailmanService).subscribe(self.uid, mailing_list)

    def unsubscribe_from_mailing_list(self, mailing_list: str):
        """Unsubscribe ``uid`` from ``mailing_list`` via the Mailman service."""
        component.getUtility(IMailmanService).unsubscribe(self.uid, mailing_list)

    @staticmethod
    def deserialize_from_ldap(entry: ldap3.Entry) -> IUser:
        """Deserialize this user from an LDAP entry.

        Values in ``entry.entry_attributes_as_dict`` are lists. Single-valued
        attributes are unwrapped by taking element 0; multi-valued ones
        (terms, positions, mail addresses) are passed through as lists.
        """
        attrs = entry.entry_attributes_as_dict
        return User(
            uid=attrs['uid'][0],
            cn=attrs['cn'][0],
            given_name=attrs.get('givenName', [None])[0],
            sn=attrs.get('sn', [None])[0],
            program=attrs.get('program', [None])[0],
            terms=attrs.get('term'),
            non_member_terms=attrs.get('nonMemberTerm'),
            login_shell=attrs.get('loginShell', [None])[0],
            uid_number=attrs['uidNumber'][0],
            gid_number=attrs['gidNumber'][0],
            home_directory=attrs['homeDirectory'][0],
            positions=attrs.get('position'),
            mail_local_addresses=attrs.get('mailLocalAddress'),
            is_club_rep=attrs.get('isClubRep', [False])[0],
            is_club=('club' in attrs['objectClass']),
            is_member_or_club_rep=('member' in attrs['objectClass']),
            # Unwrap like the other single-valued attributes so the stored
            # value is an int/None, matching what set_expired() assigns.
            shadowExpire=attrs.get('shadowExpire', [None])[0],
            ldap3_entry=entry,
        )

    def replace_login_shell(self, login_shell: str):
        """Validate ``login_shell`` and store it in LDAP and on this object.

        Raises:
            Exception: if ``is_valid_shell`` rejects ``login_shell``.
        """
        if not is_valid_shell(login_shell):
            raise Exception('%s is not a valid shell' % login_shell)
        with self.ldap_srv.entry_ctx_for_user(self) as entry:
            entry.loginShell = login_shell
            self.login_shell = login_shell

    def add_terms(self, terms: List[str]):
        """Append member terms and clear the club-rep flag.

        All terms are validated before anything is modified.

        Raises:
            Exception: if ``is_valid_term`` rejects any element of ``terms``.
        """
        for term in terms:
            if not is_valid_term(term):
                raise Exception('%s is not a valid term' % term)
        with self.ldap_srv.entry_ctx_for_user(self) as entry:
            entry.term.add(terms)
            # Only remove the attribute when it currently holds a truthy
            # value.
            if entry.isClubRep.value:
                entry.isClubRep.remove()
            self.terms.extend(terms)
            self.is_club_rep = False

    def add_non_member_terms(self, terms: List[str]):
        """Append non-member terms and set the club-rep flag.

        All terms are validated before anything is modified.

        Raises:
            Exception: if ``is_valid_term`` rejects any element of ``terms``.
        """
        for term in terms:
            if not is_valid_term(term):
                raise Exception('%s is not a valid term' % term)
        with self.ldap_srv.entry_ctx_for_user(self) as entry:
            entry.nonMemberTerm.add(terms)
            entry.isClubRep = True
            self.non_member_terms.extend(terms)
            self.is_club_rep = True

    def set_positions(self, positions: List[str]):
        """Replace the account's positions in LDAP and on this object."""
        with self.ldap_srv.entry_ctx_for_user(self) as entry:
            entry.position = positions
            self.positions = positions

    def get_forwarding_addresses(self) -> List[str]:
        """Return this account's forwarding addresses from the file service."""
        file_srv = component.getUtility(IFileService)
        return file_srv.get_forwarding_addresses(self)

    def set_forwarding_addresses(self, addresses: List[str]):
        """Store ``addresses`` as forwarding addresses via the file service."""
        file_srv = component.getUtility(IFileService)
        file_srv.set_forwarding_addresses(self, addresses)

    def membership_is_valid(self) -> bool:
        """Return whether the latest member term is the current term or later.

        Only ``terms`` is considered; an account with no member terms is
        never valid.
        """
        if not self.terms:
            return False
        current_term = Term.current()
        most_recent_term = max(map(Term, self.terms))
        return most_recent_term >= current_term

    def set_expired(self, expired: bool):
        """Set or clear ``shadowExpire`` in LDAP and on this object.

        Args:
            expired: When true, ``shadowExpire`` is set to ``1``; otherwise
                the attribute is removed and the local value becomes ``None``.
        """
        with self.ldap_srv.entry_ctx_for_user(self) as entry:
            if expired:
                entry.shadowExpire = 1
                self.shadowExpire = 1
            else:
                entry.shadowExpire.remove()
                self.shadowExpire = None