from typing import Dict

import click
from zope import component

from ..utils import http_post, http_get, http_delete
from .utils import handle_stream_response, handle_sync_response, print_colon_kv, \
    check_if_in_development
from ceo_common.interfaces import IConfig
from ceod.transactions.groups import (
    AddGroupTransaction,
    AddMemberToGroupTransaction,
    RemoveMemberFromGroupTransaction,
    DeleteGroupTransaction,
)

# NOTE: the click command functions below are documented with `#` comments
# rather than docstrings on purpose: click uses a command's docstring as its
# --help text, so adding docstrings would change user-visible output.


def _labelled(label, values):
    """Pair each value with a key, showing *label* on the first row only.

    Returns a list of ``(key, value)`` tuples suitable for print_colon_kv:
    the first tuple's key is *label*, every later key is the empty string.
    An empty *values* yields an empty list.
    """
    return [(label if i == 0 else '', value) for i, value in enumerate(values)]


# Root of the `groups` sub-commands; it does nothing on its own.
@click.group(short_help='Perform operations on CSC groups/clubs')
def groups():
    pass


# Show the group that is about to be created, ask for confirmation, POST it
# to /api/groups and print the 'result' of the last streamed item.
@groups.command(short_help='Add a new group')
@click.argument('group_name')
@click.option('-d', '--description', help='Group description', prompt=True)
def add(group_name, description):
    click.echo('The following group will be created:')
    lines = [
        ('cn', group_name),
        ('description', description),
    ]
    print_colon_kv(lines)

    click.confirm('Do you want to continue?', abort=True)

    body = {
        'cn': group_name,
        'description': description,
    }
    operations = AddGroupTransaction.operations
    resp = http_post('/api/groups', json=body)
    data = handle_stream_response(resp, operations)
    result = data[-1]['result']
    print_group_lines(result)


def print_group_lines(result: Dict):
    """Pretty-print a group JSON response.

    Reads the keys 'cn', 'gid_number' and 'members' from *result* (a missing
    one raises KeyError) and the optional 'description', which falls back to
    'Unknown'. Each member is a mapping with 'cn' and 'uid' and is shown as
    ``cn (uid)``; only the first member row carries the 'members' key.
    """
    lines = [
        ('cn', result['cn']),
        ('description', result.get('description', 'Unknown')),
        ('gid_number', str(result['gid_number'])),
    ]
    lines.extend(_labelled(
        'members',
        [member['cn'] + ' (' + member['uid'] + ')' for member in result['members']],
    ))
    print_colon_kv(lines)


# Fetch a single group from /api/groups/<group_name> and print it.
@groups.command(short_help='Get info about a group')
@click.argument('group_name')
def get(group_name):
    resp = http_get('/api/groups/' + group_name)
    result = handle_sync_response(resp)
    print_group_lines(result)


# Add `username` to `group_name` after confirmation, then print the groups
# the user was added to and the mailing lists they were subscribed to.
@groups.command(short_help='Add a member to a group')
@click.argument('group_name')
@click.argument('username')
@click.option('--no-subscribe', is_flag=True, default=False,
              help='Do not subscribe the member to any auxiliary mailing lists.')
def addmember(group_name, username, no_subscribe):
    click.confirm(f'Are you sure you want to add {username} to {group_name}?',
                  abort=True)
    base_domain = component.getUtility(IConfig).get('base_domain')
    url = f'/api/groups/{group_name}/members/{username}'
    # Work on a copy: `operations` is an attribute of the transaction class,
    # so removing from it in place would change it for every later use in
    # this process (and a second --no-subscribe call would raise ValueError).
    operations = list(AddMemberToGroupTransaction.operations)
    if no_subscribe:
        url += '?subscribe_to_lists=false'
        operations.remove('subscribe_user_to_auxiliary_mailing_lists')
    resp = http_post(url)
    data = handle_stream_response(resp, operations)
    result = data[-1]['result']

    lines = _labelled('Added to groups', result['added_to_groups'])
    # List names without a domain part are completed with the base domain.
    lines.extend(_labelled(
        'Subscribed to lists',
        [
            mailing_list if '@' in mailing_list else mailing_list + '@' + base_domain
            for mailing_list in result.get('subscribed_to_lists', [])
        ],
    ))
    print_colon_kv(lines)


# Remove `username` from `group_name` after confirmation, then print the
# groups the user was removed from and the lists they were unsubscribed from.
@groups.command(short_help='Remove a member from a group')
@click.argument('group_name')
@click.argument('username')
@click.option('--no-unsubscribe', is_flag=True, default=False,
              help='Do not unsubscribe the member from any auxiliary mailing lists.')
def removemember(group_name, username, no_unsubscribe):
    click.confirm(f'Are you sure you want to remove {username} from {group_name}?',
                  abort=True)
    base_domain = component.getUtility(IConfig).get('base_domain')
    url = f'/api/groups/{group_name}/members/{username}'
    # Work on a copy so the class-level operations list is never mutated
    # (same reasoning as in addmember).
    operations = list(RemoveMemberFromGroupTransaction.operations)
    if no_unsubscribe:
        url += '?unsubscribe_from_lists=false'
        operations.remove('unsubscribe_user_from_auxiliary_mailing_lists')
    resp = http_delete(url)
    data = handle_stream_response(resp, operations)
    result = data[-1]['result']

    lines = _labelled('Removed from groups', result['removed_from_groups'])
    # List names without a domain part are completed with the base domain.
    lines.extend(_labelled(
        'Unsubscribed from lists',
        [
            mailing_list if '@' in mailing_list else mailing_list + '@' + base_domain
            for mailing_list in result.get('unsubscribed_from_lists', [])
        ],
    ))
    print_colon_kv(lines)


# Delete a group after confirmation.
# NOTE(review): check_if_in_development() presumably refuses to continue
# outside a development environment -- confirm against its definition.
@groups.command(short_help='Delete a group')
@click.argument('group_name')
def delete(group_name):
    check_if_in_development()
    click.confirm(f"Are you sure you want to delete {group_name}?", abort=True)
    resp = http_delete(f'/api/groups/{group_name}')
    handle_stream_response(resp, DeleteGroupTransaction.operations)