Merge branch 'v1' of https://git.csclub.uwaterloo.ca/public/pyceo into db-cli
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Andrew Wang 2021-09-08 20:44:25 -04:00
commit 300688cb9a
39 changed files with 1267 additions and 220 deletions

View File

@ -13,6 +13,9 @@ from ceo_common.model import Config, HTTPClient
def register_services():
# Using base component directly so events get triggered
baseComponent = component.getGlobalSiteManager()
# Config
# This is a hack to determine if we're in the dev env or not
if socket.getfqdn().endswith('.csclub.internal'):
@ -21,11 +24,11 @@ def register_services():
else:
config_file = os.environ.get('CEO_CONFIG', '/etc/csc/ceo.ini')
cfg = Config(config_file)
component.provideUtility(cfg, IConfig)
baseComponent.registerUtility(cfg, IConfig)
# HTTPService
http_client = HTTPClient()
component.provideUtility(http_client, IHTTPClient)
baseComponent.registerUtility(http_client, IHTTPClient)
def main():

View File

@ -1,3 +1,4 @@
import sys
from typing import List, Union
import click
@ -20,6 +21,7 @@ class Abort(click.ClickException):
class CLIStreamResponseHandler(StreamResponseHandler):
def __init__(self, operations: List[str]):
super().__init__()
self.operations = operations
self.idx = 0
@ -36,6 +38,7 @@ class CLIStreamResponseHandler(StreamResponseHandler):
click.echo('The transaction was rolled back.')
click.echo('The error was: ' + err_msg)
click.echo('Please check the ceod logs.')
sys.exit(1)
def handle_completed(self):
click.echo('Transaction successfully completed.')

View File

@ -2,6 +2,7 @@ import click
from .members import members
from .groups import groups
from .positions import positions
from .updateprograms import updateprograms
from .mysql import mysql
from .postgresql import postgresql
@ -14,6 +15,7 @@ def cli():
cli.add_command(members)
cli.add_command(groups)
cli.add_command(positions)
cli.add_command(updateprograms)
cli.add_command(mysql)
cli.add_command(postgresql)

View File

@ -4,13 +4,12 @@ from typing import Dict
import click
from zope import component
from ..term_utils import get_terms_for_new_user, get_terms_for_renewal
from ..utils import http_post, http_get, http_patch, http_delete, \
get_failed_operations, get_terms_for_new_user, user_dict_lines, \
get_adduser_operations
get_failed_operations, user_dict_lines, get_adduser_operations
from .utils import handle_stream_response, handle_sync_response, print_lines, \
check_if_in_development
from ceo_common.interfaces import IConfig
from ceo_common.model import Term
from ceod.transactions.members import DeleteMemberTransaction
@ -24,7 +23,7 @@ def members():
@click.option('--cn', help='Full name', prompt='Full name')
@click.option('--program', required=False, help='Academic program')
@click.option('--terms', 'num_terms', type=click.IntRange(1, 100),
help='Number of terms to add', prompt='Number of terms')
help='Number of terms to add', default=1)
@click.option('--clubrep', is_flag=True, default=False,
help='Add non-member terms instead of member terms')
@click.option('--forwarding-address', required=False,
@ -133,22 +132,7 @@ def modify(username, login_shell, forwarding_addresses):
@click.option('--clubrep', is_flag=True, default=False,
help='Add non-member terms instead of member terms')
def renew(username, num_terms, clubrep):
resp = http_get('/api/members/' + username)
result = handle_sync_response(resp)
max_term = None
current_term = Term.current()
if clubrep and 'non_member_terms' in result:
max_term = max(Term(s) for s in result['non_member_terms'])
elif not clubrep and 'terms' in result:
max_term = max(Term(s) for s in result['terms'])
if max_term is not None and max_term >= current_term:
next_term = max_term + 1
else:
next_term = Term.current()
terms = [next_term + i for i in range(num_terms)]
terms = list(map(str, terms))
terms = get_terms_for_renewal(username, num_terms, clubrep)
if clubrep:
body = {'non_member_terms': terms}

44
ceo/cli/positions.py Normal file
View File

@ -0,0 +1,44 @@
import click
from zope import component
from ..utils import http_get, http_post
from .utils import handle_sync_response, handle_stream_response, print_colon_kv
from ceo_common.interfaces import IConfig
from ceod.transactions.members import UpdateMemberPositionsTransaction
@click.group(short_help='List or change exec positions')
def positions():
update_commands()
@positions.command(short_help='Get current positions')
def get():
resp = http_get('/api/positions')
result = handle_sync_response(resp)
print_colon_kv(result.items())
@positions.command(short_help='Update positions')
def set(**kwargs):
body = {k.replace('_', '-'): v for k, v in kwargs.items()}
print_body = {k: v or '' for k, v in body.items()}
click.echo('The positions will be updated:')
print_colon_kv(print_body.items())
click.confirm('Do you want to continue?', abort=True)
resp = http_post('/api/positions', json=body)
handle_stream_response(resp, UpdateMemberPositionsTransaction.operations)
# Provides dynamic parameters for `set' command using config file
def update_commands():
global set
cfg = component.getUtility(IConfig)
avail = cfg.get('positions_available')
required = cfg.get('positions_required')
for pos in avail:
r = pos in required
set = click.option(f'--{pos}', metavar='USERNAME', required=r, prompt=r)(set)

View File

@ -24,4 +24,7 @@ descriptions = {
'remove_user_from_auxiliary_groups': 'Remove user from auxiliary groups',
'unsubscribe_user_from_auxiliary_mailing_lists': 'Unsubscribe user from auxiliary mailing lists',
'remove_sudo_role': 'Remove sudo role from LDAP',
'update_positions_ldap': 'Update positions in LDAP',
'update_exec_group_ldap': 'Update executive group in LDAP',
'subscribe_to_mailing_lists': 'Subscribe to mailing lists',
}

38
ceo/term_utils.py Normal file
View File

@ -0,0 +1,38 @@
from typing import List
from .utils import http_get
from ceo_common.model import Term
import ceo.cli.utils as cli_utils
import ceo.tui.utils as tui_utils
# Had to put these in a separate file to avoid a circular import.
def get_terms_for_new_user(num_terms: int) -> List[str]:
current_term = Term.current()
terms = [current_term + i for i in range(num_terms)]
return list(map(str, terms))
def get_terms_for_renewal(
username: str, num_terms: int, clubrep: bool, tui_model=None,
) -> List[str]:
resp = http_get('/api/members/' + username)
if tui_model is None:
result = cli_utils.handle_sync_response(resp)
else:
result = tui_utils.handle_sync_response(resp, tui_model)
max_term = None
current_term = Term.current()
if clubrep and 'non_member_terms' in result:
max_term = max(Term(s) for s in result['non_member_terms'])
elif not clubrep and 'terms' in result:
max_term = max(Term(s) for s in result['terms'])
if max_term is not None and max_term >= current_term:
next_term = max_term + 1
else:
next_term = Term.current()
terms = [next_term + i for i in range(num_terms)]
return list(map(str, terms))

168
ceo/tui/CeoFrame.py Normal file
View File

@ -0,0 +1,168 @@
from asciimatics.event import KeyboardEvent
from asciimatics.exceptions import NextScene, StopApplication
from asciimatics.screen import Screen
from asciimatics.widgets import Frame, Layout, Divider, Button, Label, \
PopUpDialog
class CeoFrame(Frame):
def __init__(
self,
screen,
height,
width,
model,
name, # key in model.viewdata
on_load=None,
title=None,
save_data=False, # whether to save widget state for resizing
has_dynamic_layouts=False, # whether layouts are created on load
escape_on_q=False, # whether to quit when 'q' is pressed
):
super().__init__(
screen,
height,
width,
name=name,
can_scroll=False,
title=title,
on_load=self._ceoframe_on_load,
)
self._save_data = save_data
self._extra_on_load = on_load
self._model = model
self._name = name
self._loaded = False
self._has_dynamic_layouts = has_dynamic_layouts
self._quit_keys = [Screen.KEY_ESCAPE]
if escape_on_q:
self._quit_keys.append(ord('q'))
# sanity check
if save_data:
assert name in model.viewdata
def _ceoframe_on_load(self):
# We usually don't want _on_load() to be called multiple times
# e.g. when switching back to a scene, or after calling reset()
if self._loaded:
return
self._loaded = True
if self._model.title is not None:
self.title = self._model.title
self._model.title = None
if self._save_data:
# restore the saved input fields' values
self.data = self._model.viewdata[self._name]
if self._extra_on_load is not None:
self._extra_on_load()
def _ceoframe_on_unload(self):
"""
This should be called just after the screen gets resized,
but before the new scenes are constructed.
The idea is to save the user's data in the input fields
so that we can restore them in the new scenes.
"""
if not self._save_data:
return
self.save()
self._model.viewdata[self._name] = self.data
def _ceoframe_on_reset(self):
"""
This needs to be called whenever we return to the home screen
after some kind of operation was completed.
Currently this is called from Model.reset().
"""
# We want a fresh slate once we return to the home screen, so we
# want on_load() to be called for the scenes.
self._loaded = False
if self._has_dynamic_layouts:
# We don't want layouts to accumulate.
self.clear_layouts()
def clear_layouts(self):
# OK so this a *really* bad thing to do, since we're reaching
# into the private variables of a third-party library.
# Unfortunately asciimatics doesn't allow us to clear the layouts
# of an existing frame, and we need this to be able to re-use
# frames which create layouts dynamically.
self._layouts.clear()
def add_buttons(
self, back_btn=False, back_btn_text='Back',
next_scene=None, next_btn_text='Next', on_next=None,
on_next_excl=None,
):
"""
Add a new layout at the bottom of the frame with buttons.
If back_btn is True, a Back button is added.
If next_scene is set to the name of the next scene, or on_next_excl
is set, a Next button will be added.
If on_next is set to a function, it will be called when the Next
button is pressed, and the screen will switch to the next scene.
If on_next_excl is set to a function, it will be called when the Next
button is pressed, and the scene will not be switched.
If both on_next and on_next_excl are set, on_next will be ignored.
"""
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
def _back():
raise NextScene(self._model.scene_stack.pop())
def _next():
if on_next_excl is not None:
on_next_excl()
return
if on_next is not None:
on_next()
self.go_to_next_scene(next_scene)
layout = Layout([1, 1])
self.add_layout(layout)
if back_btn:
layout.add_widget(Button(back_btn_text, _back), 0)
if next_scene is not None or on_next_excl is not None:
layout.add_widget(Button(next_btn_text, _next), 1)
def go_to_next_scene(self, next_scene: str):
self._model.scene_stack.append(self._name)
raise NextScene(next_scene)
def add_flash_message_layout(self):
layout = Layout([100])
self.add_layout(layout)
self._status_label = Label('')
layout.add_widget(self._status_label)
def flash_message(self, msg: str, force_update: bool = False):
self._status_label.text = msg
if force_update:
self._model.screen.force_update()
self._model.screen.draw_next_frame()
def clear_flash_message(self):
self.flash_message('')
def process_event(self, event):
if not isinstance(event, KeyboardEvent):
return super().process_event(event)
c = event.key_code
# Stop on 'q' or 'Esc'
if c in self._quit_keys:
self._scene.add_effect(PopUpDialog(
self.screen,
'Are you sure you want to quit?',
['Yes', 'No'],
has_shadow=True,
on_close=self._quit_on_yes,
))
return super().process_event(event)
@staticmethod
def _quit_on_yes(selected):
# Yes is the first button
if selected == 0:
raise StopApplication("User terminated app")

View File

@ -1,28 +1,16 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Frame, Layout, Button, Divider, Label
from asciimatics.widgets import Layout, Label
from .CeoFrame import CeoFrame
class ConfirmView(Frame):
class ConfirmView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen,
height,
width,
can_scroll=False,
on_load=self._on_load,
title='Confirmation',
screen, height, width, model, 'Confirm',
on_load=self._confirmview_on_load, title='Confirmation',
has_dynamic_layouts=True,
escape_on_q=True,
)
self._model = model
def _add_buttons(self):
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
layout = Layout([1, 1])
self.add_layout(layout)
layout.add_widget(Button('No', self._back), 0)
layout.add_widget(Button('Yes', self._next), 1)
def _add_line(self, text: str = ''):
layout = Layout([100])
@ -35,7 +23,7 @@ class ConfirmView(Frame):
layout.add_widget(Label(key + ':', align='>'), 0)
layout.add_widget(Label(val, align='<'), 2)
def _on_load(self):
def _confirmview_on_load(self):
for _ in range(2):
self._add_line()
for line in self._model.confirm_lines:
@ -48,12 +36,22 @@ class ConfirmView(Frame):
# fill the rest of the space
self.add_layout(Layout([100], fill_frame=True))
self._add_buttons()
kwargs = {
'back_btn': True, 'back_btn_text': 'No', 'next_btn_text': 'Yes',
}
if self._model.operations is not None:
kwargs['next_scene'] = 'Transaction'
else:
self.add_flash_message_layout()
kwargs['on_next_excl'] = self._next
self.add_buttons(**kwargs)
self.fix()
def _back(self):
raise NextScene(self._model.scene_stack.pop())
def _next(self):
self._model.scene_stack.append('Confirm')
raise NextScene('Transaction')
self.flash_message('Sending request...', force_update=True)
try:
self._model.resp = self._model.deferred_req()
finally:
self.clear_flash_message()
next_scene = self._model.result_view_name or 'Result'
self.go_to_next_scene(next_scene)

30
ceo/tui/ErrorView.py Normal file
View File

@ -0,0 +1,30 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Label
from .CeoFrame import CeoFrame
class ErrorView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Error',
on_load=self._errorview_on_load, title='Error',
has_dynamic_layouts=True,
)
def _errorview_on_load(self):
layout = Layout([1, 10], fill_frame=True)
self.add_layout(layout)
for _ in range(2):
layout.add_widget(Label(''), 1)
layout.add_widget(Label('An error occurred:'), 1)
layout.add_widget(Label(''), 1)
for line in self._model.error_message.splitlines():
layout.add_widget(Label(line), 1)
self.add_buttons(on_next_excl=self._next)
self.fix()
def _next(self):
self._model.reset()
raise NextScene('Welcome')

View File

@ -1,12 +1,86 @@
from copy import deepcopy
class Model:
"""A convenient place to share data beween views."""
"""A convenient place to store View data persistently."""
def __init__(self):
# simple key-value pairs
self.screen = None
self.views = []
self.title = None
self.for_member = True
self.scene_stack = []
self.result_view_name = None
self.error_message = None
# view-specific data, to be used when e.g. resizing the window
self._initial_viewdata = {
'AddUser': {
'uid': '',
'cn': '',
'program': '',
'forwarding_address': '',
'num_terms': '1',
},
'RenewUser': {
'uid': '',
'num_terms': '1',
},
'Transaction': {
'op_layout': None,
'msg_layout': None,
'labels': {},
'status': 'not started',
},
'GetUser': {
'uid': '',
},
'ResetPassword': {
'uid': '',
},
'ChangeLoginShell': {
'uid': '',
'login_shell': '',
},
'SetForwardingAddresses': {
'uid': '',
'forwarding_addresses': [''],
},
'AddGroup': {
'cn': '',
'description': '',
},
'GetGroup': {
'cn': '',
},
'AddMemberToGroup': {
'cn': '',
'uid': '',
'subscribe': True,
},
'RemoveMemberFromGroup': {
'cn': '',
'uid': '',
'unsubscribe': True,
},
}
self.viewdata = deepcopy(self._initial_viewdata)
# data which is shared between multiple views
self.is_club_rep = False
self.confirm_lines = None
self.operations = None
self.deferred_req = None
self.resp = None
def reset(self):
self.viewdata = deepcopy(self._initial_viewdata)
self.is_club_rep = False
self.confirm_lines = None
self.operations = None
self.deferred_req = None
self.resp = None
self.title = None
self.error_message = None
self.scene_stack.clear()
self.result_view_name = None
for view in self.views:
if hasattr(view, '_ceoframe_on_reset'):
view._ceoframe_on_reset()

63
ceo/tui/ResultView.py Normal file
View File

@ -0,0 +1,63 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Label
import requests
from .CeoFrame import CeoFrame
class ResultView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Result',
on_load=self._resultview_on_load, title='Result',
has_dynamic_layouts=True,
escape_on_q=True,
)
# TODO: deduplicate this from ConfirmView
def _add_text(self, text: str = '\n', center: bool = False):
if center:
layout = Layout([100])
align = '^'
col = 0
else:
layout = Layout([1, 10])
align = '<'
col = 1
self.add_layout(layout)
for line in text.splitlines():
layout.add_widget(Label(line, align=align), col)
def _add_pair(self, key: str, val: str):
layout = Layout([10, 1, 10])
self.add_layout(layout)
if key:
layout.add_widget(Label(key + ':', align='>'), 0)
else:
layout.add_widget(Label(''), 0)
layout.add_widget(Label(val, align='<'), 2)
# override this method in child classes if desired
def show_result(self, resp: requests.Response):
self._add_text('The operation was successfully performed.', center=True)
def _resultview_on_load(self):
self._add_text()
resp = self._model.resp
if resp.status_code != 200:
self._add_text('An error occurred:')
if resp.headers.get('content-type') == 'application/json':
err_msg = resp.json()['error']
else:
err_msg = resp.text.rstrip()
self._add_text(err_msg)
else:
self.show_result(resp)
# fill the rest of the space
self.add_layout(Layout([100], fill_frame=True))
self.add_buttons(on_next_excl=self._next)
self.fix()
def _next(self):
self._model.reset()
raise NextScene('Welcome')

View File

@ -1,6 +1,6 @@
from typing import Dict, Union
from asciimatics.widgets import Label, Button, Layout, Frame
from asciimatics.widgets import Label, Layout
import requests
from .Model import Model
@ -12,43 +12,42 @@ class TUIStreamResponseHandler(StreamResponseHandler):
self,
model: Model,
labels: Dict[str, Label],
next_btn: Button,
msg_layout: Layout,
frame: Frame,
txn_view, # TransactionView
):
super().__init__()
self.screen = model.screen
self.operations = model.operations
self.idx = 0
self.labels = labels
self.next_btn = next_btn
self.msg_layout = msg_layout
self.frame = frame
self.txn_view = txn_view
self.error_messages = []
def _update(self):
# Since we're running in a separate thread, we need to force the
# screen to update. See
# https://github.com/peterbrittain/asciimatics/issues/56
self.frame.fix()
self.txn_view.fix()
self.screen.force_update()
def _enable_next_btn(self):
self.next_btn.disabled = False
self.frame.reset()
def _show_msg(self, msg: str = ''):
def _show_msg(self, msg: str = '\n'):
for line in msg.splitlines():
self.msg_layout.add_widget(Label(line, align='^'))
def _abort(self):
for operation in self.operations[self.idx:]:
self.labels[operation].text = 'ABORTED'
self._enable_next_btn()
self.txn_view.enable_next_btn()
def handle_non_200(self, resp: requests.Response):
self._abort()
self._show_msg('An error occurred:')
self._show_msg(resp.text)
if resp.headers.get('content-type') == 'application/json':
err_msg = resp.json()['error']
else:
err_msg = resp.text
self._show_msg(err_msg)
self._update()
def begin(self):
@ -57,19 +56,19 @@ class TUIStreamResponseHandler(StreamResponseHandler):
def handle_aborted(self, err_msg: str):
self._abort()
self._show_msg('The transaction was rolled back.')
self._show_msg('The error was:')
self._show_msg('The error was:\n')
self._show_msg(err_msg)
self._show_msg()
self._show_msg('Please check the ceod logs.')
self._update()
def handle_completed(self):
self._show_msg('Transaction successfully completed.')
if len(self.error_messages) > 0:
self._show_msg('There were some errors, please check the '
'ceod logs.')
# we don't have enough space in the TUI to actually
# show the error messages
self._enable_next_btn()
self._show_msg('There were some errors:')
for msg in self.error_messages:
self._show_msg(msg)
self.txn_view.enable_next_btn()
self._update()
def handle_successful_operation(self):

View File

@ -1,29 +1,24 @@
from threading import Thread
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Frame, Layout, Button, Divider, Label
from asciimatics.widgets import Layout, Button, Divider, Label
from ..operation_strings import descriptions as op_desc
from ..utils import generic_handle_stream_response
from .CeoFrame import CeoFrame
from .TUIStreamResponseHandler import TUIStreamResponseHandler
class TransactionView(Frame):
class TransactionView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen,
height,
width,
can_scroll=False,
on_load=self._on_load,
title='Running Transaction',
screen, height, width, model, 'Transaction',
on_load=self._txnview_on_load, title='Running Transaction',
has_dynamic_layouts=True,
)
self._model = model
# map operation names to label widgets
self._labels = {}
# this is an ugly hack to get around the fact that _on_load()
# will be called again when we reset() in the TUIStreamResponseHandler
self._loaded = False
self._labels = model.viewdata['Transaction']['labels']
def _add_buttons(self):
layout = Layout([100])
@ -33,49 +28,75 @@ class TransactionView(Frame):
layout = Layout([1, 1])
self.add_layout(layout)
self._next_btn = Button('Next', self._next)
# we don't want to disable the button if the txn completed
# and the user just resized the window
if self._model.viewdata['Transaction']['status'] != 'completed':
self._next_btn.disabled = True
layout.add_widget(self._next_btn, 1)
def _add_line(self, text: str = ''):
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Label(text, align='^'))
def _on_load(self):
if self._loaded:
return
self._loaded = True
def _add_blank_line(self):
self._op_layout.add_widget(Label(''), 0)
self._op_layout.add_widget(Label(''), 2)
def _txnview_on_load(self):
d = self._model.viewdata['Transaction']
if d['op_layout'] is None:
first_time = True
self._op_layout = Layout([12, 1, 10])
self.add_layout(self._op_layout)
# store the layouts so that we can re-use them when the screen
# gets resized
d['op_layout'] = self._op_layout
for _ in range(2):
self._add_line()
self._add_blank_line()
for operation in self._model.operations:
desc = op_desc[operation]
layout = Layout([10, 1, 10])
self.add_layout(layout)
layout.add_widget(Label(desc + '...', align='>'), 0)
self._op_layout.add_widget(Label(desc + '...', align='>'), 0)
desc_label = Label('', align='<')
layout.add_widget(desc_label, 2)
self._op_layout.add_widget(desc_label, 2)
self._labels[operation] = desc_label
self._add_line()
self._add_blank_line()
# this is the where success/failure messages etc. get placed
self._msg_layout = Layout([100])
self.add_layout(self._msg_layout)
d['msg_layout'] = self._msg_layout
else:
# we arrive here when the screen has been resized
first_time = False
# restore the layouts which we saved
self._op_layout = d['op_layout']
self.add_layout(self._op_layout)
self._msg_layout = d['msg_layout']
self.add_layout(self._msg_layout)
# fill up the rest of the space
self.add_layout(Layout([100], fill_frame=True))
self._add_buttons()
self.fix()
# only send the API request the first time we arrive at this
# scene, not when the screen gets resized
if first_time:
Thread(target=self._do_txn).start()
def _do_txn(self):
self._model.viewdata['Transaction']['status'] = 'in progress'
resp = self._model.deferred_req()
handler = TUIStreamResponseHandler(
model=self._model,
labels=self._labels,
next_btn=self._next_btn,
msg_layout=self._msg_layout,
frame=self,
txn_view=self,
)
generic_handle_stream_response(resp, self._model.operations, handler)
def enable_next_btn(self):
self._next_btn.disabled = False
# If we don't reset, the button isn't selectable, even though we
# enabled it
self.reset()
# save the fact that the transaction is completed
self._model.viewdata['Transaction']['status'] = 'completed'
def _next(self):
self._model.scene_stack.clear()
self._model.reset()
raise NextScene('Welcome')

View File

@ -1,40 +1,66 @@
from asciimatics.widgets import Frame, ListBox, Layout, Divider, \
Button, Widget
import functools
from asciimatics.widgets import ListBox, Layout, Divider, Button, Label
from asciimatics.exceptions import NextScene, StopApplication
from .CeoFrame import CeoFrame
class WelcomeView(Frame):
class WelcomeView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen,
height,
width,
can_scroll=False,
screen, height, width, model, 'Welcome',
title='CSC Electronic Office',
escape_on_q=True,
)
self._model = model
self._members_menu_items = [
members_menu_items = [
('Add member', 'AddUser'),
('Add club rep', 'AddUser'),
('Renew member', 'RenewUser'),
('Renew club rep', 'RenewUser'),
('Get user info', 'GetUserInfo'),
('Get user info', 'GetUser'),
('Reset password', 'ResetPassword'),
('Modify user', 'ModifyUser'),
('Change login shell', 'ChangeLoginShell'),
('Set forwarding addresses', 'SetForwardingAddresses'),
]
self._members_menu = ListBox(
Widget.FILL_FRAME,
[
(desc, i) for i, (desc, view) in
enumerate(self._members_menu_items)
],
name='members',
label='Members',
on_select=self._members_menu_select,
)
layout = Layout([100], fill_frame=True)
members_menu = self._create_menu(
members_menu_items, 'members', self._members_menu_select)
groups_menu_items = [
('Add group', 'AddGroup'),
('Get group members', 'GetGroup'),
('Add member to group', 'AddMemberToGroup'),
('Remove member from group', 'RemoveMemberFromGroup'),
]
groups_menu = self._create_menu(groups_menu_items, 'groups')
db_menu_items = [
('Create MySQL database', 'CreateMySQL'),
('Reset MySQL password', 'ResetMySQLPassword'),
('Create PostgreSQL database', 'CreatePostgreSQL'),
('Reset PostgreSQL password', 'ResetPostgreSQLPassword'),
]
db_menu = self._create_menu(
db_menu_items, 'databases', self._db_menu_select)
positions_menu_items = [
('Get positions', 'GetPositions'),
('Set positions', 'SetPositions'),
]
positions_menu = self._create_menu(positions_menu_items, 'positions')
self._menu_groups = {
'members': members_menu_items,
'groups': groups_menu_items,
'databases': db_menu_items,
'positions': positions_menu_items,
}
layout = Layout([1, 4, 1], fill_frame=True)
self.add_layout(layout)
layout.add_widget(self._members_menu)
layout.add_widget(members_menu, 1)
layout.add_widget(groups_menu, 1)
layout.add_widget(db_menu, 1)
layout.add_widget(positions_menu, 1)
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Label('Press <TAB> to switch widgets'))
layout.add_widget(Divider())
layout = Layout([1, 1, 1])
@ -42,12 +68,40 @@ class WelcomeView(Frame):
layout.add_widget(Button("Quit", self._quit), 2)
self.fix()
def _members_menu_select(self):
def _create_menu(self, menu_items, name, on_select=None):
if on_select is None:
on_select = functools.partial(self._generic_menu_select, name)
return ListBox(
len(menu_items),
[
(desc, i) for i, (desc, view) in
enumerate(menu_items)
],
name=name,
label=name.capitalize(),
on_select=on_select,
)
def _get_menu_item_desc_view(self, menu_name: str):
self.save()
item_id = self.data['members']
desc, view = self._members_menu_items[item_id]
item_id = self.data[menu_name]
menu_items = self._menu_groups[menu_name]
return menu_items[item_id]
def _members_menu_select(self):
desc, view = self._get_menu_item_desc_view('members')
if desc.endswith('club rep'):
self._model.for_member = False
self._model.is_club_rep = True
self._welcomeview_go_to_next_scene(desc, view)
def _db_menu_select(self):
pass
def _generic_menu_select(self, menu_name):
desc, view = self._get_menu_item_desc_view('groups')
self._welcomeview_go_to_next_scene(desc, view)
def _welcomeview_go_to_next_scene(self, desc, view):
self._model.title = desc
self._model.scene_stack.append('Welcome')
raise NextScene(view)

View File

@ -0,0 +1,42 @@
from asciimatics.widgets import Layout, Text
from ...utils import defer, http_post
from ..CeoFrame import CeoFrame
from ceod.transactions.groups import AddGroupTransaction
class AddGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'AddGroup',
save_data=True,
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text('Name:', 'cn')
layout.add_widget(self._cn)
self._description = Text('Description:', 'description')
layout.add_widget(self._description)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _next(self):
cn = self._cn.value
description = self._description.value
body = {
'cn': cn,
'description': description,
}
self._model.confirm_lines = [
'The following group will be created:',
'',
('cn', cn),
('description', description),
'',
'Are you sure you want to continue?',
]
self._model.deferred_req = defer(http_post, '/api/groups', json=body)
self._model.operations = AddGroupTransaction.operations

View File

@ -0,0 +1,44 @@
from asciimatics.widgets import Layout, Text, CheckBox, Label
from ...utils import defer, http_post
from ..CeoFrame import CeoFrame
from ceod.transactions.groups import AddMemberToGroupTransaction
class AddMemberToGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'AddMemberToGroup',
save_data=True,
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text('Group name:', 'cn')
layout.add_widget(self._cn)
self._username = Text('Username:', 'uid')
layout.add_widget(self._username)
layout.add_widget(Label(''))
self._checkbox = CheckBox(
'subscribe to auxiliary mailing lists', name='subscribe')
self._checkbox.value = True
layout.add_widget(self._checkbox)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _next(self):
cn = self._cn.value
uid = self._username.value
self._model.confirm_lines = [
f'Are you sure you want to add {uid} to {cn}?',
]
operations = AddMemberToGroupTransaction.operations
url = f'/api/groups/{cn}/members/{uid}'
# TODO: deduplicate this logic from the CLI
if not self._checkbox.value:
url += '?subscribe_to_lists=false'
operations.remove('subscribe_user_to_auxiliary_mailing_lists')
self._model.deferred_req = defer(http_post, url)
self._model.operations = operations

View File

@ -0,0 +1,18 @@
import requests
from ..ResultView import ResultView
class GetGroupResultView(ResultView):
def show_result(self, resp: requests.Response):
d = resp.json()
if 'description' in d:
desc = d['description'] + ' (' + d['cn'] + ')'
else:
desc = d['cn']
self._add_text('Members of ' + desc, center=True)
self._add_text()
for member in d['members']:
self._add_text(
member['cn'] + ' (' + member['uid'] + ')',
center=True)

View File

@ -0,0 +1,31 @@
from asciimatics.widgets import Layout, Text
from ...utils import http_get
from ..CeoFrame import CeoFrame
class GetGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'GetGroup',
save_data=True,
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text("Group name", "cn")
layout.add_widget(self._cn)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='GetGroupResult', on_next=self._next)
self.fix()
def _next(self):
cn = self._cn.value
self._model.viewdata['GetGroup']['cn'] = cn
self.flash_message('Looking up group...', force_update=True)
try:
self._model.resp = http_get(f'/api/groups/{cn}')
finally:
self.clear_flash_message()

View File

@ -0,0 +1,44 @@
from asciimatics.widgets import Layout, Text, CheckBox, Label
from ...utils import defer, http_delete
from ..CeoFrame import CeoFrame
from ceod.transactions.groups import RemoveMemberFromGroupTransaction
class RemoveMemberFromGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'RemoveMemberFromGroup',
save_data=True,
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text('Group name:', 'cn')
layout.add_widget(self._cn)
self._username = Text('Username:', 'uid')
layout.add_widget(self._username)
layout.add_widget(Label(''))
self._checkbox = CheckBox(
'unsubscribe from auxiliary mailing lists', name='unsubscribe')
self._checkbox.value = True
layout.add_widget(self._checkbox)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _next(self):
cn = self._cn.value
uid = self._username.value
self._model.confirm_lines = [
f'Are you sure you want to remove {uid} from {cn}?',
]
operations = RemoveMemberFromGroupTransaction.operations
url = f'/api/groups/{cn}/members/{uid}'
# TODO: deduplicate this logic from the CLI
if not self._checkbox.value:
url += '?unsubscribe_from_lists=false'
operations.remove('unsubscribe_user_from_auxiliary_mailing_lists')
self._model.deferred_req = defer(http_delete, url)
self._model.operations = operations

View File

View File

@ -1,21 +1,21 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Frame, Layout, Text, Button, Divider
from threading import Thread
from asciimatics.widgets import Layout, Text
from ...term_utils import get_terms_for_new_user
from ...utils import http_get, http_post, defer, user_dict_kv, \
get_terms_for_new_user, get_adduser_operations
get_adduser_operations
from ..CeoFrame import CeoFrame
class AddUserView(Frame):
class AddUserView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen,
height,
width,
can_scroll=False,
on_load=self._on_load,
screen, height, width, model, 'AddUser',
save_data=True,
)
self._model = model
self._username_changed = False
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text(
@ -33,22 +33,14 @@ class AddUserView(Frame):
self._num_terms = Text(
"Number of terms:", "num_terms",
validator=lambda s: s.isdigit() and s[0] != '0')
self._num_terms.value = '1'
layout.add_widget(self._num_terms)
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
layout = Layout([1, 1])
self.add_layout(layout)
layout.add_widget(Button('Back', self._back), 0)
layout.add_widget(Button("Next", self._next), 1)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _on_load(self):
self.title = self._model.title
def _on_username_change(self):
self._username_changed = True
@ -59,23 +51,24 @@ class AddUserView(Frame):
username = self._username.value
if username == '':
return
self._get_uwldap_info(username)
Thread(target=self._get_uwldap_info, args=[username]).start()
def _get_uwldap_info(self, username):
self.flash_message('Looking up user...')
try:
resp = http_get('/api/uwldap/' + username)
if resp.status_code != 200:
return
data = resp.json()
self._status_label.text = ''
self._full_name.value = data['cn']
self._program.value = data.get('program', '')
if data.get('mail_local_addresses'):
self._forwarding_address.value = data['mail_local_addresses'][0]
def _back(self):
raise NextScene(self._model.scene_stack.pop())
finally:
self.clear_flash_message()
def _next(self):
self._model.prev_scene = 'AddUser'
body = {
'uid': self._username.value,
'cn': self._full_name.value,
@ -85,10 +78,10 @@ class AddUserView(Frame):
if self._forwarding_address.value:
body['forwarding_addresses'] = [self._forwarding_address.value]
new_terms = get_terms_for_new_user(int(self._num_terms.value))
if self._model.for_member:
body['terms'] = new_terms
else:
if self._model.is_club_rep:
body['non_member_terms'] = new_terms
else:
body['terms'] = new_terms
pairs = user_dict_kv(body)
self._model.confirm_lines = [
'The following user will be created:',
@ -100,6 +93,3 @@ class AddUserView(Frame):
self._model.deferred_req = defer(http_post, '/api/members', json=body)
self._model.operations = get_adduser_operations(body)
self._model.scene_stack.append('AddUser')
raise NextScene('Confirm')

View File

@ -0,0 +1,71 @@
from threading import Thread
from asciimatics.widgets import Layout, Text
from ...utils import defer, http_patch, http_get
from ..CeoFrame import CeoFrame
class ChangeLoginShellView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'ChangeLoginShell',
save_data=True,
)
self._username_changed = False
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text(
"Username:", "uid",
on_change=self._on_username_change,
on_blur=self._on_username_blur,
)
layout.add_widget(self._username)
self._login_shell = Text('Login shell:', 'login_shell')
layout.add_widget(self._login_shell)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
# TODO: deduplicate this from AddUserView
def _on_username_change(self):
self._username_changed = True
def _on_username_blur(self):
if not self._username_changed:
return
self._username_changed = False
username = self._username.value
if username == '':
return
Thread(target=self._get_user_info, args=[username]).start()
def _get_user_info(self, username):
self.flash_message('Looking up user...')
try:
resp = http_get('/api/members/' + username)
if resp.status_code != 200:
return
data = resp.json()
self._login_shell.value = data['login_shell']
finally:
self.clear_flash_message()
def _next(self):
uid = self._username.value
login_shell = self._login_shell.value
body = {'login_shell': login_shell}
self._model.deferred_req = defer(
http_patch, f'/api/members/{uid}', json=body)
self._model.confirm_lines = [
f"{uid}'s login shell will be changed to:",
'',
login_shell,
'',
'Are you sure you want to continue?',
]
self._model.operations = ['replace_login_shell']

View File

@ -0,0 +1,11 @@
import requests
from ...utils import user_dict_kv
from ..ResultView import ResultView
class GetUserResultView(ResultView):
def show_result(self, resp: requests.Response):
pairs = user_dict_kv(resp.json())
for key, val in pairs:
self._add_pair(key, val)

View File

@ -0,0 +1,30 @@
from asciimatics.widgets import Layout, Text
from ...utils import http_get
from ..CeoFrame import CeoFrame
class GetUserView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'GetUser',
save_data=True,
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text("Username:", "uid")
layout.add_widget(self._username)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='GetUserResult', on_next=self._next)
self.fix()
def _next(self):
uid = self._username.value
self.flash_message('Looking up user...', force_update=True)
try:
self._model.resp = http_get(f'/api/members/{uid}')
finally:
self.clear_flash_message()

View File

@ -0,0 +1,60 @@
from asciimatics.widgets import Layout, Text
from ...term_utils import get_terms_for_renewal
from ...utils import http_post, defer
from ..CeoFrame import CeoFrame
class RenewUserView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'RenewUser',
save_data=True,
)
self._model = model
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text("Username:", "uid")
layout.add_widget(self._username)
self._num_terms = Text(
"Number of terms:", "num_terms",
validator=lambda s: s.isdigit() and s[0] != '0')
layout.add_widget(self._num_terms)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _next(self):
uid = self._username.value
self.flash_message('Looking up user...', force_update=True)
try:
new_terms = get_terms_for_renewal(
uid,
int(self._num_terms.value),
self._model.is_club_rep,
self._model,
)
finally:
self.clear_flash_message()
body = {'uid': uid}
if self._model.is_club_rep:
body['non_member_terms'] = new_terms
terms_str = 'non-member terms'
else:
body['terms'] = new_terms
terms_str = 'member terms'
self._model.confirm_lines = [
'The following ' + terms_str + ' will be added:',
'',
','.join(new_terms),
'',
'Are you sure you want to continue?',
]
self._model.deferred_req = defer(
http_post, f'/api/members/{uid}/renew', json=body)

View File

@ -0,0 +1,12 @@
from ..ResultView import ResultView
import requests
class ResetPasswordResultView(ResultView):
def show_result(self, resp: requests.Response):
result = resp.json()
uid = self._model.viewdata['ResetPassword']['uid']
self._add_text(f'The new password for {uid} is:', center=True)
self._add_text()
self._add_text(result['password'], center=True)

View File

@ -0,0 +1,31 @@
from asciimatics.widgets import Layout, Text, Label
from ...utils import defer, http_post
from ..CeoFrame import CeoFrame
class ResetPasswordView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'ResetPassword',
save_data=True,
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
layout.add_widget(Label('Enter the username of the user whose password will be reset:'))
self._username = Text(None, "uid")
layout.add_widget(self._username)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _next(self):
uid = self._username.value
self._model.viewdata['ResetPassword']['uid'] = uid
self._model.confirm_lines = [
f"Are you sure you want to reset {uid}'s password?",
]
self._model.deferred_req = defer(http_post, f'/api/members/{uid}/pwreset')
self._model.result_view_name = 'ResetPasswordResult'

View File

@ -0,0 +1,76 @@
from threading import Thread
from asciimatics.widgets import Layout, Label, Text, TextBox, Widget
from ...utils import defer, http_patch, http_get
from ..CeoFrame import CeoFrame
class SetForwardingAddressesView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'SetForwardingAddresses',
save_data=True,
)
self._username_changed = False
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text(
"Username:", "uid",
on_change=self._on_username_change,
on_blur=self._on_username_blur,
)
layout.add_widget(self._username)
self._forwarding_addresses = TextBox(
Widget.FILL_FRAME, 'Forwarding addresses:', 'forwarding_addresses',
line_wrap=True)
layout.add_widget(self._forwarding_addresses)
layout.add_widget(Label('Press <TAB> to switch widgets'))
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
# TODO: deduplicate this from AddUserView
def _on_username_change(self):
self._username_changed = True
def _on_username_blur(self):
if not self._username_changed:
return
self._username_changed = False
username = self._username.value
if username == '':
return
Thread(target=self._get_user_info, args=[username]).start()
def _get_user_info(self, username):
self.flash_message('Looking up user...')
try:
resp = http_get('/api/members/' + username)
if resp.status_code != 200:
return
data = resp.json()
if 'forwarding_addresses' not in data:
return
self._forwarding_addresses.value = data['forwarding_addresses']
finally:
self.clear_flash_message()
def _next(self):
uid = self._username.value
forwarding_addresses = self._forwarding_addresses.value
body = {'forwarding_addresses': forwarding_addresses}
self._model.deferred_req = defer(
http_patch, f'/api/members/{uid}', json=body)
self._model.confirm_lines = [
f"{uid}'s forwarding addresses will be set to:",
'',
*forwarding_addresses,
'',
'Are you sure you want to continue?',
]
self._model.operations = ['replace_forwarding_addresses']

View File

@ -1,38 +1,70 @@
import sys
from asciimatics.event import KeyboardEvent
from asciimatics.exceptions import ResizeScreenError, StopApplication
from asciimatics.exceptions import ResizeScreenError
from asciimatics.scene import Scene
from asciimatics.screen import Screen
from .ConfirmView import ConfirmView
from .ErrorView import ErrorView
from .Model import Model
from .ResultView import ResultView
from .TransactionView import TransactionView
from .WelcomeView import WelcomeView
from .groups.AddGroupView import AddGroupView
from .groups.AddMemberToGroupView import AddMemberToGroupView
from .groups.GetGroupView import GetGroupView
from .groups.GetGroupResultView import GetGroupResultView
from .groups.RemoveMemberFromGroupView import RemoveMemberFromGroupView
from .members.AddUserView import AddUserView
from .members.ChangeLoginShellView import ChangeLoginShellView
from .members.GetUserView import GetUserView
from .members.GetUserResultView import GetUserResultView
from .members.RenewUserView import RenewUserView
from .members.ResetPasswordView import ResetPasswordView
from .members.ResetPasswordResultView import ResetPasswordResultView
from .members.SetForwardingAddressesView import SetForwardingAddressesView
def unhandled(event):
if isinstance(event, KeyboardEvent):
c = event.key_code
# Stop on 'q' or 'Esc'
if c in (113, 27):
raise StopApplication("User terminated app")
# tuples of (name, view)
views = []
def screen_wrapper(screen, scene, model):
model.screen = screen
def screen_wrapper(screen, last_scene, model):
global views
# unload the old views
for name, view in views:
if hasattr(view, '_on_ceoframe_unload'):
view._on_ceoframe_unload()
width = min(screen.width, 90)
height = min(screen.height, 24)
scenes = [
Scene([WelcomeView(screen, width, height, model)], -1, name='Welcome'),
Scene([AddUserView(screen, width, height, model)], -1, name='AddUser'),
Scene([ConfirmView(screen, width, height, model)], -1, name='Confirm'),
Scene([TransactionView(screen, width, height, model)], -1, name='Transaction'),
views = [
('Welcome', WelcomeView(screen, width, height, model)),
('Confirm', ConfirmView(screen, width, height, model)),
('Transaction', TransactionView(screen, width, height, model)),
('Result', ResultView(screen, width, height, model)),
('Error', ErrorView(screen, width, height, model)),
('AddUser', AddUserView(screen, width, height, model)),
('RenewUser', RenewUserView(screen, width, height, model)),
('GetUser', GetUserView(screen, width, height, model)),
('GetUserResult', GetUserResultView(screen, width, height, model)),
('ResetPassword', ResetPasswordView(screen, width, height, model)),
('ResetPasswordResult', ResetPasswordResultView(screen, width, height, model)),
('ChangeLoginShell', ChangeLoginShellView(screen, width, height, model)),
('SetForwardingAddresses', SetForwardingAddressesView(screen, width, height, model)),
('AddGroup', AddGroupView(screen, width, height, model)),
('GetGroup', GetGroupView(screen, width, height, model)),
('GetGroupResult', GetGroupResultView(screen, width, height, model)),
('AddMemberToGroup', AddMemberToGroupView(screen, width, height, model)),
('RemoveMemberFromGroup', RemoveMemberFromGroupView(screen, width, height, model)),
]
scenes = [
Scene([view], -1, name=name) for name, view in views
]
model.screen = screen
model.views = [view for name, view in views]
screen.play(
scenes, stop_on_resize=True, start_scene=scene, allow_int=True,
unhandled_input=unhandled)
scenes, stop_on_resize=True, start_scene=last_scene, allow_int=True,
)
def main():

10
ceo/tui/utils.py Normal file
View File

@ -0,0 +1,10 @@
from asciimatics.exceptions import NextScene
import requests
def handle_sync_response(resp: requests.Response, model):
if resp.status_code != 200:
model.error_message = resp.text.rstrip()
raise NextScene('Error')
return resp.json()

View File

@ -1,6 +1,5 @@
import functools
import json
import sys
from typing import List, Dict, Tuple, Callable
import requests
@ -8,7 +7,6 @@ from zope import component
from .StreamResponseHandler import StreamResponseHandler
from ceo_common.interfaces import IHTTPClient, IConfig
from ceo_common.model import Term
from ceod.transactions.members import AddMemberTransaction
@ -71,6 +69,8 @@ def space_colon_kv(pairs: List[Tuple[str, str]]) -> List[str]:
key1000: val3
val4
"""
if not pairs:
return []
lines = []
maxlen = max(len(key) for key, val in pairs)
for key, val in pairs:
@ -85,12 +85,6 @@ def space_colon_kv(pairs: List[Tuple[str, str]]) -> List[str]:
return lines
def get_terms_for_new_user(num_terms: int) -> List[str]:
current_term = Term.current()
terms = [current_term + i for i in range(num_terms)]
return list(map(str, terms))
def user_dict_kv(d: Dict) -> List[Tuple[str]]:
"""Pretty-format a serialized User as (key, value) pairs."""
pairs = [
@ -149,15 +143,16 @@ def generic_handle_stream_response(
"""
if resp.status_code != 200:
handler.handle_non_200(resp)
return
handler.begin()
idx = 0
data = []
for line in resp.iter_lines(decode_unicode=True, chunk_size=8):
for line in resp.iter_lines(decode_unicode=True, chunk_size=1):
d = json.loads(line)
data.append(d)
if d['status'] == 'aborted':
handler.handle_aborted(d['error'])
sys.exit(1)
return
elif d['status'] == 'completed':
while idx < len(operations):
handler.handle_skipped_operation()

View File

@ -29,6 +29,12 @@ def update_positions():
required = cfg.get('positions_required')
available = cfg.get('positions_available')
# remove falsy values
body = {
positions: username for positions, username in body.items()
if username
}
for position in body.keys():
if position not in available:
return {

View File

@ -11,7 +11,7 @@ You can hear about upcoming events in a number of ways:
* Check our website from time to time: http://csclub.uwaterloo.ca/
* Subscribe to our events calendar feed: http://csclub.uwaterloo.ca/events.ics
* Like the CSC on Facebook: https://www.facebook.com/uw.computerscienceclub
* Join the CSC Discord server: https://discord.gg/uwcsclub
* Join the CSC Discord server: https://discord.gg/pHfYBCg
* Read your email: announcements are sent via the csc-general mailing list
* Keep an eye out in the MC: posters for upcoming events appear in stairwells and hallways

View File

@ -69,9 +69,9 @@ class AddMemberToGroupTransaction(AbstractTransaction):
yield 'subscribe_user_to_auxiliary_mailing_lists'
except KeyError:
pass
except Exception:
except Exception as err:
logger.error(traceback.format_exc())
yield 'failed_to_subscribe_user_to_auxiliary_mailing_lists'
yield 'failed_to_subscribe_user_to_auxiliary_mailing_lists: ' + str(err)
result = {
'added_to_groups': [self.group_name] + [

View File

@ -69,9 +69,9 @@ class RemoveMemberFromGroupTransaction(AbstractTransaction):
yield 'unsubscribe_user_from_auxiliary_mailing_lists'
except KeyError:
pass
except Exception:
except Exception as err:
logger.error(traceback.format_exc())
yield 'failed_to_unsubscribe_user_from_auxiliary_mailing_lists'
yield 'failed_to_unsubscribe_user_from_auxiliary_mailing_lists: ' + str(err)
result = {
'removed_from_groups': [self.group_name] + [

View File

@ -0,0 +1,55 @@
from click.testing import CliRunner
from ceo.cli import cli
def test_positions(cli_setup):
runner = CliRunner()
# Setup test data
for i in range(5):
runner.invoke(cli, ['members', 'add', f'test_{i}', '--cn', f'Test {i}', '--program', 'Math', '--terms', '1'], input='y\n')
runner.invoke(cli, ['groups', 'add', 'exec', '--description', 'Test Group'], input='y\n')
result = runner.invoke(cli, [
'positions', 'set',
'--president', 'test_0',
'--vice-president', 'test_1',
'--sysadmin', 'test_2',
'--secretary', 'test_3',
'--webmaster', 'test_4',
], input='y\n')
assert result.exit_code == 0
assert result.output == '''
The positions will be updated:
president: test_0
vice-president: test_1
sysadmin: test_2
secretary: test_3
webmaster: test_4
treasurer:
cro:
librarian:
imapd:
offsck:
Do you want to continue? [y/N]: y
Update positions in LDAP... Done
Update executive group in LDAP... Done
Subscribe to mailing lists... Done
Transaction successfully completed.
'''[1:] # noqa: W291
result = runner.invoke(cli, ['positions', 'get'])
assert result.exit_code == 0
assert result.output == '''
president: test_0
secretary: test_3
sysadmin: test_2
vice-president: test_1
webmaster: test_4
'''[1:]
# Cleanup test data
for i in range(5):
runner.invoke(cli, ['members', 'delete', f'test_{i}'], input='y\n')
runner.invoke(cli, ['groups', 'delete', 'exec'], input='y\n')

View File

@ -7,3 +7,8 @@ uw_domain = uwaterloo.internal
admin_host = phosphoric-acid
use_https = false
port = 9987
[positions]
required = president,vice-president,sysadmin
available = president,vice-president,treasurer,secretary,
sysadmin,cro,librarian,imapd,webmaster,offsck

View File

@ -51,7 +51,7 @@ def cfg(_drone_hostname_mock):
with importlib.resources.path('tests', 'ceod_test_local.ini') as p:
config_file = p.__fspath__()
_cfg = Config(config_file)
component.provideUtility(_cfg, IConfig)
component.getGlobalSiteManager().registerUtility(_cfg, IConfig)
return _cfg
@ -75,7 +75,7 @@ def krb_srv(cfg):
else:
principal = 'ceod/' + socket.getfqdn()
krb = KerberosService(principal)
component.provideUtility(krb, IKerberosService)
component.getGlobalSiteManager().registerUtility(krb, IKerberosService)
delete_test_princs(krb)
yield krb
@ -160,7 +160,7 @@ def ldap_srv_session(cfg, krb_srv, ldap_conn):
conn.add(base_dn, 'organizationalUnit')
_ldap_srv = LDAPService()
component.provideUtility(_ldap_srv, ILDAPService)
component.getGlobalSiteManager().registerUtility(_ldap_srv, ILDAPService)
yield _ldap_srv
@ -180,7 +180,7 @@ def ldap_srv(ldap_srv_session, g_admin_ctx):
@pytest.fixture(scope='session')
def file_srv(cfg):
_file_srv = FileService()
component.provideUtility(_file_srv, IFileService)
component.getGlobalSiteManager().registerUtility(_file_srv, IFileService)
members_home = cfg.get('members_home')
clubs_home = cfg.get('clubs_home')
@ -194,7 +194,7 @@ def file_srv(cfg):
@pytest.fixture(scope='session')
def http_client(cfg):
_client = HTTPClient()
component.provideUtility(_client, IHTTPClient)
component.getGlobalSiteManager().registerUtility(_client, IHTTPClient)
return _client
@ -210,7 +210,7 @@ def mock_mailman_server():
def mailman_srv(mock_mailman_server, cfg, http_client):
# TODO: test the RemoteMailmanService as well
mailman = MailmanService()
component.provideUtility(mailman, IMailmanService)
component.getGlobalSiteManager().registerUtility(mailman, IMailmanService)
return mailman
@ -223,7 +223,7 @@ def uwldap_srv(cfg, ldap_conn):
conn.add(base_dn, 'organizationalUnit')
_uwldap_srv = UWLDAPService()
component.provideUtility(_uwldap_srv, IUWLDAPService)
component.getGlobalSiteManager().registerUtility(_uwldap_srv, IUWLDAPService)
yield _uwldap_srv
delete_subtree(conn, base_dn)
@ -240,21 +240,21 @@ def mock_mail_server():
@pytest.fixture(scope='session')
def mail_srv(cfg, mock_mail_server):
_mail_srv = MailService()
component.provideUtility(_mail_srv, IMailService)
component.getGlobalSiteManager().registerUtility(_mail_srv, IMailService)
return _mail_srv
@pytest.fixture(scope='session')
def mysql_srv(cfg):
mysql_srv = MySQLService()
component.provideUtility(mysql_srv, IDatabaseService, 'mysql')
component.getGlobalSiteManager().registerUtility(mysql_srv, IDatabaseService, 'mysql')
return mysql_srv
@pytest.fixture(scope='session')
def postgresql_srv(cfg):
psql_srv = PostgreSQLService()
component.provideUtility(psql_srv, IDatabaseService, 'postgresql')
component.getGlobalSiteManager().registerUtility(psql_srv, IDatabaseService, 'postgresql')
return psql_srv