Rewrite TUI (#52)
Closes #44. Closes #47. Closes #49. Closes #50. The TUI has been rewritten using urwid instead of asciimatics. The MVC pattern was also used to help increase organization and readability. The mouse has been disabled, which allows users to easily copy text from the terminal. Terms are now sorted when displayed, for both the CLI and the TUI. Reviewed-on: #52pull/53/head
parent
19496b4568
commit
b543f0eb0c
@ -1,160 +0,0 @@ |
||||
from asciimatics.event import KeyboardEvent |
||||
from asciimatics.exceptions import NextScene, StopApplication |
||||
from asciimatics.screen import Screen |
||||
from asciimatics.widgets import Frame, Layout, Divider, Button, Label, \ |
||||
PopUpDialog |
||||
|
||||
|
||||
class CeoFrame(Frame): |
||||
def __init__( |
||||
self, |
||||
screen, |
||||
height, |
||||
width, |
||||
model, |
||||
name, |
||||
on_load=None, |
||||
title=None, |
||||
escape_on_q=False, # whether to quit when 'q' is pressed |
||||
): |
||||
super().__init__( |
||||
screen, |
||||
height, |
||||
width, |
||||
name=name, |
||||
can_scroll=False, |
||||
title=title, |
||||
on_load=self._ceoframe_on_load, |
||||
) |
||||
self._custom_on_load = on_load |
||||
self._model = model |
||||
self._name = name |
||||
# If a view has a custom on_load function, all layouts should |
||||
# be created on load (*not* in the constructor) |
||||
self._has_dynamic_layouts = on_load is not None |
||||
self._quit_keys = [Screen.KEY_ESCAPE] |
||||
if escape_on_q: |
||||
self._quit_keys.append(ord('q')) |
||||
# child classes may override this as a last resort |
||||
self.skip_reload = False |
||||
|
||||
def _ceoframe_on_load(self): |
||||
if self.skip_reload: |
||||
self.skip_reload = False |
||||
return |
||||
if self._model.title is not None: |
||||
self.title = self._model.title |
||||
self._model.title = None |
||||
if self._has_dynamic_layouts and self._model.nav_direction == 'forward': |
||||
# We arrive here after a user pressed 'Back' then 'Next', |
||||
# or after we returned to the Welcome screen. |
||||
# The data may have changed, so we need to redraw everything, |
||||
# via self._custom_on_load(). |
||||
self.clear_layouts() |
||||
self._custom_on_load() |
||||
|
||||
# may be overridden by child classes |
||||
def _ceoframe_on_reset(self): |
||||
""" |
||||
This is called whenever we return to the home screen |
||||
after some kind of operation was completed. |
||||
This is called from Model.reset(). |
||||
""" |
||||
pass |
||||
|
||||
def clear_layouts(self): |
||||
self._layouts.clear() |
||||
|
||||
def force_update(self): |
||||
""" |
||||
This should be called by background threads after they make changes |
||||
to the UI. |
||||
""" |
||||
# Since we're running in a separate thread, we need to force the |
||||
# screen to update. See |
||||
# https://github.com/peterbrittain/asciimatics/issues/56 |
||||
self.fix() |
||||
self._model.screen.force_update() |
||||
|
||||
def add_buttons( |
||||
self, back_btn=False, back_btn_text='Back', |
||||
next_scene=None, next_btn_text='Next', on_next=None, |
||||
on_next_excl=None, |
||||
): |
||||
""" |
||||
Add a new layout at the bottom of the frame with buttons. |
||||
If back_btn is True, a Back button is added. |
||||
If next_scene is set to the name of the next scene, or on_next_excl |
||||
is set, a Next button will be added. |
||||
If on_next is set to a function, it will be called when the Next |
||||
button is pressed, and the screen will switch to the next scene. |
||||
If on_next_excl is set to a function, it will be called when the Next |
||||
button is pressed, and the scene will not be switched. |
||||
If both on_next and on_next_excl are set, on_next will be ignored. |
||||
""" |
||||
layout = Layout([100]) |
||||
self.add_layout(layout) |
||||
layout.add_widget(Divider()) |
||||
|
||||
def _back(): |
||||
self._model.nav_direction = 'backward' |
||||
last_scene = self._model.scene_stack.pop() |
||||
if last_scene == 'Welcome': |
||||
self._model.reset() |
||||
raise NextScene(last_scene) |
||||
|
||||
def _next(): |
||||
self._model.nav_direction = 'forward' |
||||
if on_next_excl is not None: |
||||
on_next_excl() |
||||
return |
||||
if on_next is not None: |
||||
on_next() |
||||
self.go_to_next_scene(next_scene) |
||||
|
||||
layout = Layout([1, 1]) |
||||
self.add_layout(layout) |
||||
if back_btn: |
||||
layout.add_widget(Button(back_btn_text, _back), 0) |
||||
if next_scene is not None or on_next_excl is not None: |
||||
layout.add_widget(Button(next_btn_text, _next), 1) |
||||
|
||||
def go_to_next_scene(self, next_scene: str): |
||||
self._model.scene_stack.append(self._name) |
||||
raise NextScene(next_scene) |
||||
|
||||
def add_flash_message_layout(self): |
||||
layout = Layout([100]) |
||||
self.add_layout(layout) |
||||
self._status_label = Label('') |
||||
layout.add_widget(self._status_label) |
||||
|
||||
def flash_message(self, msg: str, force_update: bool = False): |
||||
self._status_label.text = msg |
||||
if force_update: |
||||
self._model.screen.force_update() |
||||
self._model.screen.draw_next_frame() |
||||
|
||||
def clear_flash_message(self, force_update: bool = False): |
||||
self.flash_message('', force_update) |
||||
|
||||
def process_event(self, event): |
||||
if not isinstance(event, KeyboardEvent): |
||||
return super().process_event(event) |
||||
c = event.key_code |
||||
# Stop on 'q' or 'Esc' |
||||
if c in self._quit_keys: |
||||
self._scene.add_effect(PopUpDialog( |
||||
self.screen, |
||||
'Are you sure you want to quit?', |
||||
['Yes', 'No'], |
||||
has_shadow=True, |
||||
on_close=self._quit_on_yes, |
||||
)) |
||||
return super().process_event(event) |
||||
|
||||
@staticmethod |
||||
def _quit_on_yes(selected): |
||||
# Yes is the first button |
||||
if selected == 0: |
||||
raise StopApplication("User terminated app") |
@ -1,61 +0,0 @@ |
||||
from asciimatics.widgets import Layout, Label |
||||
|
||||
from .CeoFrame import CeoFrame |
||||
|
||||
|
||||
class ConfirmView(CeoFrame): |
||||
def __init__(self, screen, width, height, model): |
||||
super().__init__( |
||||
screen, height, width, model, 'Confirm', |
||||
on_load=self._confirmview_on_load, title='Confirmation', |
||||
escape_on_q=True, |
||||
) |
||||
|
||||
def _add_line(self, text: str = ''): |
||||
layout = Layout([100]) |
||||
self.add_layout(layout) |
||||
layout.add_widget(Label(text, align='^')) |
||||
|
||||
def _add_pair(self, key: str, val: str): |
||||
layout = Layout([10, 1, 10]) |
||||
self.add_layout(layout) |
||||
layout.add_widget(Label(key + ':', align='>'), 0) |
||||
layout.add_widget(Label(val, align='<'), 2) |
||||
|
||||
def _confirmview_on_load(self): |
||||
for _ in range(2): |
||||
self._add_line() |
||||
for line in self._model.confirm_lines: |
||||
if isinstance(line, str): |
||||
self._add_line(line) |
||||
else: |
||||
# assume tuple |
||||
key, val = line |
||||
self._add_pair(key, val) |
||||
# fill the rest of the space |
||||
self.add_layout(Layout([100], fill_frame=True)) |
||||
|
||||
kwargs = { |
||||
'back_btn': True, 'back_btn_text': 'No', 'next_btn_text': 'Yes', |
||||
} |
||||
if self._model.operations is not None: |
||||
kwargs['next_scene'] = self._model.txn_view_name or 'Transaction' |
||||
else: |
||||
self.add_flash_message_layout() |
||||
kwargs['on_next_excl'] = self._next |
||||
self.add_buttons(**kwargs) |
||||
self.fix() |
||||
# OK so there's some weird bug somewhere which causes the buttons to be unselectable |
||||
# if we add a new user, return to the Welcome screen, then try to renew a user. |
||||
# This is a workaround for that. |
||||
self.skip_reload = True |
||||
self.reset() |
||||
|
||||
def _next(self): |
||||
self.flash_message('Sending request...', force_update=True) |
||||
try: |
||||
self._model.resp = self._model.deferred_req() |
||||
finally: |
||||
self.clear_flash_message() |
||||
next_scene = self._model.result_view_name or 'Result' |
||||
self.go_to_next_scene(next_scene) |
@ -1,29 +0,0 @@ |
||||
from asciimatics.exceptions import NextScene |
||||
from asciimatics.widgets import Layout, Label |
||||
|
||||
from .CeoFrame import CeoFrame |
||||
|
||||
|
||||
class ErrorView(CeoFrame): |
||||
def __init__(self, screen, width, height, model): |
||||
super().__init__( |
||||
screen, height, width, model, 'Error', |
||||
on_load=self._errorview_on_load, title='Error', |
||||
) |
||||
|
||||
def _errorview_on_load(self): |
||||
layout = Layout([1, 10], fill_frame=True) |
||||
self.add_layout(layout) |
||||
for _ in range(2): |
||||
layout.add_widget(Label(''), 1) |
||||
layout.add_widget(Label('An error occurred:'), 1) |
||||
layout.add_widget(Label(''), 1) |
||||
for line in self._model.error_message.splitlines(): |
||||
layout.add_widget(Label(line), 1) |
||||
|
||||
self.add_buttons(on_next_excl=self._next) |
||||
self.fix() |
||||
|
||||
def _next(self): |
||||
self._model.reset() |
||||
raise NextScene('Welcome') |
@ -1,56 +0,0 @@ |
||||
from copy import deepcopy |
||||
|
||||
from zope import component |
||||
|
||||
from ceo_common.interfaces import IConfig |
||||
|
||||
|
||||
class Model: |
||||
"""A convenient place to store View data persistently.""" |
||||
|
||||
def __init__(self): |
||||
cfg = component.getUtility(IConfig) |
||||
|
||||
self.screen = None |
||||
self.views = [] |
||||
self.title = None |
||||
self.scene_stack = [] |
||||
self.result_view_name = None |
||||
self.txn_view_name = None |
||||
self.error_message = None |
||||
self.nav_direction = 'forward' |
||||
# View-specific data |
||||
self._initial_viewdata = { |
||||
'ResetPassword': { |
||||
'uid': '', |
||||
}, |
||||
} |
||||
for pos in cfg.get('positions_available'): |
||||
self._initial_viewdata[pos] = '' |
||||
self.viewdata = deepcopy(self._initial_viewdata) |
||||
# data which is shared between multiple views |
||||
self.is_club_rep = False |
||||
self.confirm_lines = None |
||||
self.operations = None |
||||
self.deferred_req = None |
||||
self.resp = None |
||||
self.db_type = None |
||||
self.user_dict = None |
||||
|
||||
def reset(self): |
||||
self.viewdata = deepcopy(self._initial_viewdata) |
||||
self.is_club_rep = False |
||||
self.confirm_lines = None |
||||
self.operations = None |
||||
self.deferred_req = None |
||||
self.resp = None |
||||
self.db_type = None |
||||
self.user_dict = None |
||||
self.title = None |
||||
self.error_message = None |
||||
self.scene_stack.clear() |
||||
self.result_view_name = None |
||||
self.txn_view_name = None |
||||
for view in self.views: |
||||
if hasattr(view, '_ceoframe_on_reset'): |
||||
view._ceoframe_on_reset() |
@ -1,62 +0,0 @@ |
||||
from asciimatics.exceptions import NextScene |
||||
from asciimatics.widgets import Layout, Label |
||||
import requests |
||||
|
||||
from .CeoFrame import CeoFrame |
||||
|
||||
|
||||
class ResultView(CeoFrame): |
||||
def __init__(self, screen, width, height, model): |
||||
super().__init__( |
||||
screen, height, width, model, 'Result', |
||||
on_load=self._resultview_on_load, title='Result', |
||||
escape_on_q=True, |
||||
) |
||||
|
||||
# TODO: deduplicate this from ConfirmView |
||||
def _add_text(self, text: str = '\n', center: bool = False): |
||||
if center: |
||||
layout = Layout([100]) |
||||
align = '^' |
||||
col = 0 |
||||
else: |
||||
layout = Layout([1, 10]) |
||||
align = '<' |
||||
col = 1 |
||||
self.add_layout(layout) |
||||
for line in text.splitlines(): |
||||
layout.add_widget(Label(line, align=align), col) |
||||
|
||||
def _add_pair(self, key: str, val: str): |
||||
layout = Layout([10, 1, 10]) |
||||
self.add_layout(layout) |
||||
if key: |
||||
layout.add_widget(Label(key + ':', align='>'), 0) |
||||
else: |
||||
layout.add_widget(Label(''), 0) |
||||
layout.add_widget(Label(val, align='<'), 2) |
||||
|
||||
# override this method in child classes if desired |
||||
def show_result(self, resp: requests.Response): |
||||
self._add_text('The operation was successfully performed.', center=True) |
||||
|
||||
def _resultview_on_load(self): |
||||
self._add_text() |
||||
resp = self._model.resp |
||||
if not resp.ok: |
||||
self._add_text('An error occurred:') |
||||
if resp.headers.get('content-type') == 'application/json': |
||||
err_msg = resp.json()['error'] |
||||
else: |
||||
err_msg = resp.text.rstrip() |
||||
self._add_text(err_msg) |
||||
else: |
||||
self.show_result(resp) |
||||
# fill the rest of the space |
||||
self.add_layout(Layout([100], fill_frame=True)) |
||||
self.add_buttons(on_next_excl=self._next) |
||||
self.fix() |
||||
|
||||
def _next(self): |
||||
self._model.reset() |
||||
raise NextScene('Welcome') |
@ -1,93 +0,0 @@ |
||||
from typing import Dict, Union |
||||
|
||||
from asciimatics.widgets import Label, Layout |
||||
import requests |
||||
|
||||
from .Model import Model |
||||
from ..StreamResponseHandler import StreamResponseHandler |
||||
|
||||
|
||||
class TUIStreamResponseHandler(StreamResponseHandler): |
||||
def __init__( |
||||
self, |
||||
model: Model, |
||||
labels: Dict[str, Label], |
||||
msg_layout: Layout, |
||||
txn_view, # TransactionView |
||||
): |
||||
super().__init__() |
||||
self.screen = model.screen |
||||
self.operations = model.operations |
||||
self.idx = 0 |
||||
self.labels = labels |
||||
self.msg_layout = msg_layout |
||||
self.txn_view = txn_view |
||||
self.error_messages = [] |
||||
|
||||
def _update(self): |
||||
self.txn_view.force_update() |
||||
|
||||
def _show_msg(self, msg: str = '\n'): |
||||
for line in msg.splitlines(): |
||||
self.msg_layout.add_widget(Label(line, align='^')) |
||||
|
||||
def _abort(self): |
||||
for operation in self.operations[self.idx:]: |
||||
self.labels[operation].text = 'ABORTED' |
||||
self.txn_view.enable_next_btn() |
||||
|
||||
def handle_non_200(self, resp: requests.Response): |
||||
self._abort() |
||||
self._show_msg('An error occurred:') |
||||
if resp.headers.get('content-type') == 'application/json': |
||||
err_msg = resp.json()['error'] |
||||
else: |
||||
err_msg = resp.text |
||||
self._show_msg(err_msg) |
||||
self._update() |
||||
|
||||
def begin(self): |
||||
pass |
||||
|
||||
def handle_aborted(self, err_msg: str): |
||||
self._abort() |
||||
self._show_msg('The transaction was rolled back.') |
||||
self._show_msg('The error was:\n') |
||||
self._show_msg(err_msg) |
||||
self._show_msg() |
||||
self._show_msg('Please check the ceod logs.') |
||||
self._update() |
||||
|
||||
def handle_completed(self): |
||||
self._show_msg('Transaction successfully completed.') |
||||
if len(self.error_messages) > 0: |
||||
self._show_msg('There were some errors:') |
||||
for msg in self.error_messages: |
||||
self._show_msg(msg) |
||||
self.txn_view.enable_next_btn() |
||||
self._update() |
||||
|
||||
def handle_successful_operation(self): |
||||
operation = self.operations[self.idx] |
||||
self.labels[operation].text = 'Done' |
||||
self.idx += 1 |
||||
self._update() |
||||
|
||||
def handle_failed_operation(self, err_msg: Union[str, None]): |
||||
operation = self.operations[self.idx] |
||||
self.labels[operation].text = 'Failed' |
||||
if err_msg is not None: |
||||
self.error_messages.append(err_msg) |
||||
self.idx += 1 |
||||
self._update() |
||||
|
||||
def handle_skipped_operation(self): |
||||
operation = self.operations[self.idx] |
||||
self.labels[operation].text = 'Skipped' |
||||
self.idx += 1 |
||||
self._update() |
||||
|
||||
def handle_unrecognized_operation(self, operation: str): |
||||
self.error_messages.append('Unrecognized operation: ' + operation) |
||||
self.idx += 1 |
||||
self._update() |
@ -1,85 +0,0 @@ |
||||
from threading import Thread |
||||
from typing import List, Dict |
||||
|
||||
from asciimatics.exceptions import NextScene |
||||
from asciimatics.widgets import Layout, Button, Divider, Label |
||||
|
||||
from ..operation_strings import descriptions as op_desc |
||||
from ..utils import generic_handle_stream_response |
||||
from .CeoFrame import CeoFrame |
||||
from .TUIStreamResponseHandler import TUIStreamResponseHandler |
||||
|
||||
|
||||
class TransactionView(CeoFrame): |
||||
def __init__(self, screen, width, height, model): |
||||
super().__init__( |
||||
screen, height, width, model, 'Transaction', |
||||
on_load=self._txnview_on_load, title='Running Transaction', |
||||
) |
||||
# map operation names to label widgets |
||||
self._labels = {} |
||||
|
||||
def _add_buttons(self): |
||||
layout = Layout([100]) |
||||
self.add_layout(layout) |
||||
layout.add_widget(Divider()) |
||||
|
||||
layout = Layout([1, 1]) |
||||
self.add_layout(layout) |
||||
self._next_btn = Button('Next', self._next) |
||||
layout.add_widget(self._next_btn, 1) |
||||
|
||||
def _add_blank_line(self): |
||||
self._op_layout.add_widget(Label(''), 0) |
||||
self._op_layout.add_widget(Label(''), 2) |
||||
|
||||
def _txnview_on_load(self): |
||||
self._op_layout = Layout([12, 1, 10]) |
||||
self.add_layout(self._op_layout) |
||||
# store the layouts so that we can re-use them when the screen |
||||
# gets resized |
||||
for _ in range(2): |
||||
self._add_blank_line() |
||||
for operation in self._model.operations: |
||||
desc = op_desc[operation] |
||||
self._op_layout.add_widget(Label(desc + '...', align='>'), 0) |
||||
desc_label = Label('', align='<') |
||||
self._op_layout.add_widget(desc_label, 2) |
||||
self._labels[operation] = desc_label |
||||
self._add_blank_line() |
||||
# this is the where success/failure messages etc. get placed |
||||
self._msg_layout = Layout([100]) |
||||
self.add_layout(self._msg_layout) |
||||
# fill up the rest of the space |
||||
self.add_layout(Layout([100], fill_frame=True)) |
||||
|
||||
self._add_buttons() |
||||
self.fix() |
||||
Thread(target=self._do_txn).start() |
||||
|
||||
def _do_txn(self): |
||||
resp = self._model.deferred_req() |
||||
handler = TUIStreamResponseHandler( |
||||
model=self._model, |
||||
labels=self._labels, |
||||
msg_layout=self._msg_layout, |
||||
txn_view=self, |
||||
) |
||||
data = generic_handle_stream_response(resp, self._model.operations, handler) |
||||
self.write_extra_txn_info(data) |
||||
|
||||
# to be overridden in child classes if desired |
||||
def write_extra_txn_info(self, data: List[Dict]): |
||||
pass |
||||
|
||||
def enable_next_btn(self): |
||||
self._next_btn.disabled = False |
||||
# If we don't reset, the button isn't selectable, even though we |
||||
# enabled it. |
||||
# We don't want to reload, though (which reset() will trigger). |
||||
self.skip_reload = True |
||||
self.reset() |
||||
|
||||
def _next(self): |
||||
self._model.reset() |
||||
raise NextScene('Welcome') |
@ -1,107 +0,0 @@ |
||||
from asciimatics.widgets import ListBox, Layout, Divider, Button, Label |
||||
from asciimatics.exceptions import NextScene, StopApplication |
||||
|
||||
from .CeoFrame import CeoFrame |
||||
|
||||
|
||||
class WelcomeView(CeoFrame): |
||||
def __init__(self, screen, width, height, model): |
||||
super().__init__( |
||||
screen, height, width, model, 'Welcome', |
||||
title='CSC Electronic Office', |
||||
escape_on_q=True, |
||||
) |
||||
members_menu_items = [ |
||||
('Add member', 'AddUser'), |
||||
('Add club rep', 'AddUser'), |
||||
('Renew member', 'RenewUser'), |
||||
('Renew club rep', 'RenewUser'), |
||||
('Get user info', 'GetUser'), |
||||
('Reset password', 'ResetPassword'), |
||||
('Change login shell', 'ChangeLoginShell'), |
||||
('Set forwarding addresses', 'SetForwardingAddresses'), |
||||
] |
||||
groups_menu_items = [ |
||||
('Add group', 'AddGroup'), |
||||
('Get group members', 'GetGroup'), |
||||
('Add member to group', 'AddMemberToGroup'), |
||||
('Remove member from group', 'RemoveMemberFromGroup'), |
||||
] |
||||
db_menu_items = [ |
||||
('Create MySQL database', 'CreateDatabase'), |
||||
('Reset MySQL password', 'ResetDatabasePassword'), |
||||
('Create PostgreSQL database', 'CreateDatabase'), |
||||
('Reset PostgreSQL password', 'ResetDatabasePassword'), |
||||
] |
||||
positions_menu_items = [ |
||||
('Get positions', 'GetPositions'), |
||||
('Set positions', 'SetPositions'), |
||||
] |
||||
self.menu_items = [ |
||||
('members', members_menu_items), |
||||
('groups', groups_menu_items), |
||||
('databases', db_menu_items), |
||||
('positions', positions_menu_items), |
||||
] |
||||
self.menu_items_dict = dict(self.menu_items) |
||||
flat_menu_items = [item for name, items in self.menu_items for item in items] |
||||
menu = ListBox( |
||||
len(flat_menu_items), |
||||
[ |
||||
(desc, i) for i, (desc, view) in |
||||
enumerate(flat_menu_items) |
||||
], |
||||
name='menu', |
||||
on_select=self._menu_select, |
||||
) |
||||
labels = [] |
||||
for name, items in self.menu_items: |
||||
labels.append(Label(name.capitalize(), align='>')) |
||||
for _ in range(len(items) - 1): |
||||
labels.append(Label('')) |
||||
|
||||
layout = Layout([5, 1, 8], fill_frame=True) |
||||
self.add_layout(layout) |
||||
layout.add_widget(menu, 2) |
||||
for label in labels: |
||||
layout.add_widget(label, 0) |
||||
|
||||
layout = Layout([100]) |
||||
self.add_layout(layout) |
||||
layout.add_widget(Label('Press <TAB> to switch widgets')) |
||||
layout.add_widget(Divider()) |
||||
|
||||
layout = Layout([1, 1, 1]) |
||||
self.add_layout(layout) |
||||
layout.add_widget(Button("Quit", self._quit), 2) |
||||
self.fix() |
||||
|
||||
def _menu_select(self): |
||||
self.save() |
||||
item_id = self.data['menu'] |
||||
# find which submenu the item belongs to |
||||
counter = 0 |
||||
for name, items in self.menu_items: |
||||
if item_id < counter + len(items): |
||||
break |
||||
counter += len(items) |
||||
submenu_idx = item_id - counter |
||||
desc, view = items[submenu_idx] |
||||
if name == 'members': |
||||
if desc.endswith('club rep'): |
||||
self._model.is_club_rep = True |
||||
elif name == 'databases': |
||||
if 'MySQL' in desc: |
||||
self._model.db_type = 'mysql' |
||||
else: |
||||
self._model.db_type = 'postgresql' |
||||
self._welcomeview_go_to_next_scene(desc, view) |
||||
|
||||
def _welcomeview_go_to_next_scene(self, desc, view): |
||||
self._model.title = desc |
||||
self._model.scene_stack.append('Welcome') |
||||
raise NextScene(view) |
||||
|
||||
@staticmethod |
||||
def _quit(): |
||||
raise StopApplication("User pressed quit") |
@ -0,0 +1,30 @@ |
||||
import os |
||||
from queue import SimpleQueue |
||||
|
||||
|
||||
class App: |
||||
REL_WIDTH_PCT = 60 |
||||
REL_HEIGHT_PCT = 70 |
||||
# On a full-screen (1366x768) gnome-terminal window, |
||||
# I had 168 cols and 36 rows |
||||
WIDTH = int(0.6 * 168) |
||||
HEIGHT = int(0.7 * 36) |
||||
|
||||
def __init__(self, loop, main_widget): |
||||
self.loop = loop |
||||
self.main_widget = main_widget |
||||
self.history = [] |
||||
self.queued_pipe_callbacks = SimpleQueue() |
||||
self.pipefd = loop.watch_pipe(self._pipe_callback) |
||||
|
||||
def run_in_main_loop(self, func): |
||||
self.queued_pipe_callbacks.put(func) |
||||
os.write(self.pipefd, b'\x00') |
||||
|
||||
def _pipe_callback(self, data): |
||||
# We need to clear the whole queue because select() |
||||
# will only send one "notification" if there are two |
||||
# consecutive writes |
||||
while not self.queued_pipe_callbacks.empty(): |
||||
self.queued_pipe_callbacks.get()() |
||||
return True |
@ -0,0 +1,36 @@ |
||||
from .Controller import Controller |
||||
from .TransactionController import TransactionController |
||||
from ceo.tui.models import TransactionModel |
||||
from ceo.tui.views import AddGroupConfirmationView, TransactionView |
||||
from ceod.transactions.groups import AddGroupTransaction |
||||
|
||||
|
||||
class AddGroupController(Controller): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
|
||||
def on_next_button_pressed(self, button): |
||||
try: |
||||
self.model.name = self.get_group_name_from_view() |
||||
self.model.description = self.view.description_edit.edit_text |
||||
if not self.model.description: |
||||
self.view.popup('Description must not be empty') |
||||
raise Controller.InvalidInput() |
||||
except Controller.InvalidInput: |
||||
return |
||||
view = AddGroupConfirmationView(self.model, self, self.app) |
||||
self.switch_to_view(view) |
||||
|
||||
def on_confirmation_button_pressed(self, button): |
||||
body = { |
||||
'cn': self.model.name, |
||||
'description': self.model.description, |
||||
} |
||||
model = TransactionModel( |
||||
AddGroupTransaction.operations, |
||||
'POST', '/api/groups', json=body |
||||
) |
||||
controller = TransactionController(model, self.app) |
||||
view = TransactionView(model, controller, self.app) |
||||
controller.view = view |
||||
self.switch_to_view(view) |
@ -0,0 +1,37 @@ |
||||
from .Controller import Controller |
||||
from ceod.transactions.groups import AddMemberToGroupTransaction |
||||
from .TransactionController import TransactionController |
||||
from ceo.tui.models import TransactionModel |
||||
from ceo.tui.views import AddMemberToGroupConfirmationView, TransactionView |
||||
|
||||
|
||||
class AddMemberToGroupController(Controller): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
|
||||
def on_list_subscribe_checkbox_change(self, checkbox, new_state): |
||||
self.model.subscribe_to_lists = new_state |
||||
|
||||
def on_next_button_pressed(self, button): |
||||
try: |
||||
self.model.name = self.get_group_name_from_view() |
||||
self.model.username = self.get_username_from_view() |
||||
except Controller.InvalidInput: |
||||
return |
||||
view = AddMemberToGroupConfirmationView(self.model, self, self.app) |
||||
self.switch_to_view(view) |
||||
|
||||
def on_confirmation_button_pressed(self, button): |
||||
cn = self.model.name |
||||
uid = self.model.username |
||||
url = f'/api/groups/{cn}/members/{uid}' |
||||
if not self.model.subscribe_to_lists: |
||||
url += '?subscribe_to_lists=false' |
||||
model = TransactionModel( |
||||
AddMemberToGroupTransaction.operations, |
||||
'POST', url |
||||
) |
||||
controller = TransactionController(model, self.app) |
||||
view = TransactionView(model, controller, self.app) |
||||
controller.view = view |
||||
self.switch_to_view(view) |
@ -0,0 +1,110 @@ |
||||
from threading import Thread |
||||
|
||||
from ...utils import http_get |
||||
from .Controller import Controller |
||||
from .AddUserTransactionController import AddUserTransactionController |
||||
import ceo.term_utils as term_utils |
||||
from ceo.tui.models import TransactionModel |
||||
from ceo.tui.views import AddUserConfirmationView, TransactionView |
||||
from ceod.transactions.members import AddMemberTransaction |
||||
|
||||
|
||||
class AddUserController(Controller): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
self.right_col_idx = 0 |
||||
self.prev_searched_username = None |
||||
|
||||
def on_confirmation_button_pressed(self, button): |
||||
body = { |
||||
'uid': self.model.username, |
||||
'cn': self.model.full_name, |
||||
'given_name': self.model.first_name, |
||||
'sn': self.model.last_name, |
||||
} |
||||
if self.model.program: |
||||
body['program'] = self.model.program |
||||
if self.model.forwarding_address: |
||||
body['forwarding_addresses'] = [self.model.forwarding_address] |
||||
new_terms = term_utils.get_terms_for_new_user(self.model.num_terms) |
||||
if self.model.membership_type == 'club_rep': |
||||
body['non_member_terms'] = new_terms |
||||
else: |
||||
body['terms'] = new_terms |
||||
|
||||
model = TransactionModel( |
||||
AddMemberTransaction.operations, |
||||
'POST', '/api/members', |
||||
json=body |
||||
) |
||||
controller = AddUserTransactionController(model, self.app) |
||||
view = TransactionView(model, controller, self.app) |
||||
controller.view = view |
||||
self.switch_to_view(view) |
||||
|
||||
def on_next_button_pressed(self, button): |
||||
try: |
||||
username = self.get_username_from_view() |
||||
num_terms = self.get_num_terms_from_view() |
||||
except Controller.InvalidInput: |
||||
return |
||||
full_name = self.view.full_name_edit.edit_text |
||||
# TODO: share validation logic between CLI and TUI |
||||
if not full_name: |
||||
self.view.popup('Full name must not be empty') |
||||
return |
||||
self.model.username = username |
||||
self.model.full_name = full_name |
||||
self.model.first_name = self.view.first_name_edit.edit_text |
||||
self.model.last_name = self.view.last_name_edit.edit_text |
||||
self.model.program = self.view.program_edit.edit_text |
||||
self.model.forwarding_address = self.view.forwarding_address_edit.edit_text |
||||
self.model.num_terms = num_terms |
||||
view = AddUserConfirmationView(self.model, self, self.app) |
||||
self.switch_to_view(view) |
||||
|
||||
def on_membership_type_changed(self, radio_button, new_state, selected_type): |
||||
if new_state: |
||||
self.model.membership_type = selected_type |
||||
|
||||
def on_row_focus_changed(self): |
||||
_, idx = self.view.listwalker.get_focus() |
||||
old_idx = self.right_col_idx |
||||
self.right_col_idx = idx |
||||
# The username field is the third row, so when |
||||
# idx changes from 2 to 3, this means the user |
||||
# moved from the username field to the next field |
||||
if old_idx == 2 and idx == 3: |
||||
Thread( |
||||
target=self._lookup_user, |
||||
args=(self.view.username_edit.edit_text,) |
||||
).start() |
||||
|
||||
def _set_flash_text(self, *args): |
||||
self.view.flash_text.set_text('Looking up user...') |
||||
|
||||
def _clear_flash_text(self): |
||||
self.view.flash_text.set_text('') |
||||
|
||||
def _on_lookup_user_success(self): |
||||
self._clear_flash_text() |
||||
self.view.update_fields() |
||||
|
||||
def _lookup_user(self, username): |
||||
if not username: |
||||
return |
||||
if username == self.prev_searched_username: |
||||
return |
||||
self.prev_searched_username = username |
||||
self.app.run_in_main_loop(self._set_flash_text) |
||||
resp = http_get('/api/uwldap/' + username) |
||||
if not resp.ok: |
||||
self.app.run_in_main_loop(self._clear_flash_text) |
||||
return |
||||
data = resp.json() |
||||
self.model.full_name = data.get('cn', '') |
||||
self.model.first_name = data.get('given_name', '') |
||||
self.model.last_name = data.get('sn', '') |
||||
self.model.program = data.get('program', '') |
||||
self.model.forwarding_address = data.get('mail_local_addresses', [''])[0] |
||||
self.app.run_in_main_loop(self._on_lookup_user_success) |
@ -0,0 +1,38 @@ |
||||
from typing import Dict, List |
||||
|
||||
from ...utils import get_failed_operations |
||||
from .TransactionController import TransactionController |
||||
|
||||
|
||||
class AddUserTransactionController(TransactionController): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
|
||||
def handle_completed(self): |
||||
# We don't want to write to the message_text yet, but |
||||
# we still need to enable the Next button. |
||||
self.app.run_in_main_loop(self.view.enable_next_button) |
||||
|
||||
def write_extra_txn_info(self, data: List[Dict]): |
||||
if data[-1]['status'] != 'completed': |
||||
return |
||||
result = data[-1]['result'] |
||||
failed_operations = get_failed_operations(data) |
||||
lines = [] |
||||
if failed_operations: |
||||
lines.append('Transaction successfully completed with some errors.') |
||||
else: |
||||
lines.append('Transaction successfully completed.') |
||||
lines.append('') |
||||
lines.append('User password is: ' + result['password']) |
||||
if 'send_welcome_message' in failed_operations: |
||||
lines.extend([ |
||||
'', |
||||
'Since the welcome message was not sent, ' |
||||
'you need to email this password to the user.' |
||||
]) |
||||
|
||||
def target(): |
||||
self._show_lines(lines) |
||||
|
||||
self.app.run_in_main_loop(target) |
@ -0,0 +1,79 @@ |
||||
from threading import Thread |
||||
|
||||
from ...utils import http_get |
||||
from .Controller import Controller |
||||
from .TransactionController import TransactionController |
||||
from ceo.tui.models import TransactionModel |
||||
from ceo.tui.views import ChangeLoginShellConfirmationView, TransactionView |
||||
|
||||
|
||||
class ChangeLoginShellController(Controller): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
self.right_col_idx = 0 |
||||
self.prev_searched_username = None |
||||
|
||||
def on_next_button_pressed(self, button): |
||||
try: |
||||
self.model.username = self.get_username_from_view() |
||||
self.model.login_shell = self.view.login_shell_edit.edit_text |
||||
if not self.model.login_shell: |
||||
self.view.popup('Login shell must not be empty') |
||||
raise Controller.InvalidInput() |
||||
except Controller.InvalidInput: |
||||
return |
||||
view = ChangeLoginShellConfirmationView(self.model, self, self.app) |
||||
self.switch_to_view(view) |
||||
|
||||
def on_confirmation_button_pressed(self, button): |
||||
body = {'login_shell': self.model.login_shell} |
||||
model = TransactionModel( |
||||
['replace_login_shell'], |
||||
'PATCH', f'/api/members/{self.model.username}', |
||||
json=body |
||||
) |
||||
controller = TransactionController(model, self.app) |
||||
view = TransactionView(model, controller, self.app) |
||||
controller.view = view |
||||
self.switch_to_view(view) |
||||
|
||||
# TODO: reduce code duplication with AddUserController |
||||
|
||||
def on_row_focus_changed(self): |
||||
_, idx = self.view.listwalker.get_focus() |
||||
old_idx = self.right_col_idx |
||||
self.right_col_idx = idx |
||||
# The username field is the first row, so when |
||||
# idx changes from 0 to 1, this means the user |
||||
# moved from the username field to the next field |
||||
if old_idx == 0 and idx == 1: |
||||
Thread( |
||||
target=self._lookup_user, |
||||
args=(self.view.username_edit.edit_text,) |
||||
).start() |
||||
|
||||
def _set_flash_text(self, *args): |
||||
self.view.flash_text.set_text('Looking up user...') |
||||
|
||||
def _clear_flash_text(self): |
||||
self.view.flash_text.set_text('') |
||||
|
||||
def _on_lookup_user_success(self): |
||||
self._clear_flash_text() |
||||
self.view.update_fields() |
||||
|
||||
def _lookup_user(self, username): |
||||
if not username: |
||||
return |
||||
if username == self.prev_searched_username: |
||||
return |
||||
self.prev_searched_username = username |
||||
self.app.run_in_main_loop(self._set_flash_text) |
||||
resp = http_get('/api/members/' + username) |
||||
if not resp.ok: |
||||
self.app.run_in_main_loop(self._clear_flash_text) |
||||
return |
||||
data = resp.json() |
||||
self.model.login_shell = data.get('login_shell', '') |
||||
|
||||
self.app.run_in_main_loop(self._on_lookup_user_success) |
@ -0,0 +1,78 @@ |
||||
from abc import ABC |
||||
|
||||
import ceo.tui.utils as utils |
||||
|
||||
|
||||
# NOTE: one controller can control multiple views, |
||||
# but each view must have exactly one controller |
||||
class Controller(ABC): |
||||
class InvalidInput(Exception): |
||||
pass |
||||
|
||||
class RequestFailed(Exception): |
||||
pass |
||||
|
||||
def __init__(self, model, app): |
||||
super().__init__() |
||||
self.model = model |
||||
self.app = app |
||||
# Since the view and the controller both have a reference to each |
||||
# other, this needs to be initialized in a separate step |
||||
self.view = None |
||||
|
||||
def _push_history(self, old_view, new_view): |
||||
if new_view.model.name == 'Welcome': |
||||
self.app.history.clear() |
||||
else: |
||||
self.app.history.append(old_view) |
||||
|
||||
def switch_to_view(self, new_view): |
||||
self._push_history(self.view, new_view) |
||||
self.view = new_view |
||||
new_view.activate() |
||||
|
||||
def go_to_next_menu(self, next_menu_name): |
||||
_, new_view, _ = utils.get_mvc(self.app, next_menu_name) |
||||
self._push_history(self.view, new_view) |
||||
new_view.activate() |
||||
|
||||
def prev_menu_callback(self, button): |
||||
prev_view = self.app.history.pop() |
||||
prev_view.controller.view = prev_view |
||||
prev_view.activate() |
||||
|
||||
def next_menu_callback(self, button, next_menu_name): |
||||
self.go_to_next_menu(next_menu_name) |
||||
|
||||
def get_next_menu_callback(self, next_menu_name): |
||||
def callback(button): |
||||
self.next_menu_callback(button, next_menu_name) |
||||
return callback |
||||
|
||||
def get_username_from_view(self): |
||||
username = self.view.username_edit.edit_text |
||||
# TODO: share validation logic between CLI and TUI |
||||
if not username: |
||||
self.view.popup('Username must not be empty') |
||||
raise Controller.InvalidInput() |
||||
return username |
||||
|
||||
def get_group_name_from_view(self): |
||||
name = self.view.name_edit.edit_text |
||||
# TODO: share validation logic between CLI and TUI |
||||
if not name: |
||||
self.view.popup('Name must not be empty') |
||||
raise Controller.InvalidInput() |
||||
return name |
||||
|
||||
def get_num_terms_from_view(self): |
||||
num_terms_str = self.view.num_terms_edit.edit_text |
||||
if num_terms_str: |
||||
num_terms = int(num_terms_str) |
||||
else: |
||||
num_terms = 0 |
||||
# TODO: share validation logic between CLI and TUI |
||||
if num_terms <= 0: |
||||
self.view.popup('Number of terms must be a positive integer') |
||||
raise Controller.InvalidInput() |
||||
return num_terms |
@ -0,0 +1,50 @@ |
||||
import os |
||||
|
||||
from zope import component |
||||
|
||||
from ...utils import http_get, http_post, write_db_creds |
||||
from .SyncRequestController import SyncRequestController |
||||
import ceo.krb_check as krb |
||||
from ceo.tui.views import CreateDatabaseConfirmationView, CreateDatabaseResponseView |
||||
from ceo_common.interfaces import IConfig |
||||
|
||||
|
||||
class CreateDatabaseController(SyncRequestController): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
|
||||
def on_db_type_changed(self, radio_button, new_state, selected_type): |
||||
if new_state: |
||||
self.model.db_type = selected_type |
||||
|
||||
def on_next_button_pressed(self, button): |
||||
view = CreateDatabaseConfirmationView(self.model, self, self.app) |
||||
self.switch_to_view(view) |
||||
|
||||
def get_resp(self): |
||||
db_type = self.model.db_type |
||||
username = krb.get_username() |
||||
resp = http_get(f'/api/members/{username}') |
||||
if not resp.ok: |
||||
return resp |
||||
self.model.user_dict = resp.json() |
||||
return http_post(f'/api/db/{db_type}/{username}') |
||||
|
||||
def get_response_view(self): |
||||
return CreateDatabaseResponseView(self.model, self, self.app) |
||||
|
||||
def write_db_creds_to_file(self): |
||||
password = self.model.resp_json['password'] |
||||
db_type = self.model.db_type |
||||
cfg = component.getUtility(IConfig) |
||||
db_host = cfg.get(f'{db_type}_host') |
||||
homedir = self.model.user_dict['home_directory'] |
||||
filename = os.path.join(homedir, f"ceo-{db_type}-info") |
||||
wrote_to_file = write_db_creds( |
||||
filename, self.model.user_dict, password, db_type, db_host |
||||
) |
||||
|
||||
self.model.password = password |
||||
self.model.db_host = db_host |
||||
self.model.filename = filename |
||||
self.model.wrote_to_file = wrote_to_file |
@ -0,0 +1,22 @@ |
||||
from ...utils import http_get |
||||
from .Controller import Controller |
||||
from .SyncRequestController import SyncRequestController |
||||
from ceo.tui.views import GetGroupResponseView |
||||
|
||||
|
||||
class GetGroupController(SyncRequestController): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
|
||||
def get_resp(self): |
||||
return http_get(f'/api/groups/{self.model.name}') |
||||
|
||||
def get_response_view(self): |
||||
return GetGroupResponseView(self.model, self, self.app) |
||||
|
||||
def on_next_button_pressed(self, button): |
||||
try: |
||||
self.model.name = self.get_group_name_from_view() |
||||
except Controller.InvalidInput: |
||||
return |
||||
self.on_confirmation_button_pressed(button) |
@ -0,0 +1,29 @@ |
||||
from threading import Thread |
||||
|
||||
from ...utils import http_get |
||||
from .Controller import Controller |
||||
import ceo.tui.utils as tui_utils |
||||
|
||||
|
||||
class GetPositionsController(Controller): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
|
||||
def lookup_positions_async(self): |
||||
self.view.flash_text.set_text('Looking up positions...') |
||||
Thread(target=self.lookup_positions_sync).start() |
||||
|
||||
def lookup_positions_sync(self): |
||||
resp = http_get('/api/positions') |
||||
try: |
||||
positions = tui_utils.handle_sync_response(resp, self) |
||||
except Controller.RequestFailed: |
||||
return |
||||
for pos, username in positions.items(): |
||||
self.model.positions[pos] = username |
||||
|
||||
def target(): |
||||
self.view.flash_text.set_text('') |
||||
self.view.update_fields() |
||||
|
||||
self.app.run_in_main_loop(target) |
@ -0,0 +1,22 @@ |
||||
from ...utils import http_get |
||||
from .Controller import Controller |
||||
from .SyncRequestController import SyncRequestController |
||||
from ceo.tui.views import GetUserResponseView |
||||
|
||||
|
||||
class GetUserController(SyncRequestController): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
|
||||
def get_resp(self): |
||||
return http_get(f'/api/members/{self.model.username}') |
||||
|
||||
def get_response_view(self): |
||||
return GetUserResponseView(self.model, self, self.app) |
||||
|
||||
def on_next_button_pressed(self, button): |
||||
try: |
||||
self.model.username = self.get_username_from_view() |
||||
except Controller.InvalidInput: |
||||
return |
||||
self.on_confirmation_button_pressed(button) |
@ -0,0 +1,37 @@ |
||||
from .Controller import Controller |
||||
from ceod.transactions.groups import RemoveMemberFromGroupTransaction |
||||
from .TransactionController import TransactionController |
||||
from ceo.tui.models import TransactionModel |
||||
from ceo.tui.views import RemoveMemberFromGroupConfirmationView, TransactionView |
||||
|
||||
|
||||
class RemoveMemberFromGroupController(Controller): |
||||
def __init__(self, model, app): |
||||
super().__init__(model, app) |
||||
|
||||
def on_list_unsubscribe_checkbox_change(self, checkbox, new_state): |
||||
self.model.unsubscribe_from_lists = new_state |
||||
|
||||
|