from flask import Blueprint, request from zope import component from .utils import authz_restrict_to_staff, authz_restrict_to_syscom, \ user_is_in_group, requires_authentication_no_realm, \ create_streaming_response, development_only from ceo_common.errors import UserNotFoundError, BadRequest from ceo_common.interfaces import ILDAPService from ceod.transactions.members import ( AddMemberTransaction, ModifyMemberTransaction, DeleteMemberTransaction, ) import ceod.utils as utils bp = Blueprint('members', __name__) @bp.route('/', methods=['POST'], strict_slashes=False) @authz_restrict_to_staff def create_user(): body = request.get_json(force=True) txn = AddMemberTransaction( uid=body['uid'], cn=body['cn'], program=body.get('program'), terms=body.get('terms'), non_member_terms=body.get('non_member_terms'), forwarding_addresses=body.get('forwarding_addresses'), ) return create_streaming_response(txn) @bp.route('/') @requires_authentication_no_realm def get_user(auth_user: str, username: str): get_forwarding_addresses = False if auth_user == username or user_is_in_group(auth_user, 'syscom'): get_forwarding_addresses = True ldap_srv = component.getUtility(ILDAPService) try: user = ldap_srv.get_user(username) return user.to_dict(get_forwarding_addresses) except UserNotFoundError: return { 'error': 'user not found' }, 404 @bp.route('/', methods=['PATCH']) @requires_authentication_no_realm def patch_user(auth_user: str, username: str): if not (auth_user == username or user_is_in_group(auth_user, 'syscom')): return { 'error': "You are not authorized to modify other users' attributes" }, 403 body = request.get_json(force=True) txn = ModifyMemberTransaction( username, login_shell=body.get('login_shell'), forwarding_addresses=body.get('forwarding_addresses'), ) return create_streaming_response(txn) @bp.route('//renew', methods=['POST']) @authz_restrict_to_staff def renew_user(username: str): body = request.get_json(force=True) ldap_srv = component.getUtility(ILDAPService) user = ldap_srv.get_user(username) if body.get('terms'): user.add_terms(body['terms']) return {'terms_added': body['terms']} elif body.get('non_member_terms'): user.add_non_member_terms(body['non_member_terms']) return {'non_member_terms_added': body['non_member_terms']} else: raise BadRequest('Must specify either terms or non-member terms') @bp.route('//pwreset', methods=['POST']) @authz_restrict_to_syscom def reset_user_password(username: str): user = component.getUtility(ILDAPService).get_user(username) password = utils.gen_password() user.change_password(password) return {'password': password} @bp.route('/', methods=['DELETE']) @authz_restrict_to_syscom @development_only def delete_user(username: str): txn = DeleteMemberTransaction(username) return create_streaming_response(txn)