pyceo/ceo/cli/groups.py

226 lines
7.4 KiB
Python

from typing import Dict
import click
from zope import component
from ..utils import http_post, http_get, http_delete
from .utils import handle_stream_response, handle_sync_response, print_colon_kv, \
check_if_in_development
from ceo_common.interfaces import IConfig
from ceod.transactions.groups import (
AddGroupTransaction,
AddMemberToGroupTransaction,
RemoveMemberFromGroupTransaction,
DeleteGroupTransaction,
)
import heapq
# Root of the `groups` command tree; the subcommands registered on it below
# do the real work.
@click.group(short_help='Perform operations on CSC groups/clubs')
def groups():
    # Nothing to set up at the group level.
    return None
# Create a new group: show a summary, ask for confirmation, POST the request,
# then print the group returned by the final step of the streamed response.
@groups.command(short_help='Add a new group')
@click.argument('group_name')
@click.option('-d', '--description', help='Group description', prompt=True)
def add(group_name, description):
    # The summary rows and the request body carry the same key/value pairs,
    # so both are derived from this one dict.
    payload = {
        'cn': group_name,
        'description': description,
    }
    click.echo('The following group will be created:')
    print_colon_kv(list(payload.items()))
    click.confirm('Do you want to continue?', abort=True)

    response = http_post('/api/groups', json=payload)
    steps = handle_stream_response(response, AddGroupTransaction.operations)
    # The last streamed entry carries the created group under 'result'.
    print_group_lines(steps[-1]['result'])
def print_group_lines(result: Dict):
    """Pretty-print a group JSON response.

    Prints the ``cn``, ``description`` (``'Unknown'`` when absent) and
    ``gid_number`` fields, followed by one ``cn (uid)`` row per entry of
    ``result['members']``. Only the first member row is labelled
    ``members``; the remaining rows get an empty label.
    """
    header_rows = [
        ('cn', result['cn']),
        ('description', result.get('description', 'Unknown')),
        ('gid_number', str(result['gid_number'])),
    ]
    member_rows = [
        (
            '' if position else 'members',
            ''.join((member['cn'], ' (', member['uid'], ')')),
        )
        for position, member in enumerate(result['members'])
    ]
    print_colon_kv(header_rows + member_rows)
# Fetch a single group by name and pretty-print it.
@groups.command(short_help='Get info about a group')
@click.argument('group_name')
def get(group_name):
    endpoint = f'/api/groups/{group_name}'
    group = handle_sync_response(http_get(endpoint))
    print_group_lines(group)
# Add one or more users to a group. One POST is issued per user; with
# --no-subscribe the server is asked not to subscribe the user to auxiliary
# mailing lists and the matching step is dropped from the expected operations.
@groups.command(short_help='Add one or more members to a group')
@click.argument('group_name')
@click.argument('username')
@click.argument('usernames', nargs=-1)
@click.option('--no-subscribe', is_flag=True, default=False,
              help='Do not subscribe the member(s) to any auxiliary mailing lists.')
def addmember(group_name, username, usernames, no_subscribe):
    usernames = [username, *usernames]
    if len(usernames) == 1:
        click.confirm(f'Are you sure you want to add {username} to {group_name}?',
                      abort=True)
    else:
        click.echo(f'The following users will be added to {group_name}:')
        click.echo(', '.join(usernames))
        click.confirm('Do you want to continue?', abort=True)

    base_domain = component.getUtility(IConfig).get('base_domain')
    # Work on a copy: `operations` is a class-level list shared by every user
    # of AddMemberToGroupTransaction. Removing from it in place would corrupt
    # it for the rest of the process and make a second --no-subscribe call
    # raise ValueError.
    operations = list(AddMemberToGroupTransaction.operations)
    if no_subscribe:
        operations.remove('subscribe_user_to_auxiliary_mailing_lists')

    for username in usernames:
        url = f'/api/groups/{group_name}/members/{username}'
        if no_subscribe:
            url += '?subscribe_to_lists=false'
        resp = http_post(url)
        data = handle_stream_response(resp, operations)
        # The last streamed entry carries the final outcome under 'result'.
        result = data[-1]['result']
        click.echo(f'Added {username} to ' + ', '.join(result['added_to_groups']))
        if result.get('subscribed_to_lists'):
            # Bare list names are qualified with the configured base domain;
            # names that already contain '@' are shown as-is.
            mailing_lists = [
                mailing_list if '@' in mailing_list
                else mailing_list + '@' + base_domain
                for mailing_list in result['subscribed_to_lists']
            ]
            click.echo(f'Subscribed {username} to ' + ', '.join(mailing_lists))
# Remove one or more users from a group. One DELETE is issued per user; with
# --no-unsubscribe the server is asked not to unsubscribe the user from
# auxiliary mailing lists and the matching step is dropped from the expected
# operations.
@groups.command(short_help='Remove one or more members from a group')
@click.argument('group_name')
@click.argument('username')
@click.argument('usernames', nargs=-1)
@click.option('--no-unsubscribe', is_flag=True, default=False,
              help='Do not unsubscribe the member(s) from any auxiliary mailing lists.')
def removemember(group_name, username, usernames, no_unsubscribe):
    usernames = [username, *usernames]
    if len(usernames) == 1:
        click.confirm(f'Are you sure you want to remove {username} from {group_name}?',
                      abort=True)
    else:
        click.echo(f'The following users will be removed from {group_name}:')
        click.echo(', '.join(usernames))
        click.confirm('Do you want to continue?', abort=True)

    base_domain = component.getUtility(IConfig).get('base_domain')
    # Work on a copy: `operations` is a class-level list shared by every user
    # of RemoveMemberFromGroupTransaction. Removing from it in place would
    # corrupt it for the rest of the process and make a second
    # --no-unsubscribe call raise ValueError.
    operations = list(RemoveMemberFromGroupTransaction.operations)
    if no_unsubscribe:
        operations.remove('unsubscribe_user_from_auxiliary_mailing_lists')

    for username in usernames:
        url = f'/api/groups/{group_name}/members/{username}'
        if no_unsubscribe:
            url += '?unsubscribe_from_lists=false'
        resp = http_delete(url)
        data = handle_stream_response(resp, operations)
        # The last streamed entry carries the final outcome under 'result'.
        result = data[-1]['result']
        click.echo(f'Removed {username} from ' + ', '.join(result['removed_from_groups']))
        if result.get('unsubscribed_from_lists'):
            # Bare list names are qualified with the configured base domain;
            # names that already contain '@' are shown as-is.
            mailing_lists = [
                mailing_list if '@' in mailing_list
                else mailing_list + '@' + base_domain
                for mailing_list in result['unsubscribed_from_lists']
            ]
            click.echo(f'Unsubscribed {username} from ' + ', '.join(mailing_lists))
# Delete a group after an interactive confirmation. check_if_in_development()
# runs first, before the user is prompted.
@groups.command(short_help='Delete a group')
@click.argument('group_name')
def delete(group_name):
    check_if_in_development()
    question = f"Are you sure you want to delete {group_name}?"
    click.confirm(question, abort=True)
    endpoint = f'/api/groups/{group_name}'
    handle_stream_response(http_delete(endpoint), DeleteGroupTransaction.operations)
# compute levenshtein edit distance
def _fuzzy_match(s1, s2):
if len(s1) == 0:
return len(s2)
if len(s2) == 0:
return len(s1)
edits = []
for i in range(len(s2) + 1):
edits.append(i)
for i in range(len(s1)):
corner = i
edits[0] = i + 1
for j in range(len(s2)):
upper = edits[j + 1]
if s1[i] == s2[j]:
edits[j + 1] = corner
else:
m = min(corner, upper, edits[j])
edits[j + 1] = m + 1
corner = upper
return edits[-1]
# this is needed TRUST ME
class _fuzzy_result:
def __init__(self, string, score):
self.string = string
self.score = score
def __lt__(self, other):
return self.score < other.score
def __gt__(self, other):
return self.score > other.score
def __le__(self, other):
return self.score <= other.score
def __ge__(self, other):
return self.score >= other.score
def __eq__(self, other):
return self.score == other.score
def __ne__(self, other):
return self.score != other.score
# Print the `count` club names closest to GROUP_NAME by Levenshtein edit
# distance, best match first.
@groups.command(short_help='Search for groups')
@click.argument('group_name')
@click.option('--count', default=10, help='number of results to show')
def search(group_name, count):
    check_if_in_development()
    resp = http_get('/api/groups/clubs')
    club_names = handle_sync_response(resp)
    # nsmallest with a key replaces the hand-rolled sentinel heap: the
    # smallest edit distance comes first and ties keep the order in which
    # the server returned them, so output is deterministic. A non-positive
    # count simply yields no results.
    matches = heapq.nsmallest(
        count,
        club_names,
        key=lambda cn: _fuzzy_match(group_name, cn),
    )
    for cn in matches:
        # Skip empty names rather than printing blank lines.
        if cn:
            click.echo(cn)