pyceo/ceod/api/members.py

113 lines
3.9 KiB
Python

from flask import Blueprint, request
from zope import component
from .utils import authz_restrict_to_staff, authz_restrict_to_syscom, \
user_is_in_group, requires_authentication_no_realm, \
create_streaming_response, development_only
from ceo_common.errors import BadRequest
from ceo_common.interfaces import ILDAPService
from ceod.transactions.members import (
AddMemberTransaction,
ModifyMemberTransaction,
DeleteMemberTransaction,
)
import ceod.utils as utils
# Flask blueprint on which every member-related view in this module is
# registered via the ``@bp.route`` decorators below.
bp = Blueprint('members', __name__)
@bp.route('/', methods=['POST'], strict_slashes=False)
@authz_restrict_to_staff
def create_user():
    """Create a new member from the JSON request body.

    The body must be a JSON object containing non-empty ``uid``, ``cn``,
    ``given_name`` and ``sn`` attributes, plus exactly one of ``terms`` or
    ``non_member_terms``. ``program`` and ``forwarding_addresses`` are
    optional and are forwarded as ``None`` when absent.

    Returns:
        The value produced by ``create_streaming_response`` for the
        ``AddMemberTransaction`` built from the request.

    Raises:
        BadRequest: if the body is not a JSON object, if both or neither of
            ``terms`` / ``non_member_terms`` are given, or if a required
            attribute is missing or empty.
    """
    body = request.get_json(force=True)
    # Valid JSON is not necessarily an object (e.g. ``[]`` or ``null``);
    # reject it here instead of failing on ``body.get`` below.
    if not isinstance(body, dict):
        raise BadRequest('Request body must be a JSON object')

    terms = body.get('terms')
    non_member_terms = body.get('non_member_terms')
    # Exactly one of the two must be supplied: both set or both unset is
    # an error.
    if bool(terms) == bool(non_member_terms):
        raise BadRequest('Must specify either terms or non-member terms')

    for attr in ('uid', 'cn', 'given_name', 'sn'):
        if not body.get(attr):
            raise BadRequest(f"Attribute '{attr}' is missing or empty")

    txn = AddMemberTransaction(
        uid=body['uid'],
        cn=body['cn'],
        given_name=body['given_name'],
        sn=body['sn'],
        program=body.get('program'),
        terms=terms,
        non_member_terms=non_member_terms,
        forwarding_addresses=body.get('forwarding_addresses'),
    )
    return create_streaming_response(txn)
@bp.route('/<username>')
@requires_authentication_no_realm
def get_user(auth_user: str, username: str):
    """Look up ``username`` in LDAP and return it as a dictionary.

    Forwarding addresses are requested from ``to_dict`` only when the
    authenticated user ``auth_user`` is in the ``syscom`` group.
    """
    # Fetching forwarding addresses means reading a file in the target
    # user's home directory. An unprivileged user could symlink their
    # ~/.forward to something like /etc/shadow, so this option is reserved
    # for syscom members.
    include_forwarding = bool(user_is_in_group(auth_user, 'syscom'))
    member = component.getUtility(ILDAPService).get_user(username)
    return member.to_dict(include_forwarding)
@bp.route('/<username>', methods=['PATCH'])
@requires_authentication_no_realm
def patch_user(auth_user: str, username: str):
    """Modify the login shell and/or forwarding addresses of ``username``.

    Users may modify their own attributes; modifying another user requires
    membership in the ``syscom`` group.

    Returns:
        A ``({'error': ...}, 403)`` tuple when ``auth_user`` is not allowed
        to modify ``username``; otherwise the value produced by
        ``create_streaming_response`` for the ``ModifyMemberTransaction``.

    Raises:
        BadRequest: if the request body is not a JSON object.
    """
    if auth_user != username and not user_is_in_group(auth_user, 'syscom'):
        return {
            'error': "You are not authorized to modify other users' attributes"
        }, 403

    body = request.get_json(force=True)
    # Valid JSON is not necessarily an object (e.g. ``[]`` or ``null``);
    # reject it here instead of failing on ``body.get`` below.
    if not isinstance(body, dict):
        raise BadRequest('Request body must be a JSON object')

    txn = ModifyMemberTransaction(
        username,
        login_shell=body.get('login_shell'),
        forwarding_addresses=body.get('forwarding_addresses'),
    )
    return create_streaming_response(txn)
@bp.route('/<username>/renew', methods=['POST'])
@authz_restrict_to_staff
def renew_user(username: str):
    """Add member or non-member terms to an existing user.

    The JSON body must be an object containing exactly one of ``terms`` or
    ``non_member_terms``.

    Returns:
        ``{'terms_added': terms}`` when ``terms`` was supplied, otherwise
        ``{'non_member_terms_added': non_member_terms}``.

    Raises:
        BadRequest: if the body is not a JSON object, or if both or neither
            of ``terms`` / ``non_member_terms`` are given.
    """
    body = request.get_json(force=True)
    # Valid JSON is not necessarily an object (e.g. ``[]`` or ``null``);
    # reject it here instead of failing on ``body.get`` below.
    if not isinstance(body, dict):
        raise BadRequest('Request body must be a JSON object')

    terms = body.get('terms')
    non_member_terms = body.get('non_member_terms')
    # Exactly one of the two must be supplied: both set or both unset is
    # an error.
    if bool(terms) == bool(non_member_terms):
        raise BadRequest('Must specify either terms or non-member terms')

    ldap_srv = component.getUtility(ILDAPService)
    user = ldap_srv.get_user(username)

    if terms:
        user.add_terms(terms)
        return {'terms_added': terms}

    # The validation above guarantees non_member_terms is set here.
    user.add_non_member_terms(non_member_terms)
    return {'non_member_terms_added': non_member_terms}
@bp.route('/<username>/pwreset', methods=['POST'])
@authz_restrict_to_syscom
def reset_user_password(username: str):
    """Set a freshly generated password for ``username`` and return it.

    The new password comes from ``utils.gen_password()`` and is returned to
    the caller under the ``password`` key.
    """
    ldap_srv = component.getUtility(ILDAPService)
    member = ldap_srv.get_user(username)
    new_password = utils.gen_password()
    member.change_password(new_password)
    return {'password': new_password}
@bp.route('/<username>', methods=['DELETE'])
@authz_restrict_to_syscom
@development_only
def delete_user(username: str):
    """Run a ``DeleteMemberTransaction`` for ``username``.

    Returns the value produced by ``create_streaming_response`` for that
    transaction.
    """
    return create_streaming_response(DeleteMemberTransaction(username))