Merge branch 'v1' into positions-cli

This commit is contained in:
Max Erenberg 2021-09-08 04:38:34 +00:00
commit 36fd303433
47 changed files with 1856 additions and 292 deletions

View File

@ -0,0 +1,43 @@
from abc import ABC, abstractmethod
from typing import Union
import requests
class StreamResponseHandler(ABC):
"""
An abstract class to handle stream responses from the server.
The CLI and TUI should implement a child class.
"""
@abstractmethod
def handle_non_200(self, resp: requests.Response):
"""Handle a non-200 response."""
@abstractmethod
def begin(self):
"""Begin the transaction."""
@abstractmethod
def handle_aborted(self, err_msg: str):
"""Handle an aborted transaction."""
@abstractmethod
def handle_completed(self):
"""Handle a completed transaction."""
@abstractmethod
def handle_successful_operation(self):
"""Handle a successful operation."""
@abstractmethod
def handle_failed_operation(self, err_msg: Union[str, None]):
"""Handle a failed operation."""
@abstractmethod
def handle_skipped_operation(self):
"""Handle a skipped operation."""
@abstractmethod
def handle_unrecognized_operation(self, operation: str):
"""Handle an unrecognized operation."""

View File

@ -1,4 +1,46 @@
import importlib.resources
import os
import socket
import sys
from zope import component
from .cli import cli
from .krb_check import krb_check
from .tui.start import main as tui_main
from ceo_common.interfaces import IConfig, IHTTPClient
from ceo_common.model import Config, HTTPClient
def register_services():
# Using base component directly so events get triggered
baseComponent = component.getGlobalSiteManager()
# Config
# This is a hack to determine if we're in the dev env or not
if socket.getfqdn().endswith('.csclub.internal'):
with importlib.resources.path('tests', 'ceo_dev.ini') as p:
config_file = p.__fspath__()
else:
config_file = os.environ.get('CEO_CONFIG', '/etc/csc/ceo.ini')
cfg = Config(config_file)
component.provideUtility(cfg, IConfig)
baseComponent.registerUtility(cfg, IConfig)
# HTTPService
http_client = HTTPClient()
component.provideUtility(http_client, IHTTPClient)
baseComponent.registerUtility(http_client, IHTTPClient)
def main():
krb_check()
register_services()
if len(sys.argv) > 1:
cli(obj={})
else:
tui_main()
if __name__ == '__main__':
cli(obj={})
main()

View File

@ -0,0 +1,70 @@
import sys
from typing import List, Union
import click
import requests
from ..StreamResponseHandler import StreamResponseHandler
from ..operation_strings import descriptions as op_desc
class Abort(click.ClickException):
"""Abort silently."""
def __init__(self, exit_code=1):
super().__init__('')
self.exit_code = exit_code
def show(self):
pass
class CLIStreamResponseHandler(StreamResponseHandler):
def __init__(self, operations: List[str]):
super().__init__()
self.operations = operations
self.idx = 0
def handle_non_200(self, resp: requests.Response):
click.echo('An error occurred:')
click.echo(resp.text.rstrip())
raise Abort()
def begin(self):
click.echo(op_desc[self.operations[0]] + '... ', nl=False)
def handle_aborted(self, err_msg: str):
click.echo(click.style('ABORTED', fg='red'))
click.echo('The transaction was rolled back.')
click.echo('The error was: ' + err_msg)
click.echo('Please check the ceod logs.')
sys.exit(1)
def handle_completed(self):
click.echo('Transaction successfully completed.')
def _go_to_next_op(self):
"""
Increment the operation index and print the next operation, if
there is one.
"""
self.idx += 1
if self.idx < len(self.operations):
click.echo(op_desc[self.operations[self.idx]] + '... ', nl=False)
def handle_successful_operation(self):
click.echo(click.style('Done', fg='green'))
self._go_to_next_op()
def handle_failed_operation(self, err_msg: Union[str, None]):
click.echo(click.style('Failed', fg='red'))
if err_msg is not None:
click.echo(' Error message: ' + err_msg)
self._go_to_next_op()
def handle_skipped_operation(self):
click.echo('Skipped')
self._go_to_next_op()
def handle_unrecognized_operation(self, operation: str):
click.echo('Unrecognized operation: ' + operation)

View File

@ -1,53 +1,17 @@
import importlib.resources
import os
import socket
import click
from zope import component
from ..krb_check import krb_check
from .members import members
from .groups import groups
from .positions import positions
from .updateprograms import updateprograms
from ceo_common.interfaces import IConfig, IHTTPClient
from ceo_common.model import Config, HTTPClient
@click.group()
@click.pass_context
def cli(ctx):
# ensure ctx exists and is a dict
ctx.ensure_object(dict)
princ = krb_check()
user = princ[:princ.index('@')]
ctx.obj['user'] = user
if os.environ.get('PYTEST') != '1':
register_services()
def cli():
pass
cli.add_command(members)
cli.add_command(groups)
cli.add_command(positions)
cli.add_command(updateprograms)
def register_services():
# Using base component directly so events get triggered
baseComponent = component.getGlobalSiteManager()
# Config
# This is a hack to determine if we're in the dev env or not
if socket.getfqdn().endswith('.csclub.internal'):
with importlib.resources.path('tests', 'ceo_dev.ini') as p:
config_file = p.__fspath__()
else:
config_file = os.environ.get('CEO_CONFIG', '/etc/csc/ceo.ini')
cfg = Config(config_file)
baseComponent.registerUtility(cfg, IConfig)
# HTTPService
http_client = HTTPClient()
baseComponent.registerUtility(http_client, IHTTPClient)

View File

@ -4,15 +4,13 @@ from typing import Dict
import click
from zope import component
from ..utils import http_post, http_get, http_patch, http_delete, get_failed_operations
from .utils import handle_stream_response, handle_sync_response, print_colon_kv, \
from ..term_utils import get_terms_for_new_user, get_terms_for_renewal
from ..utils import http_post, http_get, http_patch, http_delete, \
get_failed_operations, user_dict_lines, get_adduser_operations
from .utils import handle_stream_response, handle_sync_response, print_lines, \
check_if_in_development
from ceo_common.interfaces import IConfig
from ceo_common.model import Term
from ceod.transactions.members import (
AddMemberTransaction,
DeleteMemberTransaction,
)
from ceod.transactions.members import DeleteMemberTransaction
@click.group(short_help='Perform operations on CSC members and club reps')
@ -25,7 +23,7 @@ def members():
@click.option('--cn', help='Full name', prompt='Full name')
@click.option('--program', required=False, help='Academic program')
@click.option('--terms', 'num_terms', type=click.IntRange(1, 100),
help='Number of terms to add', prompt='Number of terms')
help='Number of terms to add', default=1)
@click.option('--clubrep', is_flag=True, default=False,
help='Add non-member terms instead of member terms')
@click.option('--forwarding-address', required=False,
@ -36,30 +34,12 @@ def add(username, cn, program, num_terms, clubrep, forwarding_address):
cfg = component.getUtility(IConfig)
uw_domain = cfg.get('uw_domain')
current_term = Term.current()
terms = [current_term + i for i in range(num_terms)]
terms = list(map(str, terms))
terms = get_terms_for_new_user(num_terms)
# TODO: get email address from UWLDAP
if forwarding_address is None:
forwarding_address = username + '@' + uw_domain
click.echo("The following user will be created:")
lines = [
('uid', username),
('cn', cn),
]
if program is not None:
lines.append(('program', program))
if clubrep:
lines.append(('non-member terms', ','.join(terms)))
else:
lines.append(('member terms', ','.join(terms)))
if forwarding_address != '':
lines.append(('forwarding address', forwarding_address))
print_colon_kv(lines)
click.confirm('Do you want to continue?', abort=True)
body = {
'uid': username,
'cn': cn,
@ -72,10 +52,14 @@ def add(username, cn, program, num_terms, clubrep, forwarding_address):
body['terms'] = terms
if forwarding_address != '':
body['forwarding_addresses'] = [forwarding_address]
operations = AddMemberTransaction.operations
if forwarding_address == '':
# don't bother displaying this because it won't be run
operations.remove('set_forwarding_addresses')
else:
body['forwarding_addresses'] = []
click.echo("The following user will be created:")
print_user_lines(body)
click.confirm('Do you want to continue?', abort=True)
operations = get_adduser_operations(body)
resp = http_post('/api/members', json=body)
data = handle_stream_response(resp, operations)
@ -89,30 +73,9 @@ def add(username, cn, program, num_terms, clubrep, forwarding_address):
'send the user their password.', fg='yellow'))
def print_user_lines(result: Dict):
"""Pretty-print a user JSON response."""
lines = [
('uid', result['uid']),
('cn', result['cn']),
('program', result.get('program', 'Unknown')),
('UID number', result['uid_number']),
('GID number', result['gid_number']),
('login shell', result['login_shell']),
('home directory', result['home_directory']),
('is a club', result['is_club']),
]
if 'forwarding_addresses' in result:
if len(result['forwarding_addresses']) != 0:
lines.append(('forwarding addresses', result['forwarding_addresses'][0]))
for address in result['forwarding_addresses'][1:]:
lines.append(('', address))
if 'terms' in result:
lines.append(('terms', ','.join(result['terms'])))
if 'non_member_terms' in result:
lines.append(('non-member terms', ','.join(result['non_member_terms'])))
if 'password' in result:
lines.append(('password', result['password']))
print_colon_kv(lines)
def print_user_lines(d: Dict):
"""Pretty-print a serialized User."""
print_lines(user_dict_lines(d))
@members.command(short_help='Get info about a user')
@ -169,22 +132,7 @@ def modify(username, login_shell, forwarding_addresses):
@click.option('--clubrep', is_flag=True, default=False,
help='Add non-member terms instead of member terms')
def renew(username, num_terms, clubrep):
resp = http_get('/api/members/' + username)
result = handle_sync_response(resp)
max_term = None
current_term = Term.current()
if clubrep and 'non_member_terms' in result:
max_term = max(Term(s) for s in result['non_member_terms'])
elif not clubrep and 'terms' in result:
max_term = max(Term(s) for s in result['terms'])
if max_term is not None and max_term >= current_term:
next_term = max_term + 1
else:
next_term = Term.current()
terms = [next_term + i for i in range(num_terms)]
terms = list(map(str, terms))
terms = get_terms_for_renewal(username, num_terms, clubrep)
if clubrep:
body = {'non_member_terms': terms}

View File

@ -1,12 +1,11 @@
import json
import socket
import sys
from typing import List, Tuple, Dict
import click
import requests
from ..operation_strings import descriptions as op_desc
from ..utils import space_colon_kv, generic_handle_stream_response
from .CLIStreamResponseHandler import CLIStreamResponseHandler
class Abort(click.ClickException):
@ -20,89 +19,23 @@ class Abort(click.ClickException):
pass
def print_lines(lines: List[str]):
"""Print multiple lines to stdout."""
for line in lines:
click.echo(line)
def print_colon_kv(pairs: List[Tuple[str, str]]):
"""
Pretty-print a list of key-value pairs such that the key and value
columns align.
Example:
key1: value1
key1000: value2
Pretty-print a list of key-value pairs.
"""
if len(pairs) == 0:
return
maxlen = max(len(key) for key, val in pairs)
for key, val in pairs:
if key != '':
click.echo(key + ': ', nl=False)
else:
# assume this is a continuation from the previous line
click.echo(' ', nl=False)
extra_space = ' ' * (maxlen - len(key))
click.echo(extra_space, nl=False)
click.echo(val)
for line in space_colon_kv(pairs):
click.echo(line)
def handle_stream_response(resp: requests.Response, operations: List[str]) -> List[Dict]:
"""
Print output to the console while operations are being streamed
from the server over HTTP.
Returns the parsed JSON data streamed from the server.
"""
if resp.status_code != 200:
click.echo('An error occurred:')
click.echo(resp.text.rstrip())
raise Abort()
click.echo(op_desc[operations[0]] + '... ', nl=False)
idx = 0
data = []
for line in resp.iter_lines(decode_unicode=True, chunk_size=8):
d = json.loads(line)
data.append(d)
if d['status'] == 'aborted':
click.echo(click.style('ABORTED', fg='red'))
click.echo('The transaction was rolled back.')
click.echo('The error was: ' + d['error'])
click.echo('Please check the ceod logs.')
sys.exit(1)
elif d['status'] == 'completed':
if idx < len(operations):
click.echo('Skipped')
click.echo('Transaction successfully completed.')
return data
operation = d['operation']
oper_failed = False
err_msg = None
prefix = 'failed_to_'
if operation.startswith(prefix):
operation = operation[len(prefix):]
oper_failed = True
# sometimes the operation looks like
# "failed_to_do_something: error message"
if ':' in operation:
operation, err_msg = operation.split(': ', 1)
while idx < len(operations) and operations[idx] != operation:
click.echo('Skipped')
idx += 1
if idx == len(operations):
break
click.echo(op_desc[operations[idx]] + '... ', nl=False)
if idx == len(operations):
click.echo('Unrecognized operation: ' + operation)
continue
if oper_failed:
click.echo(click.style('Failed', fg='red'))
if err_msg is not None:
click.echo(' Error message: ' + err_msg)
else:
click.echo(click.style('Done', fg='green'))
idx += 1
if idx < len(operations):
click.echo(op_desc[operations[idx]] + '... ', nl=False)
raise Exception('server response ended abruptly')
handler = CLIStreamResponseHandler(operations)
return generic_handle_stream_response(resp, operations, handler)
def handle_sync_response(resp: requests.Response):

View File

@ -3,17 +3,28 @@ import subprocess
import gssapi
_username = None
def get_username():
"""Get the user currently logged into CEO."""
return _username
def krb_check():
"""
Spawns a `kinit` process if no credentials are available or the
credentials have expired.
Returns the principal string 'user@REALM'.
Stores the username for later use by get_username().
"""
global _username
for _ in range(2):
try:
creds = gssapi.Credentials(usage='initiate')
result = creds.inquire()
return str(result.name)
princ = str(result.name)
_username = princ[:princ.index('@')]
return
except (gssapi.raw.misc.GSSError, gssapi.raw.exceptions.ExpiredCredentialsError):
kinit()

38
ceo/term_utils.py Normal file
View File

@ -0,0 +1,38 @@
from typing import List
from .utils import http_get
from ceo_common.model import Term
import ceo.cli.utils as cli_utils
import ceo.tui.utils as tui_utils
# Had to put these in a separate file to avoid a circular import.
def get_terms_for_new_user(num_terms: int) -> List[str]:
current_term = Term.current()
terms = [current_term + i for i in range(num_terms)]
return list(map(str, terms))
def get_terms_for_renewal(
username: str, num_terms: int, clubrep: bool, tui_model=None,
) -> List[str]:
resp = http_get('/api/members/' + username)
if tui_model is None:
result = cli_utils.handle_sync_response(resp)
else:
result = tui_utils.handle_sync_response(resp, tui_model)
max_term = None
current_term = Term.current()
if clubrep and 'non_member_terms' in result:
max_term = max(Term(s) for s in result['non_member_terms'])
elif not clubrep and 'terms' in result:
max_term = max(Term(s) for s in result['terms'])
if max_term is not None and max_term >= current_term:
next_term = max_term + 1
else:
next_term = Term.current()
terms = [next_term + i for i in range(num_terms)]
return list(map(str, terms))

168
ceo/tui/CeoFrame.py Normal file
View File

@ -0,0 +1,168 @@
from asciimatics.event import KeyboardEvent
from asciimatics.exceptions import NextScene, StopApplication
from asciimatics.screen import Screen
from asciimatics.widgets import Frame, Layout, Divider, Button, Label, \
PopUpDialog
class CeoFrame(Frame):
def __init__(
self,
screen,
height,
width,
model,
name, # key in model.viewdata
on_load=None,
title=None,
save_data=False, # whether to save widget state for resizing
has_dynamic_layouts=False, # whether layouts are created on load
escape_on_q=False, # whether to quit when 'q' is pressed
):
super().__init__(
screen,
height,
width,
name=name,
can_scroll=False,
title=title,
on_load=self._ceoframe_on_load,
)
self._save_data = save_data
self._extra_on_load = on_load
self._model = model
self._name = name
self._loaded = False
self._has_dynamic_layouts = has_dynamic_layouts
self._quit_keys = [Screen.KEY_ESCAPE]
if escape_on_q:
self._quit_keys.append(ord('q'))
# sanity check
if save_data:
assert name in model.viewdata
def _ceoframe_on_load(self):
# We usually don't want _on_load() to be called multiple times
# e.g. when switching back to a scene, or after calling reset()
if self._loaded:
return
self._loaded = True
if self._model.title is not None:
self.title = self._model.title
self._model.title = None
if self._save_data:
# restore the saved input fields' values
self.data = self._model.viewdata[self._name]
if self._extra_on_load is not None:
self._extra_on_load()
def _ceoframe_on_unload(self):
"""
This should be called just after the screen gets resized,
but before the new scenes are constructed.
The idea is to save the user's data in the input fields
so that we can restore them in the new scenes.
"""
if not self._save_data:
return
self.save()
self._model.viewdata[self._name] = self.data
def _ceoframe_on_reset(self):
"""
This needs to be called whenever we return to the home screen
after some kind of operation was completed.
Currently this is called from Model.reset().
"""
# We want a fresh slate once we return to the home screen, so we
# want on_load() to be called for the scenes.
self._loaded = False
if self._has_dynamic_layouts:
# We don't want layouts to accumulate.
self.clear_layouts()
def clear_layouts(self):
# OK so this a *really* bad thing to do, since we're reaching
# into the private variables of a third-party library.
# Unfortunately asciimatics doesn't allow us to clear the layouts
# of an existing frame, and we need this to be able to re-use
# frames which create layouts dynamically.
self._layouts.clear()
def add_buttons(
self, back_btn=False, back_btn_text='Back',
next_scene=None, next_btn_text='Next', on_next=None,
on_next_excl=None,
):
"""
Add a new layout at the bottom of the frame with buttons.
If back_btn is True, a Back button is added.
If next_scene is set to the name of the next scene, or on_next_excl
is set, a Next button will be added.
If on_next is set to a function, it will be called when the Next
button is pressed, and the screen will switch to the next scene.
If on_next_excl is set to a function, it will be called when the Next
button is pressed, and the scene will not be switched.
If both on_next and on_next_excl are set, on_next will be ignored.
"""
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
def _back():
raise NextScene(self._model.scene_stack.pop())
def _next():
if on_next_excl is not None:
on_next_excl()
return
if on_next is not None:
on_next()
self.go_to_next_scene(next_scene)
layout = Layout([1, 1])
self.add_layout(layout)
if back_btn:
layout.add_widget(Button(back_btn_text, _back), 0)
if next_scene is not None or on_next_excl is not None:
layout.add_widget(Button(next_btn_text, _next), 1)
def go_to_next_scene(self, next_scene: str):
self._model.scene_stack.append(self._name)
raise NextScene(next_scene)
def add_flash_message_layout(self):
layout = Layout([100])
self.add_layout(layout)
self._status_label = Label('')
layout.add_widget(self._status_label)
def flash_message(self, msg: str, force_update: bool = False):
self._status_label.text = msg
if force_update:
self._model.screen.force_update()
self._model.screen.draw_next_frame()
def clear_flash_message(self):
self.flash_message('')
def process_event(self, event):
if not isinstance(event, KeyboardEvent):
return super().process_event(event)
c = event.key_code
# Stop on 'q' or 'Esc'
if c in self._quit_keys:
self._scene.add_effect(PopUpDialog(
self.screen,
'Are you sure you want to quit?',
['Yes', 'No'],
has_shadow=True,
on_close=self._quit_on_yes,
))
return super().process_event(event)
@staticmethod
def _quit_on_yes(selected):
# Yes is the first button
if selected == 0:
raise StopApplication("User terminated app")

57
ceo/tui/ConfirmView.py Normal file
View File

@ -0,0 +1,57 @@
from asciimatics.widgets import Layout, Label
from .CeoFrame import CeoFrame
class ConfirmView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Confirm',
on_load=self._confirmview_on_load, title='Confirmation',
has_dynamic_layouts=True,
escape_on_q=True,
)
def _add_line(self, text: str = ''):
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Label(text, align='^'))
def _add_pair(self, key: str, val: str):
layout = Layout([10, 1, 10])
self.add_layout(layout)
layout.add_widget(Label(key + ':', align='>'), 0)
layout.add_widget(Label(val, align='<'), 2)
def _confirmview_on_load(self):
for _ in range(2):
self._add_line()
for line in self._model.confirm_lines:
if isinstance(line, str):
self._add_line(line)
else:
# assume tuple
key, val = line
self._add_pair(key, val)
# fill the rest of the space
self.add_layout(Layout([100], fill_frame=True))
kwargs = {
'back_btn': True, 'back_btn_text': 'No', 'next_btn_text': 'Yes',
}
if self._model.operations is not None:
kwargs['next_scene'] = 'Transaction'
else:
self.add_flash_message_layout()
kwargs['on_next_excl'] = self._next
self.add_buttons(**kwargs)
self.fix()
def _next(self):
self.flash_message('Sending request...', force_update=True)
try:
self._model.resp = self._model.deferred_req()
finally:
self.clear_flash_message()
next_scene = self._model.result_view_name or 'Result'
self.go_to_next_scene(next_scene)

30
ceo/tui/ErrorView.py Normal file
View File

@ -0,0 +1,30 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Label
from .CeoFrame import CeoFrame
class ErrorView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Error',
on_load=self._errorview_on_load, title='Error',
has_dynamic_layouts=True,
)
def _errorview_on_load(self):
layout = Layout([1, 10], fill_frame=True)
self.add_layout(layout)
for _ in range(2):
layout.add_widget(Label(''), 1)
layout.add_widget(Label('An error occurred:'), 1)
layout.add_widget(Label(''), 1)
for line in self._model.error_message.splitlines():
layout.add_widget(Label(line), 1)
self.add_buttons(on_next_excl=self._next)
self.fix()
def _next(self):
self._model.reset()
raise NextScene('Welcome')

86
ceo/tui/Model.py Normal file
View File

@ -0,0 +1,86 @@
from copy import deepcopy
class Model:
"""A convenient place to store View data persistently."""
def __init__(self):
self.screen = None
self.views = []
self.title = None
self.scene_stack = []
self.result_view_name = None
self.error_message = None
# view-specific data, to be used when e.g. resizing the window
self._initial_viewdata = {
'AddUser': {
'uid': '',
'cn': '',
'program': '',
'forwarding_address': '',
'num_terms': '1',
},
'RenewUser': {
'uid': '',
'num_terms': '1',
},
'Transaction': {
'op_layout': None,
'msg_layout': None,
'labels': {},
'status': 'not started',
},
'GetUser': {
'uid': '',
},
'ResetPassword': {
'uid': '',
},
'ChangeLoginShell': {
'uid': '',
'login_shell': '',
},
'SetForwardingAddresses': {
'uid': '',
'forwarding_addresses': [''],
},
'AddGroup': {
'cn': '',
'description': '',
},
'GetGroup': {
'cn': '',
},
'AddMemberToGroup': {
'cn': '',
'uid': '',
'subscribe': True,
},
'RemoveMemberFromGroup': {
'cn': '',
'uid': '',
'unsubscribe': True,
},
}
self.viewdata = deepcopy(self._initial_viewdata)
# data which is shared between multiple views
self.is_club_rep = False
self.confirm_lines = None
self.operations = None
self.deferred_req = None
self.resp = None
def reset(self):
self.viewdata = deepcopy(self._initial_viewdata)
self.is_club_rep = False
self.confirm_lines = None
self.operations = None
self.deferred_req = None
self.resp = None
self.title = None
self.error_message = None
self.scene_stack.clear()
self.result_view_name = None
for view in self.views:
if hasattr(view, '_ceoframe_on_reset'):
view._ceoframe_on_reset()

63
ceo/tui/ResultView.py Normal file
View File

@ -0,0 +1,63 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Label
import requests
from .CeoFrame import CeoFrame
class ResultView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Result',
on_load=self._resultview_on_load, title='Result',
has_dynamic_layouts=True,
escape_on_q=True,
)
# TODO: deduplicate this from ConfirmView
def _add_text(self, text: str = '\n', center: bool = False):
if center:
layout = Layout([100])
align = '^'
col = 0
else:
layout = Layout([1, 10])
align = '<'
col = 1
self.add_layout(layout)
for line in text.splitlines():
layout.add_widget(Label(line, align=align), col)
def _add_pair(self, key: str, val: str):
layout = Layout([10, 1, 10])
self.add_layout(layout)
if key:
layout.add_widget(Label(key + ':', align='>'), 0)
else:
layout.add_widget(Label(''), 0)
layout.add_widget(Label(val, align='<'), 2)
# override this method in child classes if desired
def show_result(self, resp: requests.Response):
self._add_text('The operation was successfully performed.', center=True)
def _resultview_on_load(self):
self._add_text()
resp = self._model.resp
if resp.status_code != 200:
self._add_text('An error occurred:')
if resp.headers.get('content-type') == 'application/json':
err_msg = resp.json()['error']
else:
err_msg = resp.text.rstrip()
self._add_text(err_msg)
else:
self.show_result(resp)
# fill the rest of the space
self.add_layout(Layout([100], fill_frame=True))
self.add_buttons(on_next_excl=self._next)
self.fix()
def _next(self):
self._model.reset()
raise NextScene('Welcome')

View File

@ -0,0 +1,97 @@
from typing import Dict, Union
from asciimatics.widgets import Label, Layout
import requests
from .Model import Model
from ..StreamResponseHandler import StreamResponseHandler
class TUIStreamResponseHandler(StreamResponseHandler):
def __init__(
self,
model: Model,
labels: Dict[str, Label],
msg_layout: Layout,
txn_view, # TransactionView
):
super().__init__()
self.screen = model.screen
self.operations = model.operations
self.idx = 0
self.labels = labels
self.msg_layout = msg_layout
self.txn_view = txn_view
self.error_messages = []
def _update(self):
# Since we're running in a separate thread, we need to force the
# screen to update. See
# https://github.com/peterbrittain/asciimatics/issues/56
self.txn_view.fix()
self.screen.force_update()
def _show_msg(self, msg: str = '\n'):
for line in msg.splitlines():
self.msg_layout.add_widget(Label(line, align='^'))
def _abort(self):
for operation in self.operations[self.idx:]:
self.labels[operation].text = 'ABORTED'
self.txn_view.enable_next_btn()
def handle_non_200(self, resp: requests.Response):
self._abort()
self._show_msg('An error occurred:')
if resp.headers.get('content-type') == 'application/json':
err_msg = resp.json()['error']
else:
err_msg = resp.text
self._show_msg(err_msg)
self._update()
def begin(self):
pass
def handle_aborted(self, err_msg: str):
self._abort()
self._show_msg('The transaction was rolled back.')
self._show_msg('The error was:\n')
self._show_msg(err_msg)
self._show_msg()
self._show_msg('Please check the ceod logs.')
self._update()
def handle_completed(self):
self._show_msg('Transaction successfully completed.')
if len(self.error_messages) > 0:
self._show_msg('There were some errors:')
for msg in self.error_messages:
self._show_msg(msg)
self.txn_view.enable_next_btn()
self._update()
def handle_successful_operation(self):
operation = self.operations[self.idx]
self.labels[operation].text = 'Done'
self.idx += 1
self._update()
def handle_failed_operation(self, err_msg: Union[str, None]):
operation = self.operations[self.idx]
self.labels[operation].text = 'Failed'
if err_msg is not None:
self.error_messages.append(err_msg)
self.idx += 1
self._update()
def handle_skipped_operation(self):
operation = self.operations[self.idx]
self.labels[operation].text = 'Skipped'
self.idx += 1
self._update()
def handle_unrecognized_operation(self, operation: str):
self.error_messages.append('Unrecognized operation: ' + operation)
self.idx += 1
self._update()

102
ceo/tui/TransactionView.py Normal file
View File

@ -0,0 +1,102 @@
from threading import Thread
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Button, Divider, Label
from ..operation_strings import descriptions as op_desc
from ..utils import generic_handle_stream_response
from .CeoFrame import CeoFrame
from .TUIStreamResponseHandler import TUIStreamResponseHandler
class TransactionView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Transaction',
on_load=self._txnview_on_load, title='Running Transaction',
has_dynamic_layouts=True,
)
self._model = model
# map operation names to label widgets
self._labels = model.viewdata['Transaction']['labels']
def _add_buttons(self):
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
layout = Layout([1, 1])
self.add_layout(layout)
self._next_btn = Button('Next', self._next)
# we don't want to disable the button if the txn completed
# and the user just resized the window
if self._model.viewdata['Transaction']['status'] != 'completed':
self._next_btn.disabled = True
layout.add_widget(self._next_btn, 1)
def _add_blank_line(self):
self._op_layout.add_widget(Label(''), 0)
self._op_layout.add_widget(Label(''), 2)
def _txnview_on_load(self):
d = self._model.viewdata['Transaction']
if d['op_layout'] is None:
first_time = True
self._op_layout = Layout([12, 1, 10])
self.add_layout(self._op_layout)
# store the layouts so that we can re-use them when the screen
# gets resized
d['op_layout'] = self._op_layout
for _ in range(2):
self._add_blank_line()
for operation in self._model.operations:
desc = op_desc[operation]
self._op_layout.add_widget(Label(desc + '...', align='>'), 0)
desc_label = Label('', align='<')
self._op_layout.add_widget(desc_label, 2)
self._labels[operation] = desc_label
self._add_blank_line()
# this is the where success/failure messages etc. get placed
self._msg_layout = Layout([100])
self.add_layout(self._msg_layout)
d['msg_layout'] = self._msg_layout
else:
# we arrive here when the screen has been resized
first_time = False
# restore the layouts which we saved
self._op_layout = d['op_layout']
self.add_layout(self._op_layout)
self._msg_layout = d['msg_layout']
self.add_layout(self._msg_layout)
# fill up the rest of the space
self.add_layout(Layout([100], fill_frame=True))
self._add_buttons()
self.fix()
# only send the API request the first time we arrive at this
# scene, not when the screen gets resized
if first_time:
Thread(target=self._do_txn).start()
def _do_txn(self):
self._model.viewdata['Transaction']['status'] = 'in progress'
resp = self._model.deferred_req()
handler = TUIStreamResponseHandler(
model=self._model,
labels=self._labels,
msg_layout=self._msg_layout,
txn_view=self,
)
generic_handle_stream_response(resp, self._model.operations, handler)
def enable_next_btn(self):
self._next_btn.disabled = False
# If we don't reset, the button isn't selectable, even though we
# enabled it
self.reset()
# save the fact that the transaction is completed
self._model.viewdata['Transaction']['status'] = 'completed'
def _next(self):
self._model.reset()
raise NextScene('Welcome')

111
ceo/tui/WelcomeView.py Normal file
View File

@ -0,0 +1,111 @@
import functools
from asciimatics.widgets import ListBox, Layout, Divider, Button, Label
from asciimatics.exceptions import NextScene, StopApplication
from .CeoFrame import CeoFrame
class WelcomeView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Welcome',
title='CSC Electronic Office',
escape_on_q=True,
)
members_menu_items = [
('Add member', 'AddUser'),
('Add club rep', 'AddUser'),
('Renew member', 'RenewUser'),
('Renew club rep', 'RenewUser'),
('Get user info', 'GetUser'),
('Reset password', 'ResetPassword'),
('Change login shell', 'ChangeLoginShell'),
('Set forwarding addresses', 'SetForwardingAddresses'),
]
members_menu = self._create_menu(
members_menu_items, 'members', self._members_menu_select)
groups_menu_items = [
('Add group', 'AddGroup'),
('Get group members', 'GetGroup'),
('Add member to group', 'AddMemberToGroup'),
('Remove member from group', 'RemoveMemberFromGroup'),
]
groups_menu = self._create_menu(groups_menu_items, 'groups')
db_menu_items = [
('Create MySQL database', 'CreateMySQL'),
('Reset MySQL password', 'ResetMySQLPassword'),
('Create PostgreSQL database', 'CreatePostgreSQL'),
('Reset PostgreSQL password', 'ResetPostgreSQLPassword'),
]
db_menu = self._create_menu(
db_menu_items, 'databases', self._db_menu_select)
positions_menu_items = [
('Get positions', 'GetPositions'),
('Set positions', 'SetPositions'),
]
positions_menu = self._create_menu(positions_menu_items, 'positions')
self._menu_groups = {
'members': members_menu_items,
'groups': groups_menu_items,
'databases': db_menu_items,
'positions': positions_menu_items,
}
layout = Layout([1, 4, 1], fill_frame=True)
self.add_layout(layout)
layout.add_widget(members_menu, 1)
layout.add_widget(groups_menu, 1)
layout.add_widget(db_menu, 1)
layout.add_widget(positions_menu, 1)
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Label('Press <TAB> to switch widgets'))
layout.add_widget(Divider())
layout = Layout([1, 1, 1])
self.add_layout(layout)
layout.add_widget(Button("Quit", self._quit), 2)
self.fix()
def _create_menu(self, menu_items, name, on_select=None):
if on_select is None:
on_select = functools.partial(self._generic_menu_select, name)
return ListBox(
len(menu_items),
[
(desc, i) for i, (desc, view) in
enumerate(menu_items)
],
name=name,
label=name.capitalize(),
on_select=on_select,
)
def _get_menu_item_desc_view(self, menu_name: str):
self.save()
item_id = self.data[menu_name]
menu_items = self._menu_groups[menu_name]
return menu_items[item_id]
def _members_menu_select(self):
desc, view = self._get_menu_item_desc_view('members')
if desc.endswith('club rep'):
self._model.is_club_rep = True
self._welcomeview_go_to_next_scene(desc, view)
def _db_menu_select(self):
pass
def _generic_menu_select(self, menu_name):
desc, view = self._get_menu_item_desc_view('groups')
self._welcomeview_go_to_next_scene(desc, view)
def _welcomeview_go_to_next_scene(self, desc, view):
self._model.title = desc
self._model.scene_stack.append('Welcome')
raise NextScene(view)
@staticmethod
def _quit():
raise StopApplication("User pressed quit")

0
ceo/tui/__init__.py Normal file
View File

View File

@ -0,0 +1,42 @@
from asciimatics.widgets import Layout, Text
from ...utils import defer, http_post
from ..CeoFrame import CeoFrame
from ceod.transactions.groups import AddGroupTransaction
class AddGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'AddGroup',
save_data=True,
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text('Name:', 'cn')
layout.add_widget(self._cn)
self._description = Text('Description:', 'description')
layout.add_widget(self._description)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _next(self):
cn = self._cn.value
description = self._description.value
body = {
'cn': cn,
'description': description,
}
self._model.confirm_lines = [
'The following group will be created:',
'',
('cn', cn),
('description', description),
'',
'Are you sure you want to continue?',
]
self._model.deferred_req = defer(http_post, '/api/groups', json=body)
self._model.operations = AddGroupTransaction.operations

View File

@ -0,0 +1,44 @@
from asciimatics.widgets import Layout, Text, CheckBox, Label
from ...utils import defer, http_post
from ..CeoFrame import CeoFrame
from ceod.transactions.groups import AddMemberToGroupTransaction
class AddMemberToGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'AddMemberToGroup',
save_data=True,
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text('Group name:', 'cn')
layout.add_widget(self._cn)
self._username = Text('Username:', 'uid')
layout.add_widget(self._username)
layout.add_widget(Label(''))
self._checkbox = CheckBox(
'subscribe to auxiliary mailing lists', name='subscribe')
self._checkbox.value = True
layout.add_widget(self._checkbox)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _next(self):
cn = self._cn.value
uid = self._username.value
self._model.confirm_lines = [
f'Are you sure you want to add {uid} to {cn}?',
]
operations = AddMemberToGroupTransaction.operations
url = f'/api/groups/{cn}/members/{uid}'
# TODO: deduplicate this logic from the CLI
if not self._checkbox.value:
url += '?subscribe_to_lists=false'
operations.remove('subscribe_user_to_auxiliary_mailing_lists')
self._model.deferred_req = defer(http_post, url)
self._model.operations = operations

View File

@ -0,0 +1,18 @@
import requests
from ..ResultView import ResultView
class GetGroupResultView(ResultView):
def show_result(self, resp: requests.Response):
d = resp.json()
if 'description' in d:
desc = d['description'] + ' (' + d['cn'] + ')'
else:
desc = d['cn