@ -0,0 +1,44 @@ |
||||
import click |
||||
from zope import component |
||||
|
||||
from ..utils import http_get, http_post |
||||
from .utils import handle_sync_response, handle_stream_response, print_colon_kv |
||||
from ceo_common.interfaces import IConfig |
||||
from ceod.transactions.members import UpdateMemberPositionsTransaction |
||||
|
||||
|
||||
@click.group(short_help='List or change exec positions') |
||||
def positions(): |
||||
update_commands() |
||||
|
||||
|
||||
@positions.command(short_help='Get current positions') |
||||
def get(): |
||||
resp = http_get('/api/positions') |
||||
result = handle_sync_response(resp) |
||||
print_colon_kv(result.items()) |
||||
|
||||
|
||||
@positions.command(short_help='Update positions') |
||||
def set(**kwargs): |
||||
body = {k.replace('_', '-'): v for k, v in kwargs.items()} |
||||
print_body = {k: v or '' for k, v in body.items()} |
||||
click.echo('The positions will be updated:') |
||||
print_colon_kv(print_body.items()) |
||||
click.confirm('Do you want to continue?', abort=True) |
||||
|
||||
resp = http_post('/api/positions', json=body) |
||||
handle_stream_response(resp, UpdateMemberPositionsTransaction.operations) |
||||
|
||||
|
||||
# Provides dynamic parameters for `set' command using config file |
||||
def update_commands(): |
||||
global set |
||||
|
||||
cfg = component.getUtility(IConfig) |
||||
avail = cfg.get('positions_available') |
||||
required = cfg.get('positions_required') |
||||
|
||||
for pos in avail: |
||||
|
||||
r = pos in required |
||||
set = click.option(f'--{pos}', metavar='USERNAME', required=r, prompt=r)(set) |
@ -0,0 +1,55 @@ |
||||
from click.testing import CliRunner |
||||
from ceo.cli import cli |
||||
|
||||
|
||||
def test_positions(cli_setup): |
||||
runner = CliRunner() |
||||
|
||||
# Setup test data |
||||
for i in range(5): |
||||
runner.invoke(cli, ['members', 'add', f'test_{i}', '--cn', f'Test {i}', '--program', 'Math', '--terms', '1'], input='y\n') |
||||
runner.invoke(cli, ['groups', 'add', 'exec', '--description', 'Test Group'], input='y\n') |
||||
|
||||
result = runner.invoke(cli, [ |
||||
'positions', 'set', |
||||
'--president', 'test_0', |
||||
'--vice-president', 'test_1', |
||||
'--sysadmin', 'test_2', |
||||
'--secretary', 'test_3', |
||||
'--webmaster', 'test_4', |
||||
], input='y\n') |
||||
|
||||
assert result.exit_code == 0 |
||||
assert result.output == ''' |
||||
The positions will be updated: |
||||
president: test_0 |
||||
vice-president: test_1 |
||||
sysadmin: test_2 |
||||
secretary: test_3 |
||||
webmaster: test_4 |
||||
treasurer: |
||||
cro: |
||||
librarian: |
||||
imapd: |
||||
offsck: |
||||
Do you want to continue? [y/N]: y |
||||
Update positions in LDAP... Done |
||||
Update executive group in LDAP... Done |
||||
Subscribe to mailing lists... Done |
||||
Transaction successfully completed. |
||||
'''[1:] # noqa: W291 |
||||
|
||||
result = runner.invoke(cli, ['positions', 'get']) |
||||
assert result.exit_code == 0 |
||||
assert result.output == ''' |
||||
president: test_0 |
||||
secretary: test_3 |
||||
sysadmin: test_2 |
||||
vice-president: test_1 |
||||
webmaster: test_4 |
||||
'''[1:] |
||||
|
||||
# Cleanup test data |
||||
for i in range(5): |
||||
runner.invoke(cli, ['members', 'delete', f'test_{i}'], input='y\n') |
||||
runner.invoke(cli, ['groups', 'delete', 'exec'], input='y\n') |
Loading…
Reference in new issue
What is this doing?
zope's
registerUtility
emits an IUtilityRegistration containing the component that got registered. Here I'm just checking for that, and whether the registered component is an IConfig.The reason why this whole thing is here is to make the parameters (--president, --syscom, etc) follow what is in the config.