pyceo/ceod/api/members.py

208 lines
7.7 KiB
Python

from flask import Blueprint, request
from flask.json import jsonify
from zope import component
from .utils import authz_restrict_to_staff, authz_restrict_to_syscom, \
user_is_in_group, requires_authentication_no_realm, requires_admin_creds, \
create_streaming_response, development_only, is_truthy
from ceo_common.errors import BadRequest, UserAlreadySubscribedError, UserNotSubscribedError
from ceo_common.interfaces import ILDAPService, IConfig, IMailService
from ceo_common.logger_factory import logger_factory
from ceo_common.model.Term import get_terms_for_new_user, get_terms_for_renewal
from ceo_common.utils import validate_username
from ceod.transactions.members import (
AddMemberTransaction,
ModifyMemberTransaction,
DeleteMemberTransaction,
)
import ceod.utils as utils
# Flask blueprint on which every member-related route in this module is
# registered (see the @bp.route decorators below).
bp = Blueprint('members', __name__)
# Module-level logger obtained from the project's logger factory.
logger = logger_factory(__name__)
@bp.route('/', methods=['POST'], strict_slashes=False)
@requires_admin_creds
@authz_restrict_to_staff
def create_user():
    """Create a new member or club rep from the JSON request body.

    The body must be a JSON object containing exactly one of ``terms`` or
    ``non_member_terms``. An integer value is expanded with
    ``get_terms_for_new_user``; any other truthy value is handed to the
    transaction unchanged. The attributes ``uid``, ``cn``, ``given_name``,
    ``sn`` and ``forwarding_addresses`` (a list) are required and must be
    non-empty; ``program`` is optional.

    Returns:
        The streaming response built from the ``AddMemberTransaction``.

    Raises:
        BadRequest: if the body is not a JSON object, if the terms
            specification is missing/ambiguous/empty, if a required
            attribute is missing or empty, if ``forwarding_addresses`` is
            not a list, or if ``uid`` fails ``validate_username``.
    """
    # We need to use the admin creds here because office members may not
    # directly create new LDAP records.
    body = request.get_json(force=True)
    # get_json() returns whatever the JSON document decodes to, so `null`,
    # a list or a bare string would otherwise crash on `.get` below.
    if not isinstance(body, dict):
        raise BadRequest('Request body must be a JSON object')

    terms = body.get('terms')
    non_member_terms = body.get('non_member_terms')
    if (terms and non_member_terms) or not (terms or non_member_terms):
        raise BadRequest('Must specify either terms or non-member terms')

    # Exact type check: a JSON boolean decodes to bool, which is a subclass
    # of int, and must not be treated as a term count.
    if type(terms) is int:
        terms = get_terms_for_new_user(terms)
    elif type(non_member_terms) is int:
        non_member_terms = get_terms_for_new_user(non_member_terms)
    # Same final guard as renew_user: if expanding the count left nothing
    # usable, reject instead of creating a user without any terms.
    if not (terms or non_member_terms):
        raise BadRequest('Must specify either terms or non-member terms')

    for attr in ['uid', 'cn', 'given_name', 'sn', 'forwarding_addresses']:
        if not body.get(attr):
            raise BadRequest(f"Attribute '{attr}' is missing or empty")
    if not isinstance(body['forwarding_addresses'], list):
        raise BadRequest('forwarding_addresses must be a list of email addresses')

    uid_validator = validate_username(body['uid'])
    if not uid_validator.is_valid:
        raise BadRequest("Attribute 'uid' is missing or invalid")

    if terms:
        logger.info(f"Creating member {body['uid']} for terms {terms}")
    else:
        logger.info(f"Creating club rep {body['uid']} for non-member terms {non_member_terms}")

    txn = AddMemberTransaction(
        uid=body['uid'],
        cn=body['cn'],
        given_name=body['given_name'],
        sn=body['sn'],
        program=body.get('program'),
        terms=terms,
        non_member_terms=non_member_terms,
        forwarding_addresses=body['forwarding_addresses'],
    )
    return create_streaming_response(txn)
@bp.route('/<username>')
@requires_authentication_no_realm
def get_user(auth_user: str, username: str):
    """Return the record of ``username`` as a dict, including its groups.

    Forwarding addresses are requested from ``to_dict`` only when the
    authenticated caller (``auth_user``) belongs to the ``syscom`` group.
    """
    # Forwarding addresses are read from a file in the user's home
    # directory, so they are shown to syscom members only. An unprivileged
    # user could symlink ~/.forward to /etc/shadow or similar, which is why
    # non-syscom callers never get this option.
    include_forwarding = bool(user_is_in_group(auth_user, 'syscom'))

    ldap_srv = component.getUtility(ILDAPService)
    result = ldap_srv.get_user(username).to_dict(include_forwarding)
    result['groups'] = ldap_srv.get_groups_for_user(username)
    return result
@bp.route('/<username>', methods=['PATCH'])
@requires_authentication_no_realm
def patch_user(auth_user: str, username: str):
    """Modify the login shell and/or forwarding addresses of ``username``.

    Only the user themselves or a member of the ``syscom`` group may do
    this; anyone else receives a 403 error payload. The JSON body may
    contain ``login_shell`` and ``forwarding_addresses``; both are optional
    and are forwarded to ``ModifyMemberTransaction``.

    Returns:
        The streaming response built from the transaction, or an
        ``(error dict, 403)`` tuple when the caller is not authorized.

    Raises:
        BadRequest: if the body is not a JSON object, or if
            ``forwarding_addresses`` is supplied but is not a list.
    """
    if auth_user != username and not user_is_in_group(auth_user, 'syscom'):
        return {
            'error': "You are not authorized to modify other users' attributes"
        }, 403

    body = request.get_json(force=True)
    # get_json() returns whatever the JSON document decodes to, so `null`,
    # a list or a bare string would otherwise crash on `.get` below.
    if not isinstance(body, dict):
        raise BadRequest('Request body must be a JSON object')

    forwarding_addresses = body.get('forwarding_addresses')
    # Same type requirement that create_user enforces; None means the
    # caller did not ask to change the addresses.
    if forwarding_addresses is not None and not isinstance(forwarding_addresses, list):
        raise BadRequest('forwarding_addresses must be a list of email addresses')

    txn = ModifyMemberTransaction(
        username,
        login_shell=body.get('login_shell'),
        forwarding_addresses=forwarding_addresses,
    )
    return create_streaming_response(txn)
@bp.route('/<username>/renew', methods=['POST'])
@requires_admin_creds
@authz_restrict_to_staff
def renew_user(username: str):
    """Add member or non-member terms to an existing user.

    The JSON body must be an object containing exactly one of ``terms`` or
    ``non_member_terms``. An integer value is expanded with
    ``get_terms_for_renewal`` relative to the terms the user already has;
    any other truthy value is used as given. If the user currently has
    ``shadowExpire`` set, the expiry flag is cleared and the user is
    subscribed to the list named by the ``mailman3_new_member_list``
    config key.

    Returns:
        ``{'terms_added': ...}`` or ``{'non_member_terms_added': ...}``.

    Raises:
        BadRequest: if the body is not a JSON object or the terms
            specification is missing, ambiguous, or empty after expansion.
    """
    # We need to use the admin creds here because office members should
    # not be able to directly modify the shadowExpire field; this could
    # prevent syscom members from logging into the machines.
    body = request.get_json(force=True)
    # get_json() returns whatever the JSON document decodes to, so `null`,
    # a list or a bare string would otherwise crash on `.get` below.
    if not isinstance(body, dict):
        raise BadRequest('Request body must be a JSON object')

    terms = body.get('terms')
    non_member_terms = body.get('non_member_terms')
    if (terms and non_member_terms) or not (terms or non_member_terms):
        raise BadRequest('Must specify either terms or non-member terms')

    ldap_srv = component.getUtility(ILDAPService)
    cfg = component.getUtility(IConfig)
    user = ldap_srv.get_user(username)
    member_list = cfg.get('mailman3_new_member_list')

    # Exact type check: a JSON boolean decodes to bool, which is a subclass
    # of int, and must not be treated as a term count.
    if type(terms) is int:
        terms = get_terms_for_renewal(user.terms, terms)
    elif type(non_member_terms) is int:
        non_member_terms = get_terms_for_renewal(user.non_member_terms, non_member_terms)

    def unexpire(user):
        # Only users that are currently flagged as expired need to be
        # un-flagged and put back on the mailing list.
        if user.shadowExpire:
            user.set_expired(False)
            try:
                user.subscribe_to_mailing_list(member_list)
                logger.debug(f'Subscribed {user.uid} to {member_list}')
            except UserAlreadySubscribedError:
                logger.debug(f'{user.uid} is already subscribed to {member_list}')

    if terms:
        logger.info(f"Renewing member {username} for terms {terms}")
        user.add_terms(terms)
        unexpire(user)
        return {'terms_added': terms}
    elif non_member_terms:
        logger.info(f"Renewing club rep {username} for non-member terms {non_member_terms}")
        user.add_non_member_terms(non_member_terms)
        unexpire(user)
        return {'non_member_terms_added': non_member_terms}
    else:
        # Reached when expanding an integer count left nothing to add.
        raise BadRequest('Must specify either terms or non-member terms')
@bp.route('/<username>/pwreset', methods=['POST'])
@authz_restrict_to_syscom
def reset_user_password(username: str):
    """Set a newly generated password for ``username`` and return it.

    The response is a dict with the single key ``password``.
    """
    ldap_srv = component.getUtility(ILDAPService)
    target = ldap_srv.get_user(username)
    new_password = utils.gen_password()
    target.change_password(new_password)
    return {'password': new_password}
@bp.route('/<username>', methods=['DELETE'])
@requires_admin_creds
@authz_restrict_to_syscom
@development_only
def delete_user(username: str):
    """Run a ``DeleteMemberTransaction`` for ``username`` and stream its result."""
    # The admin creds are used because the web app's integration tests call
    # this endpoint, and those use the ceod/<host> key.
    return create_streaming_response(DeleteMemberTransaction(username))
@bp.route('/expire', methods=['POST'])
@authz_restrict_to_syscom
def expire_users():
    """Flag expired users and unsubscribe them from the new-member list.

    The users come from ``get_nonflagged_expired_users``. When the
    ``dry_run`` query parameter is truthy nothing is modified. In both
    cases the response is a JSON list of the affected UIDs.
    """
    dry_run = is_truthy(request.args.get('dry_run', 'false'))
    ldap_srv = component.getUtility(ILDAPService)
    cfg = component.getUtility(IConfig)
    expired = ldap_srv.get_nonflagged_expired_users()
    list_name = cfg.get('mailman3_new_member_list')

    def expire_one(member):
        # Flag the member, then drop them from the mailing list; not being
        # subscribed in the first place is fine.
        logger.info(f'Expiring {member.uid}')
        member.set_expired(True)
        try:
            member.unsubscribe_from_mailing_list(list_name)
        except UserNotSubscribedError:
            logger.debug(f'{member.uid} is already unsubscribed from {list_name}')
        else:
            logger.debug(f'Unsubscribed {member.uid} from {list_name}')

    if not dry_run:
        for member in expired:
            expire_one(member)

    uids = [member.uid for member in expired]
    return jsonify(uids)
@bp.route('/remindexpire', methods=['POST'])
@authz_restrict_to_syscom
def remind_users_of_expiration():
    """Send membership renewal reminders to users who are about to expire.

    The users come from ``get_expiring_users``. When the ``dry_run`` query
    parameter is truthy no mail is sent. In both cases the response is a
    JSON list of the affected UIDs.
    """
    send_for_real = not is_truthy(request.args.get('dry_run', 'false'))
    ldap_srv = component.getUtility(ILDAPService)
    expiring = ldap_srv.get_expiring_users()

    if send_for_real:
        # The mail service is only looked up when mail is actually sent.
        mail_srv = component.getUtility(IMailService)
        for member in expiring:
            logger.info(f'Sending renewal reminder to {member.uid}')
            mail_srv.send_membership_renewal_reminder(member)

    uids = [member.uid for member in expiring]
    return jsonify(uids)