from flask import Blueprint, request from flask.json import jsonify from zope import component from .utils import authz_restrict_to_staff, authz_restrict_to_syscom, \ user_is_in_group, requires_authentication_no_realm, requires_admin_creds, \ create_streaming_response, development_only, is_truthy from ceo_common.errors import BadRequest, UserAlreadySubscribedError, UserNotSubscribedError from ceo_common.interfaces import ILDAPService, IConfig, IMailService from ceo_common.logger_factory import logger_factory from ceo_common.model.Term import get_terms_for_new_user, get_terms_for_renewal from ceod.transactions.members import ( AddMemberTransaction, ModifyMemberTransaction, DeleteMemberTransaction, ) import ceod.utils as utils bp = Blueprint('members', __name__) logger = logger_factory(__name__) @bp.route('/', methods=['POST'], strict_slashes=False) @requires_admin_creds @authz_restrict_to_staff def create_user(): # We need to use the admin creds here because office members may not # directly create new LDAP records. body = request.get_json(force=True) terms = body.get('terms') non_member_terms = body.get('non_member_terms') if (terms and non_member_terms) or not (terms or non_member_terms): raise BadRequest('Must specify either terms or non-member terms') if type(terms) is int: terms = get_terms_for_new_user(terms) elif type(non_member_terms) is int: non_member_terms = get_terms_for_new_user(non_member_terms) for attr in ['uid', 'cn', 'given_name', 'sn']: if not body.get(attr): raise BadRequest(f"Attribute '{attr}' is missing or empty") if terms: logger.info(f"Creating member {body['uid']} for terms {terms}") else: logger.info(f"Creating club rep {body['uid']} for non-member terms {non_member_terms}") txn = AddMemberTransaction( uid=body['uid'], cn=body['cn'], given_name=body['given_name'], sn=body['sn'], program=body.get('program'), terms=terms, non_member_terms=non_member_terms, forwarding_addresses=body.get('forwarding_addresses'), ) return create_streaming_response(txn) @bp.route('/') @requires_authentication_no_realm def get_user(auth_user: str, username: str): get_forwarding_addresses = False if user_is_in_group(auth_user, 'syscom'): # Only syscom members may see the user's forwarding addresses, # since this requires reading a file in the user's home directory. # To avoid situations where an unprivileged user symlinks their # ~/.forward file to /etc/shadow or something, we don't allow # non-syscom members to use this option either. get_forwarding_addresses = True ldap_srv = component.getUtility(ILDAPService) user = ldap_srv.get_user(username) user_dict = user.to_dict(get_forwarding_addresses) user_dict['groups'] = ldap_srv.get_groups_for_user(username) return user_dict @bp.route('/', methods=['PATCH']) @requires_authentication_no_realm def patch_user(auth_user: str, username: str): if not (auth_user == username or user_is_in_group(auth_user, 'syscom')): return { 'error': "You are not authorized to modify other users' attributes" }, 403 body = request.get_json(force=True) txn = ModifyMemberTransaction( username, login_shell=body.get('login_shell'), forwarding_addresses=body.get('forwarding_addresses'), ) return create_streaming_response(txn) @bp.route('//renew', methods=['POST']) @requires_admin_creds @authz_restrict_to_staff def renew_user(username: str): # We need to use the admin creds here because office members should # not be able to directly modify the shadowExpire field; this could # prevent syscom members from logging into the machines. body = request.get_json(force=True) terms = body.get('terms') non_member_terms = body.get('non_member_terms') if (terms and non_member_terms) or not (terms or non_member_terms): raise BadRequest('Must specify either terms or non-member terms') ldap_srv = component.getUtility(ILDAPService) cfg = component.getUtility(IConfig) user = ldap_srv.get_user(username) member_list = cfg.get('mailman3_new_member_list') if type(terms) is int: terms = get_terms_for_renewal(user.terms, terms) elif type(non_member_terms) is int: non_member_terms = get_terms_for_renewal(user.non_member_terms, non_member_terms) def unexpire(user): if user.shadowExpire: user.set_expired(False) try: user.subscribe_to_mailing_list(member_list) logger.debug(f'Unsubscribed {user.uid} from {member_list}') except UserAlreadySubscribedError: logger.debug(f'{user.uid} is already unsubscribed from {member_list}') if terms: logger.info(f"Renewing member {username} for terms {terms}") user.add_terms(terms) unexpire(user) return {'terms_added': terms} elif non_member_terms: logger.info(f"Renewing club rep {username} for non-member terms {non_member_terms}") user.add_non_member_terms(non_member_terms) unexpire(user) return {'non_member_terms_added': non_member_terms} else: raise BadRequest('Must specify either terms or non-member terms') @bp.route('//pwreset', methods=['POST']) @authz_restrict_to_syscom def reset_user_password(username: str): user = component.getUtility(ILDAPService).get_user(username) password = utils.gen_password() user.change_password(password) return {'password': password} @bp.route('/', methods=['DELETE']) @authz_restrict_to_syscom @development_only def delete_user(username: str): txn = DeleteMemberTransaction(username) return create_streaming_response(txn) @bp.route('/expire', methods=['POST']) @authz_restrict_to_syscom def expire_users(): dry_run = is_truthy(request.args.get('dry_run', 'false')) ldap_srv = component.getUtility(ILDAPService) cfg = component.getUtility(IConfig) members = ldap_srv.get_nonflagged_expired_users() member_list = cfg.get('mailman3_new_member_list') if not dry_run: for member in members: logger.info(f'Expiring {member.uid}') member.set_expired(True) try: member.unsubscribe_from_mailing_list(member_list) logger.debug(f'Unsubscribed {member.uid} from {member_list}') except UserNotSubscribedError: logger.debug(f'{member.uid} is already unsubscribed from {member_list}') return jsonify([member.uid for member in members]) @bp.route('/remindexpire', methods=['POST']) @authz_restrict_to_syscom def remind_users_of_expiration(): dry_run = is_truthy(request.args.get('dry_run', 'false')) ldap_srv = component.getUtility(ILDAPService) members = ldap_srv.get_expiring_users() if not dry_run: mail_srv = component.getUtility(IMailService) for member in members: logger.info(f'Sending renewal reminder to {member.uid}') mail_srv.send_membership_renewal_reminder(member) return jsonify([member.uid for member in members])