add groups CLI
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Max Erenberg 2021-08-24 05:48:55 +00:00
parent 45192d75bf
commit 831ebf17aa
6 changed files with 330 additions and 7 deletions

View File

@ -7,6 +7,7 @@ from zope import component
from ..krb_check import krb_check
from .members import members
from .groups import groups
from ceo_common.interfaces import IConfig, IHTTPClient
from ceo_common.model import Config, HTTPClient
@ -26,6 +27,7 @@ def cli(ctx):
cli.add_command(members)
cli.add_command(groups)
def register_services():

148
ceo/cli/groups.py Normal file
View File

@ -0,0 +1,148 @@
from typing import Dict
import click
from zope import component
from ..utils import http_post, http_get, http_delete
from .utils import handle_stream_response, handle_sync_response, print_colon_kv, \
check_if_in_development
from ceo_common.interfaces import IConfig
from ceod.transactions.groups import (
AddGroupTransaction,
AddMemberToGroupTransaction,
RemoveMemberFromGroupTransaction,
DeleteGroupTransaction,
)
@click.group()
def groups():
pass
@groups.command(short_help='Add a new group')
@click.argument('group_name')
@click.option('-d', '--description', help='Group description', prompt=True)
def add(group_name, description):
click.echo('The following group will be created:')
lines = [
('cn', group_name),
('description', description),
]
print_colon_kv(lines)
click.confirm('Do you want to continue?', abort=True)
body = {
'cn': group_name,
'description': description,
}
operations = AddGroupTransaction.operations
resp = http_post('/api/groups', json=body)
data = handle_stream_response(resp, operations)
result = data[-1]['result']
print_group_lines(result)
def print_group_lines(result: Dict):
"""Pretty-print a group JSON response."""
lines = [
('cn', result['cn']),
('description', result.get('description', 'Unknown')),
('gid_number', str(result['gid_number'])),
]
for i, member in enumerate(result['members']):
if i == 0:
prefix = 'members'
else:
prefix = ''
lines.append((prefix, member['cn'] + ' (' + member['uid'] + ')'))
print_colon_kv(lines)
@groups.command(short_help='Get info about a group')
@click.argument('group_name')
def get(group_name):
resp = http_get('/api/groups/' + group_name)
result = handle_sync_response(resp)
print_group_lines(result)
@groups.command(short_help='Add a member to a group')
@click.argument('group_name')
@click.argument('username')
@click.option('--no-subscribe', is_flag=True, default=False,
help='Do not subscribe the member to any auxiliary mailing lists.')
def addmember(group_name, username, no_subscribe):
click.confirm(f'Are you sure you want to add {username} to {group_name}?',
abort=True)
base_domain = component.getUtility(IConfig).get('base_domain')
url = f'/api/groups/{group_name}/members/{username}'
operations = AddMemberToGroupTransaction.operations
if no_subscribe:
url += '?subscribe_to_lists=false'
operations.remove('subscribe_user_to_auxiliary_mailing_lists')
resp = http_post(url)
data = handle_stream_response(resp, operations)
result = data[-1]['result']
lines = []
for i, group in enumerate(result['added_to_groups']):
if i == 0:
prefix = 'Added to groups'
else:
prefix = ''
lines.append((prefix, group))
for i, mailing_list in enumerate(result.get('subscribed_to_lists', [])):
if i == 0:
prefix = 'Subscribed to lists'
else:
prefix = ''
if '@' not in mailing_list:
mailing_list += '@' + base_domain
lines.append((prefix, mailing_list))
print_colon_kv(lines)
@groups.command(short_help='Remove a member from a group')
@click.argument('group_name')
@click.argument('username')
@click.option('--no-unsubscribe', is_flag=True, default=False,
help='Do not unsubscribe the member from any auxiliary mailing lists.')
def removemember(group_name, username, no_unsubscribe):
click.confirm(f'Are you sure you want to remove {username} from {group_name}?',
abort=True)
base_domain = component.getUtility(IConfig).get('base_domain')
url = f'/api/groups/{group_name}/members/{username}'
operations = RemoveMemberFromGroupTransaction.operations
if no_unsubscribe:
url += '?unsubscribe_from_lists=false'
operations.remove('unsubscribe_user_from_auxiliary_mailing_lists')
resp = http_delete(url)
data = handle_stream_response(resp, operations)
result = data[-1]['result']
lines = []
for i, group in enumerate(result['removed_from_groups']):
if i == 0:
prefix = 'Removed from groups'
else:
prefix = ''
lines.append((prefix, group))
for i, mailing_list in enumerate(result.get('unsubscribed_from_lists', [])):
if i == 0:
prefix = 'Unsubscribed from lists'
else:
prefix = ''
if '@' not in mailing_list:
mailing_list += '@' + base_domain
lines.append((prefix, mailing_list))
print_colon_kv(lines)
@groups.command(short_help='Delete a group')
@click.argument('group_name')
def delete(group_name):
check_if_in_development()
click.confirm(f"Are you sure you want to delete {group_name}?", abort=True)
resp = http_delete(f'/api/groups/{group_name}')
handle_stream_response(resp, DeleteGroupTransaction.operations)

View File

@ -1,4 +1,3 @@
import socket
import sys
from typing import Dict
@ -6,7 +5,8 @@ import click
from zope import component
from ..utils import http_post, http_get, http_patch, http_delete, get_failed_operations
from .utils import handle_stream_response, handle_sync_response, print_colon_kv
from .utils import handle_stream_response, handle_sync_response, print_colon_kv, \
check_if_in_development
from ceo_common.interfaces import IConfig
from ceo_common.model import Term
from ceod.transactions.members import (
@ -209,10 +209,7 @@ def pwreset(username):
@members.command(short_help="Delete a user")
@click.argument('username')
def delete(username):
# a hack to determine if we're in the dev environment
if not socket.getfqdn().endswith('.csclub.internal'):
click.echo('This command may only be called during development.')
sys.exit(1)
check_if_in_development()
click.confirm(f"Are you sure you want to delete {username}?", abort=True)
resp = http_delete(f'/api/members/{username}')
handle_stream_response(resp, DeleteMemberTransaction.operations)

View File

@ -1,4 +1,5 @@
import json
import socket
import sys
from typing import List, Tuple, Dict
@ -29,7 +30,11 @@ def print_colon_kv(pairs: List[Tuple[str, str]]):
"""
maxlen = max(len(key) for key, val in pairs)
for key, val in pairs:
click.echo(key + ': ', nl=False)
if key != '':
click.echo(key + ': ', nl=False)
else:
# assume this is a continuation from the previous line
click.echo(' ', nl=False)
extra_space = ' ' * (maxlen - len(key))
click.echo(extra_space, nl=False)
click.echo(val)
@ -58,6 +63,8 @@ def handle_stream_response(resp: requests.Response, operations: List[str]) -> Li
click.echo('Please check the ceod logs.')
sys.exit(1)
elif d['status'] == 'completed':
if idx < len(operations):
click.echo('Skipped')
click.echo('Transaction successfully completed.')
return data
@ -105,3 +112,10 @@ def handle_sync_response(resp: requests.Response):
click.echo(resp.text.rstrip())
raise Abort()
return resp.json()
def check_if_in_development() -> bool:
"""Aborts if we are not currently in the dev environment."""
if not socket.getfqdn().endswith('.csclub.internal'):
click.echo('This command may only be called during development.')
raise Abort()

View File

@ -16,4 +16,12 @@ descriptions = {
'remove_user_from_kerberos': 'Remove user from Kerberos',
'delete_home_dir': 'Delete home directory',
'unsubscribe_from_mailing_list': 'Unsubscribe from mailing list',
'add_sudo_role': 'Add sudo role to LDAP',
'add_user_to_group': 'Add user to group',
'add_user_to_auxiliary_groups': 'Add user to auxiliary groups',
'subscribe_user_to_auxiliary_mailing_lists': 'Subscribe user to auxiliary mailing lists',
'remove_user_from_group': 'Remove user from group',
'remove_user_from_auxiliary_groups': 'Remove user from auxiliary groups',
'unsubscribe_user_from_auxiliary_mailing_lists': 'Unsubscribe user from auxiliary mailing lists',
'remove_sudo_role': 'Remove sudo role from LDAP',
}

View File

@ -0,0 +1,154 @@
import re
from click.testing import CliRunner
from ceo.cli import cli
def test_groups(cli_setup, ldap_user):
runner = CliRunner()
result = runner.invoke(cli, [
'groups', 'add', 'test_group_1', '-d', 'Test Group One',
], input='y\n')
expected_pat = re.compile((
"^The following group will be created:\n"
"cn: test_group_1\n"
"description: Test Group One\n"
"Do you want to continue\\? \\[y/N\\]: y\n"
"Add user to LDAP... Done\n"
"Add group to LDAP... Done\n"
"Add sudo role to LDAP... Done\n"
"Create home directory... Done\n"
"Transaction successfully completed.\n"
"cn: test_group_1\n"
"description: Test Group One\n"
"gid_number: \\d{5}\n$"
), re.MULTILINE)
assert result.exit_code == 0
assert expected_pat.match(result.output) is not None
runner = CliRunner()
result = runner.invoke(cli, ['groups', 'get', 'test_group_1'])
expected_pat = re.compile((
"^cn: test_group_1\n"
"description: Test Group One\n"
"gid_number: \\d{5}\n$"
), re.MULTILINE)
assert result.exit_code == 0
assert expected_pat.match(result.output) is not None
runner = CliRunner()
result = runner.invoke(cli, [
'groups', 'addmember', 'test_group_1', ldap_user.uid,
], input='y\n')
expected = (
f"Are you sure you want to add {ldap_user.uid} to test_group_1? [y/N]: y\n"
"Add user to group... Done\n"
"Add user to auxiliary groups... Skipped\n"
"Transaction successfully completed.\n"
"Added to groups: test_group_1\n"
)
assert result.exit_code == 0
assert result.output == expected
runner = CliRunner()
result = runner.invoke(cli, ['groups', 'get', 'test_group_1'])
expected_pat = re.compile(f"members:\\s+{ldap_user.cn} \\({ldap_user.uid}\\)")
assert result.exit_code == 0
assert expected_pat.search(result.output) is not None
runner = CliRunner()
result = runner.invoke(cli, [
'groups', 'removemember', 'test_group_1', ldap_user.uid,
], input='y\n')
expected = (
f"Are you sure you want to remove {ldap_user.uid} from test_group_1? [y/N]: y\n"
"Remove user from group... Done\n"
"Remove user from auxiliary groups... Skipped\n"
"Transaction successfully completed.\n"
"Removed from groups: test_group_1\n"
)
assert result.exit_code == 0
assert result.output == expected
runner = CliRunner()
result = runner.invoke(cli, ['groups', 'delete', 'test_group_1'], input='y\n')
assert result.exit_code == 0
def create_group(group_name, desc):
runner = CliRunner()
result = runner.invoke(cli, [
'groups', 'add', group_name, '-d', desc,
], input='y\n')
assert result.exit_code == 0
def delete_group(group_name):
runner = CliRunner()
result = runner.invoke(cli, ['groups', 'delete', group_name], input='y\n')
assert result.exit_code == 0
def test_groups_with_auxiliary_groups_and_mailing_lists(cli_setup, ldap_user):
runner = CliRunner()
# make sure auxiliary groups + mailing lists exist in ceod_test_local.ini
create_group('syscom', 'Systems Committee')
create_group('office', 'Office')
create_group('staff', 'Staff')
runner = CliRunner()
result = runner.invoke(cli, [
'groups', 'addmember', 'syscom', ldap_user.uid,
], input='y\n')
expected = (
f"Are you sure you want to add {ldap_user.uid} to syscom? [y/N]: y\n"
"Add user to group... Done\n"
"Add user to auxiliary groups... Done\n"
"Subscribe user to auxiliary mailing lists... Done\n"
"Transaction successfully completed.\n"
"Added to groups: syscom\n"
" office\n"
" staff\n"
"Subscribed to lists: syscom@csclub.internal\n"
" syscom-alerts@csclub.internal\n"
)
assert result.exit_code == 0
assert result.output == expected
runner = CliRunner()
result = runner.invoke(cli, [
'groups', 'removemember', 'syscom', ldap_user.uid,
], input='y\n')
expected = (
f"Are you sure you want to remove {ldap_user.uid} from syscom? [y/N]: y\n"
"Remove user from group... Done\n"
"Remove user from auxiliary groups... Done\n"
"Unsubscribe user from auxiliary mailing lists... Done\n"
"Transaction successfully completed.\n"
"Removed from groups: syscom\n"
" office\n"
" staff\n"
"Unsubscribed from lists: syscom@csclub.internal\n"
" syscom-alerts@csclub.internal\n"
)
assert result.exit_code == 0
assert result.output == expected
runner = CliRunner()
result = runner.invoke(cli, [
'groups', 'addmember', 'syscom', ldap_user.uid, '--no-subscribe',
], input='y\n')
assert result.exit_code == 0
assert 'Subscribed to lists' not in result.output
runner = CliRunner()
result = runner.invoke(cli, [
'groups', 'removemember', 'syscom', ldap_user.uid, '--no-unsubscribe',
], input='y\n')
assert result.exit_code == 0
assert 'Unsubscribed from lists' not in result.output
delete_group('syscom')
delete_group('office')
delete_group('staff')