Rewrite TUI #52

Merged
merenber merged 7 commits from tui-urwid into master 2022-05-22 14:09:48 -04:00
116 changed files with 2333 additions and 1733 deletions

View File

@ -28,13 +28,16 @@ POSTGRES_DIR=/etc/postgresql/11/main
cat <<EOF > $POSTGRES_DIR/pg_hba.conf
# TYPE DATABASE USER ADDRESS METHOD
local all postgres peer
host all postgres localhost md5
host all postgres 0.0.0.0/0 md5
host all postgres ::/0 md5
local all all peer
host all all localhost md5
local sameuser all peer
host sameuser all 0.0.0.0/0 md5
host sameuser all ::/0 md5
EOF
grep -Eq "^listen_addresses = '*'$" $POSTGRES_DIR/postgresql.conf || \
echo "listen_addresses = '*'" >> $POSTGRES_DIR/postgresql.conf

View File

@ -15,13 +15,13 @@ def get_terms_for_new_user(num_terms: int) -> List[str]:
def get_terms_for_renewal(
username: str, num_terms: int, clubrep: bool, tui_model=None,
username: str, num_terms: int, clubrep: bool, tui_controller=None,
) -> List[str]:
resp = http_get('/api/members/' + username)
if tui_model is None:
if tui_controller is None:
result = cli_utils.handle_sync_response(resp)
else:
result = tui_utils.handle_sync_response(resp, tui_model)
result = tui_utils.handle_sync_response(resp, tui_controller)
max_term = None
current_term = Term.current()
if clubrep and 'non_member_terms' in result:

View File

@ -1,160 +0,0 @@
from asciimatics.event import KeyboardEvent
from asciimatics.exceptions import NextScene, StopApplication
from asciimatics.screen import Screen
from asciimatics.widgets import Frame, Layout, Divider, Button, Label, \
PopUpDialog
class CeoFrame(Frame):
def __init__(
self,
screen,
height,
width,
model,
name,
on_load=None,
title=None,
escape_on_q=False, # whether to quit when 'q' is pressed
):
super().__init__(
screen,
height,
width,
name=name,
can_scroll=False,
title=title,
on_load=self._ceoframe_on_load,
)
self._custom_on_load = on_load
self._model = model
self._name = name
# If a view has a custom on_load function, all layouts should
# be created on load (*not* in the constructor)
self._has_dynamic_layouts = on_load is not None
self._quit_keys = [Screen.KEY_ESCAPE]
if escape_on_q:
self._quit_keys.append(ord('q'))
# child classes may override this as a last resort
self.skip_reload = False
def _ceoframe_on_load(self):
if self.skip_reload:
self.skip_reload = False
return
if self._model.title is not None:
self.title = self._model.title
self._model.title = None
if self._has_dynamic_layouts and self._model.nav_direction == 'forward':
# We arrive here after a user pressed 'Back' then 'Next',
# or after we returned to the Welcome screen.
# The data may have changed, so we need to redraw everything,
# via self._custom_on_load().
self.clear_layouts()
self._custom_on_load()
# may be overridden by child classes
def _ceoframe_on_reset(self):
"""
This is called whenever we return to the home screen
after some kind of operation was completed.
This is called from Model.reset().
"""
pass
def clear_layouts(self):
self._layouts.clear()
def force_update(self):
"""
This should be called by background threads after they make changes
to the UI.
"""
# Since we're running in a separate thread, we need to force the
# screen to update. See
# https://github.com/peterbrittain/asciimatics/issues/56
self.fix()
self._model.screen.force_update()
def add_buttons(
self, back_btn=False, back_btn_text='Back',
next_scene=None, next_btn_text='Next', on_next=None,
on_next_excl=None,
):
"""
Add a new layout at the bottom of the frame with buttons.
If back_btn is True, a Back button is added.
If next_scene is set to the name of the next scene, or on_next_excl
is set, a Next button will be added.
If on_next is set to a function, it will be called when the Next
button is pressed, and the screen will switch to the next scene.
If on_next_excl is set to a function, it will be called when the Next
button is pressed, and the scene will not be switched.
If both on_next and on_next_excl are set, on_next will be ignored.
"""
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
def _back():
self._model.nav_direction = 'backward'
last_scene = self._model.scene_stack.pop()
if last_scene == 'Welcome':
self._model.reset()
raise NextScene(last_scene)
def _next():
self._model.nav_direction = 'forward'
if on_next_excl is not None:
on_next_excl()
return
if on_next is not None:
on_next()
self.go_to_next_scene(next_scene)
layout = Layout([1, 1])
self.add_layout(layout)
if back_btn:
layout.add_widget(Button(back_btn_text, _back), 0)
if next_scene is not None or on_next_excl is not None:
layout.add_widget(Button(next_btn_text, _next), 1)
def go_to_next_scene(self, next_scene: str):
self._model.scene_stack.append(self._name)
raise NextScene(next_scene)
def add_flash_message_layout(self):
layout = Layout([100])
self.add_layout(layout)
self._status_label = Label('')
layout.add_widget(self._status_label)
def flash_message(self, msg: str, force_update: bool = False):
self._status_label.text = msg
if force_update:
self._model.screen.force_update()
self._model.screen.draw_next_frame()
def clear_flash_message(self, force_update: bool = False):
self.flash_message('', force_update)
def process_event(self, event):
if not isinstance(event, KeyboardEvent):
return super().process_event(event)
c = event.key_code
# Stop on 'q' or 'Esc'
if c in self._quit_keys:
self._scene.add_effect(PopUpDialog(
self.screen,
'Are you sure you want to quit?',
['Yes', 'No'],
has_shadow=True,
on_close=self._quit_on_yes,
))
return super().process_event(event)
@staticmethod
def _quit_on_yes(selected):
# Yes is the first button
if selected == 0:
raise StopApplication("User terminated app")

View File

@ -1,61 +0,0 @@
from asciimatics.widgets import Layout, Label
from .CeoFrame import CeoFrame
class ConfirmView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Confirm',
on_load=self._confirmview_on_load, title='Confirmation',
escape_on_q=True,
)
def _add_line(self, text: str = ''):
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Label(text, align='^'))
def _add_pair(self, key: str, val: str):
layout = Layout([10, 1, 10])
self.add_layout(layout)
layout.add_widget(Label(key + ':', align='>'), 0)
layout.add_widget(Label(val, align='<'), 2)
def _confirmview_on_load(self):
for _ in range(2):
self._add_line()
for line in self._model.confirm_lines:
if isinstance(line, str):
self._add_line(line)
else:
# assume tuple
key, val = line
self._add_pair(key, val)
# fill the rest of the space
self.add_layout(Layout([100], fill_frame=True))
kwargs = {
'back_btn': True, 'back_btn_text': 'No', 'next_btn_text': 'Yes',
}
if self._model.operations is not None:
kwargs['next_scene'] = self._model.txn_view_name or 'Transaction'
else:
self.add_flash_message_layout()
kwargs['on_next_excl'] = self._next
self.add_buttons(**kwargs)
self.fix()
# OK so there's some weird bug somewhere which causes the buttons to be unselectable
# if we add a new user, return to the Welcome screen, then try to renew a user.
# This is a workaround for that.
self.skip_reload = True
self.reset()
def _next(self):
self.flash_message('Sending request...', force_update=True)
try:
self._model.resp = self._model.deferred_req()
finally:
self.clear_flash_message()
next_scene = self._model.result_view_name or 'Result'
self.go_to_next_scene(next_scene)

View File

@ -1,29 +0,0 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Label
from .CeoFrame import CeoFrame
class ErrorView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Error',
on_load=self._errorview_on_load, title='Error',
)
def _errorview_on_load(self):
layout = Layout([1, 10], fill_frame=True)
self.add_layout(layout)
for _ in range(2):
layout.add_widget(Label(''), 1)
layout.add_widget(Label('An error occurred:'), 1)
layout.add_widget(Label(''), 1)
for line in self._model.error_message.splitlines():
layout.add_widget(Label(line), 1)
self.add_buttons(on_next_excl=self._next)
self.fix()
def _next(self):
self._model.reset()
raise NextScene('Welcome')

View File

@ -1,56 +0,0 @@
from copy import deepcopy
from zope import component
from ceo_common.interfaces import IConfig
class Model:
"""A convenient place to store View data persistently."""
def __init__(self):
cfg = component.getUtility(IConfig)
self.screen = None
self.views = []
self.title = None
self.scene_stack = []
self.result_view_name = None
self.txn_view_name = None
self.error_message = None
self.nav_direction = 'forward'
# View-specific data
self._initial_viewdata = {
'ResetPassword': {
'uid': '',
},
}
for pos in cfg.get('positions_available'):
self._initial_viewdata[pos] = ''
self.viewdata = deepcopy(self._initial_viewdata)
# data which is shared between multiple views
self.is_club_rep = False
self.confirm_lines = None
self.operations = None
self.deferred_req = None
self.resp = None
self.db_type = None
self.user_dict = None
def reset(self):
self.viewdata = deepcopy(self._initial_viewdata)
self.is_club_rep = False
self.confirm_lines = None
self.operations = None
self.deferred_req = None
self.resp = None
self.db_type = None
self.user_dict = None
self.title = None
self.error_message = None
self.scene_stack.clear()
self.result_view_name = None
self.txn_view_name = None
for view in self.views:
if hasattr(view, '_ceoframe_on_reset'):
view._ceoframe_on_reset()

View File

@ -1,62 +0,0 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Label
import requests
from .CeoFrame import CeoFrame
class ResultView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Result',
on_load=self._resultview_on_load, title='Result',
escape_on_q=True,
)
# TODO: deduplicate this from ConfirmView
def _add_text(self, text: str = '\n', center: bool = False):
if center:
layout = Layout([100])
align = '^'
col = 0
else:
layout = Layout([1, 10])
align = '<'
col = 1
self.add_layout(layout)
for line in text.splitlines():
layout.add_widget(Label(line, align=align), col)
def _add_pair(self, key: str, val: str):
layout = Layout([10, 1, 10])
self.add_layout(layout)
if key:
layout.add_widget(Label(key + ':', align='>'), 0)
else:
layout.add_widget(Label(''), 0)
layout.add_widget(Label(val, align='<'), 2)
# override this method in child classes if desired
def show_result(self, resp: requests.Response):
self._add_text('The operation was successfully performed.', center=True)
def _resultview_on_load(self):
self._add_text()
resp = self._model.resp
if not resp.ok:
self._add_text('An error occurred:')
if resp.headers.get('content-type') == 'application/json':
err_msg = resp.json()['error']
else:
err_msg = resp.text.rstrip()
self._add_text(err_msg)
else:
self.show_result(resp)
# fill the rest of the space
self.add_layout(Layout([100], fill_frame=True))
self.add_buttons(on_next_excl=self._next)
self.fix()
def _next(self):
self._model.reset()
raise NextScene('Welcome')

View File

@ -1,93 +0,0 @@
from typing import Dict, Union
from asciimatics.widgets import Label, Layout
import requests
from .Model import Model
from ..StreamResponseHandler import StreamResponseHandler
class TUIStreamResponseHandler(StreamResponseHandler):
def __init__(
self,
model: Model,
labels: Dict[str, Label],
msg_layout: Layout,
txn_view, # TransactionView
):
super().__init__()
self.screen = model.screen
self.operations = model.operations
self.idx = 0
self.labels = labels
self.msg_layout = msg_layout
self.txn_view = txn_view
self.error_messages = []
def _update(self):
self.txn_view.force_update()
def _show_msg(self, msg: str = '\n'):
for line in msg.splitlines():
self.msg_layout.add_widget(Label(line, align='^'))
def _abort(self):
for operation in self.operations[self.idx:]:
self.labels[operation].text = 'ABORTED'
self.txn_view.enable_next_btn()
def handle_non_200(self, resp: requests.Response):
self._abort()
self._show_msg('An error occurred:')
if resp.headers.get('content-type') == 'application/json':
err_msg = resp.json()['error']
else:
err_msg = resp.text
self._show_msg(err_msg)
self._update()
def begin(self):
pass
def handle_aborted(self, err_msg: str):
self._abort()
self._show_msg('The transaction was rolled back.')
self._show_msg('The error was:\n')
self._show_msg(err_msg)
self._show_msg()
self._show_msg('Please check the ceod logs.')
self._update()
def handle_completed(self):
self._show_msg('Transaction successfully completed.')
if len(self.error_messages) > 0:
self._show_msg('There were some errors:')
for msg in self.error_messages:
self._show_msg(msg)
self.txn_view.enable_next_btn()
self._update()
def handle_successful_operation(self):
operation = self.operations[self.idx]
self.labels[operation].text = 'Done'
self.idx += 1
self._update()
def handle_failed_operation(self, err_msg: Union[str, None]):
operation = self.operations[self.idx]
self.labels[operation].text = 'Failed'
if err_msg is not None:
self.error_messages.append(err_msg)
self.idx += 1
self._update()
def handle_skipped_operation(self):
operation = self.operations[self.idx]
self.labels[operation].text = 'Skipped'
self.idx += 1
self._update()
def handle_unrecognized_operation(self, operation: str):
self.error_messages.append('Unrecognized operation: ' + operation)
self.idx += 1
self._update()

View File

@ -1,85 +0,0 @@
from threading import Thread
from typing import List, Dict
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Button, Divider, Label
from ..operation_strings import descriptions as op_desc
from ..utils import generic_handle_stream_response
from .CeoFrame import CeoFrame
from .TUIStreamResponseHandler import TUIStreamResponseHandler
class TransactionView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Transaction',
on_load=self._txnview_on_load, title='Running Transaction',
)
# map operation names to label widgets
self._labels = {}
def _add_buttons(self):
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
layout = Layout([1, 1])
self.add_layout(layout)
self._next_btn = Button('Next', self._next)
layout.add_widget(self._next_btn, 1)
def _add_blank_line(self):
self._op_layout.add_widget(Label(''), 0)
self._op_layout.add_widget(Label(''), 2)
def _txnview_on_load(self):
self._op_layout = Layout([12, 1, 10])
self.add_layout(self._op_layout)
# store the layouts so that we can re-use them when the screen
# gets resized
for _ in range(2):
self._add_blank_line()
for operation in self._model.operations:
desc = op_desc[operation]
self._op_layout.add_widget(Label(desc + '...', align='>'), 0)
desc_label = Label('', align='<')
self._op_layout.add_widget(desc_label, 2)
self._labels[operation] = desc_label
self._add_blank_line()
# this is the where success/failure messages etc. get placed
self._msg_layout = Layout([100])
self.add_layout(self._msg_layout)
# fill up the rest of the space
self.add_layout(Layout([100], fill_frame=True))
self._add_buttons()
self.fix()
Thread(target=self._do_txn).start()
def _do_txn(self):
resp = self._model.deferred_req()
handler = TUIStreamResponseHandler(
model=self._model,
labels=self._labels,
msg_layout=self._msg_layout,
txn_view=self,
)
data = generic_handle_stream_response(resp, self._model.operations, handler)
self.write_extra_txn_info(data)
# to be overridden in child classes if desired
def write_extra_txn_info(self, data: List[Dict]):
pass
def enable_next_btn(self):
self._next_btn.disabled = False
# If we don't reset, the button isn't selectable, even though we
# enabled it.
# We don't want to reload, though (which reset() will trigger).
self.skip_reload = True
self.reset()
def _next(self):
self._model.reset()
raise NextScene('Welcome')

View File

@ -1,107 +0,0 @@
from asciimatics.widgets import ListBox, Layout, Divider, Button, Label
from asciimatics.exceptions import NextScene, StopApplication
from .CeoFrame import CeoFrame
class WelcomeView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Welcome',
title='CSC Electronic Office',
escape_on_q=True,
)
members_menu_items = [
('Add member', 'AddUser'),
('Add club rep', 'AddUser'),
('Renew member', 'RenewUser'),
('Renew club rep', 'RenewUser'),
('Get user info', 'GetUser'),
('Reset password', 'ResetPassword'),
('Change login shell', 'ChangeLoginShell'),
('Set forwarding addresses', 'SetForwardingAddresses'),
]
groups_menu_items = [
('Add group', 'AddGroup'),
('Get group members', 'GetGroup'),
('Add member to group', 'AddMemberToGroup'),
('Remove member from group', 'RemoveMemberFromGroup'),
]
db_menu_items = [
('Create MySQL database', 'CreateDatabase'),
('Reset MySQL password', 'ResetDatabasePassword'),
('Create PostgreSQL database', 'CreateDatabase'),
('Reset PostgreSQL password', 'ResetDatabasePassword'),
]
positions_menu_items = [
('Get positions', 'GetPositions'),
('Set positions', 'SetPositions'),
]
self.menu_items = [
('members', members_menu_items),
('groups', groups_menu_items),
('databases', db_menu_items),
('positions', positions_menu_items),
]
self.menu_items_dict = dict(self.menu_items)
flat_menu_items = [item for name, items in self.menu_items for item in items]
menu = ListBox(
len(flat_menu_items),
[
(desc, i) for i, (desc, view) in
enumerate(flat_menu_items)
],
name='menu',
on_select=self._menu_select,
)
labels = []
for name, items in self.menu_items:
labels.append(Label(name.capitalize(), align='>'))
for _ in range(len(items) - 1):
labels.append(Label(''))
layout = Layout([5, 1, 8], fill_frame=True)
self.add_layout(layout)
layout.add_widget(menu, 2)
for label in labels:
layout.add_widget(label, 0)
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Label('Press <TAB> to switch widgets'))
layout.add_widget(Divider())
layout = Layout([1, 1, 1])
self.add_layout(layout)
layout.add_widget(Button("Quit", self._quit), 2)
self.fix()
def _menu_select(self):
self.save()
item_id = self.data['menu']
# find which submenu the item belongs to
counter = 0
for name, items in self.menu_items:
if item_id < counter + len(items):
break
counter += len(items)
submenu_idx = item_id - counter
desc, view = items[submenu_idx]
if name == 'members':
if desc.endswith('club rep'):
self._model.is_club_rep = True
elif name == 'databases':
if 'MySQL' in desc:
self._model.db_type = 'mysql'
else:
self._model.db_type = 'postgresql'
self._welcomeview_go_to_next_scene(desc, view)
def _welcomeview_go_to_next_scene(self, desc, view):
self._model.title = desc
self._model.scene_stack.append('Welcome')
raise NextScene(view)
@staticmethod
def _quit():
raise StopApplication("User pressed quit")

30
ceo/tui/app.py Normal file
View File

@ -0,0 +1,30 @@
import os
from queue import SimpleQueue
class App:
REL_WIDTH_PCT = 60
REL_HEIGHT_PCT = 70
# On a full-screen (1366x768) gnome-terminal window,
# I had 168 cols and 36 rows
WIDTH = int(0.6 * 168)
HEIGHT = int(0.7 * 36)
def __init__(self, loop, main_widget):
self.loop = loop
self.main_widget = main_widget
self.history = []
self.queued_pipe_callbacks = SimpleQueue()
self.pipefd = loop.watch_pipe(self._pipe_callback)
def run_in_main_loop(self, func):
self.queued_pipe_callbacks.put(func)
os.write(self.pipefd, b'\x00')
def _pipe_callback(self, data):
# We need to clear the whole queue because select()
# will only send one "notification" if there are two
# consecutive writes
while not self.queued_pipe_callbacks.empty():
self.queued_pipe_callbacks.get()()
return True

View File

@ -0,0 +1,36 @@
from .Controller import Controller
from .TransactionController import TransactionController
from ceo.tui.models import TransactionModel
from ceo.tui.views import AddGroupConfirmationView, TransactionView
from ceod.transactions.groups import AddGroupTransaction
class AddGroupController(Controller):
def __init__(self, model, app):
super().__init__(model, app)
def on_next_button_pressed(self, button):
try:
self.model.name = self.get_group_name_from_view()
self.model.description = self.view.description_edit.edit_text
if not self.model.description:
self.view.popup('Description must not be empty')
raise Controller.InvalidInput()
except Controller.InvalidInput:
return
view = AddGroupConfirmationView(self.model, self, self.app)
self.switch_to_view(view)
def on_confirmation_button_pressed(self, button):
body = {
'cn': self.model.name,
'description': self.model.description,
}
model = TransactionModel(
AddGroupTransaction.operations,
'POST', '/api/groups', json=body
)
controller = TransactionController(model, self.app)
view = TransactionView(model, controller, self.app)
controller.view = view
self.switch_to_view(view)

View File

@ -0,0 +1,37 @@
from .Controller import Controller
from ceod.transactions.groups import AddMemberToGroupTransaction
from .TransactionController import TransactionController
from ceo.tui.models import TransactionModel
from ceo.tui.views import AddMemberToGroupConfirmationView, TransactionView
class AddMemberToGroupController(Controller):
def __init__(self, model, app):
super().__init__(model, app)
def on_list_subscribe_checkbox_change(self, checkbox, new_state):
self.model.subscribe_to_lists = new_state
def on_next_button_pressed(self, button):
try:
self.model.name = self.get_group_name_from_view()
self.model.username = self.get_username_from_view()
except Controller.InvalidInput:
return
view = AddMemberToGroupConfirmationView(self.model, self, self.app)
self.switch_to_view(view)
def on_confirmation_button_pressed(self, button):
cn = self.model.name
uid = self.model.username
url = f'/api/groups/{cn}/members/{uid}'
if not self.model.subscribe_to_lists:
url += '?subscribe_to_lists=false'
model = TransactionModel(
AddMemberToGroupTransaction.operations,
'POST', url
)
controller = TransactionController(model, self.app)
view = TransactionView(model, controller, self.app)
controller.view = view
self.switch_to_view(view)

View File

@ -0,0 +1,110 @@
from threading import Thread
from ...utils import http_get
from .Controller import Controller
from .AddUserTransactionController import AddUserTransactionController
import ceo.term_utils as term_utils
from ceo.tui.models import TransactionModel
from ceo.tui.views import AddUserConfirmationView, TransactionView
from ceod.transactions.members import AddMemberTransaction
class AddUserController(Controller):
def __init__(self, model, app):
super().__init__(model, app)
self.right_col_idx = 0
self.prev_searched_username = None
def on_confirmation_button_pressed(self, button):
body = {
'uid': self.model.username,
'cn': self.model.full_name,
'given_name': self.model.first_name,
'sn': self.model.last_name,
}
if self.model.program:
body['program'] = self.model.program
if self.model.forwarding_address:
body['forwarding_addresses'] = [self.model.forwarding_address]
new_terms = term_utils.get_terms_for_new_user(self.model.num_terms)
if self.model.membership_type == 'club_rep':
body['non_member_terms'] = new_terms
else:
body['terms'] = new_terms
model = TransactionModel(
AddMemberTransaction.operations,
'POST', '/api/members',
json=body
)
controller = AddUserTransactionController(model, self.app)
view = TransactionView(model, controller, self.app)
controller.view = view
self.switch_to_view(view)
def on_next_button_pressed(self, button):
try:
username = self.get_username_from_view()
num_terms = self.get_num_terms_from_view()
except Controller.InvalidInput:
return
full_name = self.view.full_name_edit.edit_text
# TODO: share validation logic between CLI and TUI
if not full_name:
self.view.popup('Full name must not be empty')
return
self.model.username = username
self.model.full_name = full_name
self.model.first_name = self.view.first_name_edit.edit_text
self.model.last_name = self.view.last_name_edit.edit_text
self.model.program = self.view.program_edit.edit_text
self.model.forwarding_address = self.view.forwarding_address_edit.edit_text
self.model.num_terms = num_terms
view = AddUserConfirmationView(self.model, self, self.app)
self.switch_to_view(view)
def on_membership_type_changed(self, radio_button, new_state, selected_type):
if new_state:
self.model.membership_type = selected_type
def on_row_focus_changed(self):
_, idx = self.view.listwalker.get_focus()
old_idx = self.right_col_idx
self.right_col_idx = idx
# The username field is the third row, so when
# idx changes from 2 to 3, this means the user
# moved from the username field to the next field
if old_idx == 2 and idx == 3:
Thread(
target=self._lookup_user,
args=(self.view.username_edit.edit_text,)
).start()
def _set_flash_text(self, *args):
self.view.flash_text.set_text('Looking up user...')
def _clear_flash_text(self):
self.view.flash_text.set_text('')
def _on_lookup_user_success(self):
self._clear_flash_text()
self.view.update_fields()
def _lookup_user(self, username):
if not username:
return
if username == self.prev_searched_username:
return
self.prev_searched_username = username
self.app.run_in_main_loop(self._set_flash_text)
resp = http_get('/api/uwldap/' + username)
if not resp.ok:
self.app.run_in_main_loop(self._clear_flash_text)
return
data = resp.json()
self.model.full_name = data.get('cn', '')
self.model.first_name = data.get('given_name', '')
self.model.last_name = data.get('sn', '')
self.model.program = data.get('program', '')
self.model.forwarding_address = data.get('mail_local_addresses', [''])[0]
self.app.run_in_main_loop(self._on_lookup_user_success)

View File

@ -0,0 +1,38 @@
from typing import Dict, List
from ...utils import get_failed_operations
from .TransactionController import TransactionController
class AddUserTransactionController(TransactionController):
def __init__(self, model, app):
super().__init__(model, app)
def handle_completed(self):
# We don't want to write to the message_text yet, but
# we still need to enable the Next button.
self.app.run_in_main_loop(self.view.enable_next_button)
def write_extra_txn_info(self, data: List[Dict]):
if data[-1]['status'] != 'completed':
return
result = data[-1]['result']
failed_operations = get_failed_operations(data)
lines = []
if failed_operations:
lines.append('Transaction successfully completed with some errors.')
else:
lines.append('Transaction successfully completed.')
lines.append('')
lines.append('User password is: ' + result['password'])
if 'send_welcome_message' in failed_operations:
lines.extend([
'',
'Since the welcome message was not sent, '
'you need to email this password to the user.'
])
def target():
self._show_lines(lines)
self.app.run_in_main_loop(target)

View File

@ -0,0 +1,79 @@
from threading import Thread
from ...utils import http_get
from .Controller import Controller
from .TransactionController import TransactionController
from ceo.tui.models import TransactionModel
from ceo.tui.views import ChangeLoginShellConfirmationView, TransactionView
class ChangeLoginShellController(Controller):
def __init__(self, model, app):
super().__init__(model, app)
self.right_col_idx = 0
self.prev_searched_username = None
def on_next_button_pressed(self, button):
try:
self.model.username = self.get_username_from_view()
self.model.login_shell = self.view.login_shell_edit.edit_text
if not self.model.login_shell:
self.view.popup('Login shell must not be empty')
raise Controller.InvalidInput()
except Controller.InvalidInput:
return
view = ChangeLoginShellConfirmationView(self.model, self, self.app)
self.switch_to_view(view)
def on_confirmation_button_pressed(self, button):
body = {'login_shell': self.model.login_shell}
model = TransactionModel(
['replace_login_shell'],
'PATCH', f'/api/members/{self.model.username}',
json=body
)
controller = TransactionController(model, self.app)
view = TransactionView(model, controller, self.app)
controller.view = view
self.switch_to_view(view)
# TODO: reduce code duplication with AddUserController
def on_row_focus_changed(self):
_, idx = self.view.listwalker.get_focus()
old_idx = self.right_col_idx
self.right_col_idx = idx
# The username field is the first row, so when
# idx changes from 0 to 1, this means the user
# moved from the username field to the next field
if old_idx == 0 and idx == 1:
Thread(
target=self._lookup_user,
args=(self.view.username_edit.edit_text,)
).start()
def _set_flash_text(self, *args):
self.view.flash_text.set_text('Looking up user...')
def _clear_flash_text(self):
self.view.flash_text.set_text('')
def _on_lookup_user_success(self):
self._clear_flash_text()
self.view.update_fields()
def _lookup_user(self, username):
if not username:
return
if username == self.prev_searched_username:
return
self.prev_searched_username = username
self.app.run_in_main_loop(self._set_flash_text)
resp = http_get('/api/members/' + username)
if not resp.ok:
self.app.run_in_main_loop(self._clear_flash_text)
return
data = resp.json()
self.model.login_shell = data.get('login_shell', '')
self.app.run_in_main_loop(self._on_lookup_user_success)

View File

@ -0,0 +1,78 @@
from abc import ABC
import ceo.tui.utils as utils
# NOTE: one controller can control multiple views,
# but each view must have exactly one controller
class Controller(ABC):
class InvalidInput(Exception):
pass
class RequestFailed(Exception):
pass
def __init__(self, model, app):
super().__init__()
self.model = model
self.app = app
# Since the view and the controller both have a reference to each
# other, this needs to be initialized in a separate step
self.view = None
def _push_history(self, old_view, new_view):
if new_view.model.name == 'Welcome':
self.app.history.clear()
else:
self.app.history.append(old_view)
def switch_to_view(self, new_view):
self._push_history(self.view, new_view)
self.view = new_view
new_view.activate()
def go_to_next_menu(self, next_menu_name):
_, new_view, _ = utils.get_mvc(self.app, next_menu_name)
self._push_history(self.view, new_view)
new_view.activate()
def prev_menu_callback(self, button):
prev_view = self.app.history.pop()
prev_view.controller.view = prev_view
prev_view.activate()
def next_menu_callback(self, button, next_menu_name):
self.go_to_next_menu(next_menu_name)
def get_next_menu_callback(self, next_menu_name):
def callback(button):
self.next_menu_callback(button, next_menu_name)
return callback
def get_username_from_view(self):
username = self.view.username_edit.edit_text
# TODO: share validation logic between CLI and TUI
if not username:
self.view.popup('Username must not be empty')
raise Controller.InvalidInput()
return username
def get_group_name_from_view(self):
name = self.view.name_edit.edit_text
# TODO: share validation logic between CLI and TUI
if not name:
self.view.popup('Name must not be empty')
raise Controller.InvalidInput()
return name
def get_num_terms_from_view(self):
num_terms_str = self.view.num_terms_edit.edit_text
if num_terms_str:
num_terms = int(num_terms_str)
else:
num_terms = 0
# TODO: share validation logic between CLI and TUI
if num_terms <= 0:
self.view.popup('Number of terms must be a positive integer')
raise Controller.InvalidInput()
return num_terms

View File

@ -0,0 +1,50 @@
import os
from zope import component
from ...utils import http_get, http_post, write_db_creds
from .SyncRequestController import SyncRequestController
import ceo.krb_check as krb
from ceo.tui.views import CreateDatabaseConfirmationView, CreateDatabaseResponseView
from ceo_common.interfaces import IConfig
class CreateDatabaseController(SyncRequestController):
def __init__(self, model, app):
super().__init__(model, app)
def on_db_type_changed(self, radio_button, new_state, selected_type):
if new_state:
self.model.db_type = selected_type
def on_next_button_pressed(self, button):
view = CreateDatabaseConfirmationView(self.model, self, self.app)
self.switch_to_view(view)
def get_resp(self):
db_type = self.model.db_type
username = krb.get_username()
resp = http_get(f'/api/members/{username}')
if not resp.ok:
return resp
self.model.user_dict = resp.json()
return http_post(f'/api/db/{db_type}/{username}')
def get_response_view(self):
return CreateDatabaseResponseView(self.model, self, self.app)
def write_db_creds_to_file(self):
password = self.model.resp_json['password']
db_type = self.model.db_type
cfg = component.getUtility(IConfig)
db_host = cfg.get(f'{db_type}_host')
homedir = self.model.user_dict['home_directory']
filename = os.path.join(homedir, f"ceo-{db_type}-info")
wrote_to_file = write_db_creds(
filename, self.model.user_dict, password, db_type, db_host
)
self.model.password = password
self.model.db_host = db_host
self.model.filename = filename
self.model.wrote_to_file = wrote_to_file

View File

@ -0,0 +1,22 @@
from ...utils import http_get
from .Controller import Controller
from .SyncRequestController import SyncRequestController
from ceo.tui.views import GetGroupResponseView
class GetGroupController(SyncRequestController):
def __init__(self, model, app):
super().__init__(model, app)
def get_resp(self):
return http_get(f'/api/groups/{self.model.name}')
def get_response_view(self):
return GetGroupResponseView(self.model, self, self.app)
def on_next_button_pressed(self, button):
try:
self.model.name = self.get_group_name_from_view()
except Controller.InvalidInput:
return
self.on_confirmation_button_pressed(button)

View File

@ -0,0 +1,29 @@
from threading import Thread
from ...utils import http_get
from .Controller import Controller
import ceo.tui.utils as tui_utils
class GetPositionsController(Controller):
def __init__(self, model, app):
super().__init__(model, app)
def lookup_positions_async(self):
self.view.flash_text.set_text('Looking up positions...')
Thread(target=self.lookup_positions_sync).start()
def lookup_positions_sync(self):
resp = http_get('/api/positions')
try:
positions = tui_utils.handle_sync_response(resp, self)
except Controller.RequestFailed:
return
for pos, username in positions.items():
self.model.positions[pos] = username
def target():
self.view.flash_text.set_text('')
self.view.update_fields()
self.app.run_in_main_loop(target)

View File

@ -0,0 +1,22 @@
from ...utils import http_get
from .Controller import Controller
from .SyncRequestController import SyncRequestController
from ceo.tui.views import GetUserResponseView
class GetUserController(SyncRequestController):
def __init__(self, model, app):
super().__init__(model, app)
def get_resp(self):
return http_get(f'/api/members/{self.model.username}')
def get_response_view(self):
return GetUserResponseView(self.model, self, self.app)
def on_next_button_pressed(self, button):
try:
self.model.username = self.get_username_from_view()
except Controller.InvalidInput:
return
self.on_confirmation_button_pressed(button)

View File

@ -0,0 +1,37 @@
from .Controller import Controller
from ceod.transactions.groups import RemoveMemberFromGroupTransaction
from .TransactionController import TransactionController
from ceo.tui.models import TransactionModel
from ceo.tui.views import RemoveMemberFromGroupConfirmationView, TransactionView
class RemoveMemberFromGroupController(Controller):
def __init__(self, model, app):
super().__init__(model, app)
def on_list_unsubscribe_checkbox_change(self, checkbox, new_state):
self.model.unsubscribe_from_lists = new_state
def on_next_button_pressed(self, button):
try:
self.model.name = self.get_group_name_from_view()
self.model.username = self.get_username_from_view()
except Controller.InvalidInput:
return
view = RemoveMemberFromGroupConfirmationView(self.model, self, self.app)
self.switch_to_view(view)
def on_confirmation_button_pressed(self, button):
cn = self.model.name
uid = self.model.username
url = f'/api/groups/{cn}/members/{uid}'
if not self.model.unsubscribe_from_lists:
url += '?unsubscribe_from_lists=false'
model = TransactionModel(
RemoveMemberFromGroupTransaction.operations,
'DELETE', url
)
controller = TransactionController(model, self.app)
view = TransactionView(model, controller, self.app)
controller.view = view
self.switch_to_view(view)

View File

@ -0,0 +1,54 @@
from threading import Thread
from ...utils import http_post
from .Controller import Controller
from .SyncRequestController import SyncRequestController
import ceo.term_utils as term_utils
from ceo.tui.views import RenewUserConfirmationView
class RenewUserController(SyncRequestController):
def __init__(self, model, app):
super().__init__(model, app)
def on_membership_type_changed(self, radio_button, new_state, selected_type):
if new_state:
self.model.membership_type = selected_type
def on_next_button_pressed(self, button):
try:
username = self.get_username_from_view()
num_terms = self.get_num_terms_from_view()
except Controller.InvalidInput:
return
self.model.username = username
self.model.num_terms = num_terms
self.view.flash_text.set_text('Looking up user...')
Thread(target=self._get_next_terms).start()
def _get_next_terms(self):
try:
self.model.new_terms = term_utils.get_terms_for_renewal(
self.model.username,
self.model.num_terms,
self.model.membership_type == 'club_rep',
self
)
except Controller.RequestFailed:
return
def target():
self.view.flash_text.set_text('')
view = RenewUserConfirmationView(self.model, self, self.app)
self.switch_to_view(view)
self.app.run_in_main_loop(target)
def get_resp(self):
uid = self.model.username
body = {'uid': uid}
if self.model.membership_type == 'club_rep':
body['non_member_terms'] = self.model.new_terms
else:
body['terms'] = self.model.new_terms
return http_post(f'/api/members/{uid}/renew', json=body)

View File

@ -0,0 +1,49 @@
import os
from zope import component
from ...utils import http_get, http_post, write_db_creds
from .SyncRequestController import SyncRequestController
import ceo.krb_check as krb
from ceo.tui.views import ResetDatabasePasswordConfirmationView, ResetDatabasePasswordResponseView
from ceo_common.interfaces import IConfig
class ResetDatabasePasswordController(SyncRequestController):
def __init__(self, model, app):
super().__init__(model, app)
def on_db_type_changed(self, radio_button, new_state, selected_type):
if new_state:
self.model.db_type = selected_type
def on_next_button_pressed(self, button):
view = ResetDatabasePasswordConfirmationView(self.model, self, self.app)
self.switch_to_view(view)
def get_resp(self):
db_type = self.model.db_type
username = krb.get_username()
resp = http_get(f'/api/members/{username}')
if not resp.ok:
return resp
self.model.user_dict = resp.json()
return http_post(f'/api/db/{db_type}/{username}/pwreset')
def get_response_view(self):
return ResetDatabasePasswordResponseView(self.model, self, self.app)
def write_db_creds_to_file(self):
password = self.model.resp_json['password']
db_type = self.model.db_type
cfg = component.getUtility(IConfig)
db_host = cfg.get(f'{db_type}_host')
homedir = self.model.user_dict['home_directory']
filename = os.path.join(homedir, f"ceo-{db_type}-info")
wrote_to_file = write_db_creds(
filename, self.model.user_dict, password, db_type, db_host
)
self.model.password = password
self.model.filename = filename
self.model.wrote_to_file = wrote_to_file

View File

@ -0,0 +1,27 @@
from ...utils import http_post
from .Controller import Controller
from .SyncRequestController import SyncRequestController
import ceo.krb_check as krb
from ceo.tui.views import ResetPasswordUsePasswdView, ResetPasswordConfirmationView, ResetPasswordResponseView
class ResetPasswordController(SyncRequestController):
def __init__(self, model, app):
super().__init__(model, app)
def get_resp(self):
return http_post(f'/api/members/{self.model.username}/pwreset')
def get_response_view(self):
return ResetPasswordResponseView(self.model, self, self.app)
def on_next_button_pressed(self, button):
try:
self.model.username = self.get_username_from_view()
except Controller.InvalidInput:
return
if self.model.username == krb.get_username():
view = ResetPasswordUsePasswdView(self.model, self, self.app)
else:
view = ResetPasswordConfirmationView(self.model, self, self.app)
self.switch_to_view(view)

View File

@ -0,0 +1,47 @@
from threading import Thread
from ...utils import http_get
from .Controller import Controller
from .TransactionController import TransactionController
from ceo.tui.models import TransactionModel
import ceo.tui.utils as tui_utils
from ceo.tui.views import TransactionView
from ceod.transactions.members import UpdateMemberPositionsTransaction
class SetPositionsController(Controller):
def __init__(self, model, app):
super().__init__(model, app)
def on_next_button_pressed(self, button):
body = {}
for pos, field in self.view.position_fields.items():
if field.edit_text != '':
body[pos] = field.edit_text
model = TransactionModel(
UpdateMemberPositionsTransaction.operations,
'POST', '/api/positions', json=body
)
controller = TransactionController(model, self.app)
view = TransactionView(model, controller, self.app)
controller.view = view
self.switch_to_view(view)
def lookup_positions_async(self):
self.view.flash_text.set_text('Looking up positions...')
Thread(target=self.lookup_positions_sync).start()
def lookup_positions_sync(self):
resp = http_get('/api/positions')
try:
positions = tui_utils.handle_sync_response(resp, self)
except Controller.RequestFailed:
return
for pos, username in positions.items():
self.model.positions[pos] = username
def target():
self.view.flash_text.set_text('')
self.view.update_fields()
self.app.run_in_main_loop(target)

View File

@ -0,0 +1,39 @@
from threading import Thread
from .Controller import Controller
import ceo.tui.utils as tui_utils
from ceo.tui.views import SyncResponseView
class SyncRequestController(Controller):
def __init__(self, model, app):
super().__init__(model, app)
self.request_in_progress = False
def get_resp(self):
# To be implemented by child classes
raise NotImplementedError()
def get_response_view(self):
return SyncResponseView(self.model, self, self.app)
def on_confirmation_button_pressed(self, button):
if self.request_in_progress:
return
self.request_in_progress = True
self.view.flash_text.set_text('Sending request...')
def main_loop_target():
self.view.flash_text.set_text('')
view = self.get_response_view()
self.switch_to_view(view)
def thread_target():
resp = self.get_resp()
try:
self.model.resp_json = tui_utils.handle_sync_response(resp, self)
except Controller.RequestFailed:
return
self.app.run_in_main_loop(main_loop_target)
Thread(target=thread_target).start()

View File

@ -0,0 +1,110 @@
from threading import Thread
from typing import Dict, List
from ...StreamResponseHandler import StreamResponseHandler
from ...utils import http_request, generic_handle_stream_response
from .Controller import Controller
class TransactionController(Controller, StreamResponseHandler):
def __init__(self, model, app):
super().__init__(model, app)
self.op_idx = 0
self.error_messages = []
def start(self):
Thread(target=self._start_txn).start()
def _start_txn(self):
resp = http_request(
self.model.http_verb,
self.model.req_path,
**self.model.req_kwargs
)
data = generic_handle_stream_response(resp, self.model.operations, self)
self.write_extra_txn_info(data)
# to be overridden in child classes if desired
def write_extra_txn_info(self, data: List[Dict]):
pass
def _show_lines(self, lines):
num_lines = len(lines)
# Since the message_text is at the bottom of the window,
# we want to add sufficient padding to the bottom of the text
lines += [''] * max(4 - num_lines, 0)
for i, line in enumerate(lines):
if type(line) is str:
lines[i] = line + '\n'
else: # tuple (attr, text)
lines[i] = (line[0], line[1] + '\n')
self.view.message_text.set_text(lines)
def _abort(self):
for elem in self.view.right_col_elems[self.op_idx:]:
elem.set_text(('red', 'ABORTED'))
self.view.enable_next_button()
def begin(self):
pass
def handle_non_200(self, resp):
def target():
self._abort()
lines = ['An error occurred:']
if resp.headers.get('content-type') == 'application/json':
err_msg = resp.json()['error']
else:
err_msg = resp.text
lines.extend(err_msg.split('\n'))
self._show_lines(lines)
self.app.run_in_main_loop(target)
def handle_aborted(self, err_msg):
def target():
self._abort()
lines = [
'The transaction was rolled back.',
'The error was:',
'',
*err_msg.split('\n'),
]
self._show_lines(lines)
self.app.run_in_main_loop(target)
def handle_completed(self):
def target():
lines = ['Transaction successfully completed.']
if len(self.error_messages) > 0:
lines.append('There were some errors:')
for msg in self.error_messages:
lines.extend(msg.split('\n'))
self._show_lines(lines)
self.view.enable_next_button()
self.app.run_in_main_loop(target)
def handle_successful_operation(self):
def target():
self.view.right_col_elems[self.op_idx].set_text(('green', 'Done'))
self.op_idx += 1
self.app.run_in_main_loop(target)
def handle_failed_operation(self, err_msg):
def target():
self.view.right_col_elems[self.op_idx].set_text(('red', 'Failed'))
self.op_idx += 1
if err_msg is not None:
self.error_messages.append(err_msg)
self.app.run_in_main_loop(target)
def handle_skipped_operation(self):
def target():
self.view.right_col_elems[self.op_idx].set_text('Skipped')
self.op_idx += 1
self.app.run_in_main_loop(target)
def handle_unrecognized_operation(self, operation):
def target():
self.error_messages.append('Unrecognized operation: ' + operation)
self.op_idx += 1
self.app.run_in_main_loop(target)

View File

@ -0,0 +1,6 @@
from .Controller import Controller
class WelcomeController(Controller):
def __init__(self, model, app):
super().__init__(model, app)

View File

@ -0,0 +1,18 @@
from .Controller import Controller
from .WelcomeController import WelcomeController
from .AddUserController import AddUserController
from .AddUserTransactionController import AddUserTransactionController
from .RenewUserController import RenewUserController
from .GetUserController import GetUserController
from .ResetPasswordController import ResetPasswordController
from .ChangeLoginShellController import ChangeLoginShellController
from .AddGroupController import AddGroupController
from .GetGroupController import GetGroupController
from .AddMemberToGroupController import AddMemberToGroupController
from .RemoveMemberFromGroupController import RemoveMemberFromGroupController
from .CreateDatabaseController import CreateDatabaseController
from .ResetDatabasePasswordController import ResetDatabasePasswordController
from .GetPositionsController import GetPositionsController
from .SetPositionsController import SetPositionsController
from .TransactionController import TransactionController
from .SyncRequestController import SyncRequestController

View File

@ -1,34 +0,0 @@
import os
import requests
from zope import component
from ...utils import write_db_creds
from ..ResultView import ResultView
from ceo_common.interfaces import IConfig
class CreateDatabaseResultView(ResultView):
def show_result(self, resp: requests.Response):
password = resp.json()['password']
db_type = self._model.db_type
db_type_name = 'MySQL' if db_type == 'mysql' else 'PostgreSQL'
db_host = component.getUtility(IConfig).get(f'{db_type}_host')
user_dict = self._model.user_dict
username = user_dict['uid']
filename = os.path.join(user_dict['home_directory'], f"ceo-{db_type}-info")
wrote_to_file = write_db_creds(
filename, user_dict, password, db_type, db_host)
self._add_text(f'{db_type_name} database created.', center=True)
self._add_text()
self._add_text((f'''Connection Information:
Database: {username}
Username: {username}
Password: {password}
Host: {db_host}'''))
self._add_text()
if wrote_to_file:
self._add_text(f"These settings have been written to {filename}.")
else:
self._add_text(f"We were unable to write these settings to {filename}.")

View File

@ -1,47 +0,0 @@
from asciimatics.widgets import Layout, Text
from ...utils import http_post, http_get, defer
from ..CeoFrame import CeoFrame
class CreateDatabaseView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'CreateDatabase',
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text("Username:", "uid")
layout.add_widget(self._username)
self.add_buttons(
back_btn=True, next_scene='Confirm',
on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._username.value = None
def _target(self):
username = self._username.value
db_type = self._model.db_type
resp = http_get(f'/api/members/{username}')
if not resp.ok:
return resp
user_dict = resp.json()
self._model.user_dict = user_dict
return http_post(f'/api/db/{db_type}/{username}')
def _next(self):
username = self._username.value
if not username:
return
if self._model.db_type == 'mysql':
db_type_name = 'MySQL'
else:
db_type_name = 'PostgreSQL'
self._model.confirm_lines = [
f'Are you sure you want to create a {db_type_name} database for {username}?',
]
self._model.deferred_req = defer(self._target)
self._model.result_view_name = 'CreateDatabaseResult'

View File

@ -1,29 +0,0 @@
import os
import requests
from zope import component
from ...utils import write_db_creds
from ..ResultView import ResultView
from ceo_common.interfaces import IConfig
class ResetDatabasePasswordResultView(ResultView):
def show_result(self, resp: requests.Response):
password = resp.json()['password']
db_type = self._model.db_type
db_type_name = 'MySQL' if db_type == 'mysql' else 'PostgreSQL'
db_host = component.getUtility(IConfig).get(f'{db_type}_host')
user_dict = self._model.user_dict
username = user_dict['uid']
filename = os.path.join(user_dict['home_directory'], f"ceo-{db_type}-info")
wrote_to_file = write_db_creds(
filename, user_dict, password, db_type, db_host)
self._add_text(f'The new {db_type_name} password for {username} is:')
self._add_text()
self._add_text(password)
self._add_text()
if wrote_to_file:
self._add_text(f"The settings in {filename} have been updated.")
else:
self._add_text(f"We were unable to update the settings in {filename}.")

View File

@ -1,47 +0,0 @@
from asciimatics.widgets import Layout, Text
from ...utils import http_post, http_get, defer
from ..CeoFrame import CeoFrame
class ResetDatabasePasswordView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'ResetDatabasePassword',
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text("Username:", "uid")
layout.add_widget(self._username)
self.add_buttons(
back_btn=True, next_scene='Confirm',
on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._username.value = None
def _target(self):
username = self._username.value
db_type = self._model.db_type
resp = http_get(f'/api/members/{username}')
if not resp.ok:
return resp
user_dict = resp.json()
self._model.user_dict = user_dict
return http_post(f'/api/db/{db_type}/{username}/pwreset')
def _next(self):
username = self._username.value
if not username:
return
if self._model.db_type == 'mysql':
db_type_name = 'MySQL'
else:
db_type_name = 'PostgreSQL'
self._model.confirm_lines = [
f'Are you sure you want to reset the {db_type_name} password for {username}?',
]
self._model.deferred_req = defer(self._target)
self._model.result_view_name = 'ResetDatabasePasswordResult'

View File

@ -1 +0,0 @@

View File

@ -1,46 +0,0 @@
from asciimatics.widgets import Layout, Text
from ...utils import defer, http_post
from ..CeoFrame import CeoFrame
from ceod.transactions.groups import AddGroupTransaction
class AddGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'AddGroup',
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text('Name:', 'cn')
layout.add_widget(self._cn)
self._description = Text('Description:', 'description')
layout.add_widget(self._description)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._cn.value = None
self._description.value = None
def _next(self):
cn = self._cn.value
description = self._description.value
body = {
'cn': cn,
'description': description,
}
self._model.confirm_lines = [
'The following group will be created:',
'',
('cn', cn),
('description', description),
'',
'Are you sure you want to continue?',
]
self._model.deferred_req = defer(http_post, '/api/groups', json=body)
self._model.operations = AddGroupTransaction.operations

View File

@ -1,49 +0,0 @@
from asciimatics.widgets import Layout, Text, CheckBox, Label
from ...utils import defer, http_post
from ..CeoFrame import CeoFrame
from ceod.transactions.groups import AddMemberToGroupTransaction
class AddMemberToGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'AddMemberToGroup',
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text('Group name:', 'cn')
layout.add_widget(self._cn)
self._username = Text('Username:', 'uid')
layout.add_widget(self._username)
layout.add_widget(Label(''))
self._checkbox = CheckBox(
'subscribe to auxiliary mailing lists', name='subscribe')
self._checkbox.value = True
layout.add_widget(self._checkbox)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._cn.value = None
self._username.value = None
self._checkbox.value = True
def _next(self):
cn = self._cn.value
uid = self._username.value
self._model.confirm_lines = [
f'Are you sure you want to add {uid} to {cn}?',
]
operations = AddMemberToGroupTransaction.operations
url = f'/api/groups/{cn}/members/{uid}'
# TODO: deduplicate this logic from the CLI
if not self._checkbox.value:
url += '?subscribe_to_lists=false'
operations.remove('subscribe_user_to_auxiliary_mailing_lists')
self._model.deferred_req = defer(http_post, url)
self._model.operations = operations

View File

@ -1,18 +0,0 @@
import requests
from ..ResultView import ResultView
class GetGroupResultView(ResultView):
def show_result(self, resp: requests.Response):
d = resp.json()
if 'description' in d:
desc = d['description'] + ' (' + d['cn'] + ')'
else:
desc = d['cn']
self._add_text('Members of ' + desc, center=True)
self._add_text()
for member in d['members']:
self._add_text(
member['cn'] + ' (' + member['uid'] + ')',
center=True)

View File

@ -1,33 +0,0 @@
from asciimatics.widgets import Layout, Text
from ...utils import http_get
from ..CeoFrame import CeoFrame
class GetGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'GetGroup',
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text("Group name", "cn")
layout.add_widget(self._cn)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='GetGroupResult', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._cn.value = None
def _next(self):
cn = self._cn.value
self.flash_message('Looking up group...', force_update=True)
try:
self._model.resp = http_get(f'/api/groups/{cn}')
finally:
self.clear_flash_message()

View File

@ -1,48 +0,0 @@
from asciimatics.widgets import Layout, Text, CheckBox, Label
from ...utils import defer, http_delete
from ..CeoFrame import CeoFrame
from ceod.transactions.groups import RemoveMemberFromGroupTransaction
class RemoveMemberFromGroupView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'RemoveMemberFromGroup',
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._cn = Text('Group name:', 'cn')
layout.add_widget(self._cn)
self._username = Text('Username:', 'uid')
layout.add_widget(self._username)
layout.add_widget(Label(''))
self._checkbox = CheckBox(
'unsubscribe from auxiliary mailing lists', name='unsubscribe')
self._checkbox.value = True
layout.add_widget(self._checkbox)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._cn.value = None
self._username.value = None
def _next(self):
cn = self._cn.value
uid = self._username.value
self._model.confirm_lines = [
f'Are you sure you want to remove {uid} from {cn}?',
]
operations = RemoveMemberFromGroupTransaction.operations
url = f'/api/groups/{cn}/members/{uid}'
# TODO: deduplicate this logic from the CLI
if not self._checkbox.value:
url += '?unsubscribe_from_lists=false'
operations.remove('unsubscribe_user_from_auxiliary_mailing_lists')
self._model.deferred_req = defer(http_delete, url)
self._model.operations = operations

View File

@ -1,25 +0,0 @@
from typing import List, Dict
from asciimatics.widgets import Label
from ...utils import get_failed_operations
from ..TransactionView import TransactionView
class AddUserTransactionView(TransactionView):
def _show_msg(self, msg: str = '\n'):
for line in msg.splitlines():
self._msg_layout.add_widget(Label(line, align='^'))
def write_extra_txn_info(self, data: List[Dict]):
if data[-1]['status'] != 'completed':
return
result = data[-1]['result']
failed_operations = get_failed_operations(data)
self._show_msg()
self._show_msg('User password is: ' + result['password'])
if 'send_welcome_message' in failed_operations:
self._show_msg()
self._show_msg('Since the welcome message was not sent,')
self._show_msg('you need to email this password to the user.')
self.force_update()

View File

@ -1,116 +0,0 @@
from threading import Thread
from asciimatics.widgets import Layout, Text
from ...term_utils import get_terms_for_new_user
from ...utils import http_get, http_post, defer, user_dict_kv, \
get_adduser_operations
from ..CeoFrame import CeoFrame
class AddUserView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'AddUser',
)
self._username_changed = False
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text(
"Username:", "uid",
on_change=self._on_username_change,
on_blur=self._on_username_blur,
)
layout.add_widget(self._username)
self._full_name = Text("Full name:", "cn")
layout.add_widget(self._full_name)
self._first_name = Text("First name:", "given_name")
layout.add_widget(self._first_name)
self._last_name = Text("Last name:", "sn")
layout.add_widget(self._last_name)
self._program = Text("Program:", "program")
layout.add_widget(self._program)
self._forwarding_address = Text("Forwarding address:", "forwarding_address")
layout.add_widget(self._forwarding_address)
self._num_terms = Text(
"Number of terms:", "num_terms",
validator=lambda s: s.isdigit() and s[0] != '0')
self._num_terms.value = '1'
layout.add_widget(self._num_terms)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._username.value = None
self._full_name.value = None
self._program.value = None
self._forwarding_address.value = None
self._num_terms.value = '1'
def _on_username_change(self):
self._username_changed = True
def _on_username_blur(self):
if not self._username_changed:
return
self._username_changed = False
username = self._username.value
if username == '':
return
Thread(target=self._get_uwldap_info, args=[username]).start()
def _get_uwldap_info(self, username):
self.flash_message('Looking up user...')
try:
resp = http_get('/api/uwldap/' + username)
if resp.status_code != 200:
return
data = resp.json()
self._status_label.text = ''
if data.get('cn'):
self._full_name.value = data['cn']
if data.get('given_name'):
self._first_name.value = data['given_name']
if data.get('sn'):
self._last_name.value = data['sn']
if data.get('program'):
self._program.value = data.get('program', '')
if data.get('mail_local_addresses'):
self._forwarding_address.value = data['mail_local_addresses'][0]
finally:
self.clear_flash_message()
def _next(self):
body = {
'uid': self._username.value,
'cn': self._full_name.value,
'given_name': self._first_name.value,
'sn': self._last_name.value,
}
if self._program.value:
body['program'] = self._program.value
if self._forwarding_address.value:
body['forwarding_addresses'] = [self._forwarding_address.value]
new_terms = get_terms_for_new_user(int(self._num_terms.value))
if self._model.is_club_rep:
body['non_member_terms'] = new_terms
else:
body['terms'] = new_terms
pairs = user_dict_kv(body)
self._model.confirm_lines = [
'The following user will be created:',
'',
] + pairs + [
'',
'Are you sure you want to continue?',
]
self._model.deferred_req = defer(http_post, '/api/members', json=body)
self._model.operations = get_adduser_operations(body)
self._model.txn_view_name = 'AddUserTransaction'

View File

@ -1,75 +0,0 @@
from threading import Thread
from asciimatics.widgets import Layout, Text
from ...utils import defer, http_patch, http_get
from ..CeoFrame import CeoFrame
class ChangeLoginShellView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'ChangeLoginShell',
)
self._username_changed = False
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text(
"Username:", "uid",
on_change=self._on_username_change,
on_blur=self._on_username_blur,
)
layout.add_widget(self._username)
self._login_shell = Text('Login shell:', 'login_shell')
layout.add_widget(self._login_shell)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._username.value = None
self._login_shell.value = None
# TODO: deduplicate this from AddUserView
def _on_username_change(self):
self._username_changed = True
def _on_username_blur(self):
if not self._username_changed:
return
self._username_changed = False
username = self._username.value
if username == '':
return
Thread(target=self._get_user_info, args=[username]).start()
def _get_user_info(self, username):
self.flash_message('Looking up user...')
try:
resp = http_get('/api/members/' + username)
if resp.status_code != 200:
return
data = resp.json()
self._login_shell.value = data['login_shell']
finally:
self.clear_flash_message()
def _next(self):
uid = self._username.value
login_shell = self._login_shell.value
body = {'login_shell': login_shell}
self._model.deferred_req = defer(
http_patch, f'/api/members/{uid}', json=body)
self._model.confirm_lines = [
f"{uid}'s login shell will be changed to:",
'',
login_shell,
'',
'Are you sure you want to continue?',
]
self._model.operations = ['replace_login_shell']

View File

@ -1,11 +0,0 @@
import requests
from ...utils import user_dict_kv
from ..ResultView import ResultView
class GetUserResultView(ResultView):
def show_result(self, resp: requests.Response):
pairs = user_dict_kv(resp.json())
for key, val in pairs:
self._add_pair(key, val)

View File

@ -1,33 +0,0 @@
from asciimatics.widgets import Layout, Text
from ...utils import http_get
from ..CeoFrame import CeoFrame
class GetUserView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'GetUser',
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text("Username:", "uid")
layout.add_widget(self._username)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='GetUserResult', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._username.value = None
def _next(self):
uid = self._username.value
self.flash_message('Looking up user...', force_update=True)
try:
self._model.resp = http_get(f'/api/members/{uid}')
finally:
self.clear_flash_message()

View File

@ -1,65 +0,0 @@
from asciimatics.widgets import Layout, Text
from ...term_utils import get_terms_for_renewal
from ...utils import http_post, defer
from ..CeoFrame import CeoFrame
class RenewUserView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'RenewUser',
)
self._model = model
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text("Username:", "uid")
layout.add_widget(self._username)
self._num_terms = Text(
"Number of terms:", "num_terms",
validator=lambda s: s.isdigit() and s[0] != '0')
self._num_terms.value = '1'
layout.add_widget(self._num_terms)
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._username.value = None
self._num_terms.value = '1'
def _next(self):
uid = self._username.value
self.flash_message('Looking up user...', force_update=True)
try:
new_terms = get_terms_for_renewal(
uid,
int(self._num_terms.value),
self._model.is_club_rep,
self._model,
)
finally:
self.clear_flash_message()
body = {'uid': uid}
if self._model.is_club_rep:
body['non_member_terms'] = new_terms
terms_str = 'non-member terms'
else:
body['terms'] = new_terms
terms_str = 'member terms'
self._model.confirm_lines = [
'The following ' + terms_str + ' will be added:',
'',
','.join(new_terms),
'',
'Are you sure you want to continue?',
]
self._model.deferred_req = defer(
http_post, f'/api/members/{uid}/renew', json=body)

View File

@ -1,12 +0,0 @@
from ..ResultView import ResultView
import requests
class ResetPasswordResultView(ResultView):
def show_result(self, resp: requests.Response):
result = resp.json()
uid = self._model.viewdata['ResetPassword']['uid']
self._add_text(f'The new password for {uid} is:', center=True)
self._add_text()
self._add_text(result['password'], center=True)

View File

@ -1,34 +0,0 @@
from asciimatics.widgets import Layout, Text, Label
from ...utils import defer, http_post
from ..CeoFrame import CeoFrame
class ResetPasswordView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'ResetPassword',
)
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
layout.add_widget(Label('Enter the username of the user whose password will be reset:'))
self._username = Text(None, "uid")
layout.add_widget(self._username)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._username.value = None
def _next(self):
uid = self._username.value
self._model.viewdata['ResetPassword']['uid'] = uid
self._model.confirm_lines = [
f"Are you sure you want to reset {uid}'s password?",
]
self._model.deferred_req = defer(http_post, f'/api/members/{uid}/pwreset')
self._model.result_view_name = 'ResetPasswordResult'

View File

@ -1,80 +0,0 @@
from threading import Thread
from asciimatics.widgets import Layout, Label, Text, TextBox, Widget
from ...utils import defer, http_patch, http_get
from ..CeoFrame import CeoFrame
class SetForwardingAddressesView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'SetForwardingAddresses',
)
self._username_changed = False
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text(
"Username:", "uid",
on_change=self._on_username_change,
on_blur=self._on_username_blur,
)
layout.add_widget(self._username)
self._forwarding_addresses = TextBox(
Widget.FILL_FRAME, 'Forwarding addresses:', 'forwarding_addresses',
line_wrap=True)
layout.add_widget(self._forwarding_addresses)
layout.add_widget(Label('Press <TAB> to switch widgets'))
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self._username.value = None
self._forwarding_addresses.value = None
# TODO: deduplicate this from AddUserView
def _on_username_change(self):
self._username_changed = True
def _on_username_blur(self):
if not self._username_changed:
return
self._username_changed = False
username = self._username.value
if username == '':
return
Thread(target=self._get_user_info, args=[username]).start()
def _get_user_info(self, username):
self.flash_message('Looking up user...')
try:
resp = http_get('/api/members/' + username)
if resp.status_code != 200:
return
data = resp.json()
if 'forwarding_addresses' not in data:
return
self._forwarding_addresses.value = data['forwarding_addresses']
finally:
self.clear_flash_message()
def _next(self):
uid = self._username.value
forwarding_addresses = self._forwarding_addresses.value
body = {'forwarding_addresses': forwarding_addresses}
self._model.deferred_req = defer(
http_patch, f'/api/members/{uid}', json=body)
self._model.confirm_lines = [
f"{uid}'s forwarding addresses will be set to:",
'',
*forwarding_addresses,
'',
'Are you sure you want to continue?',
]
self._model.operations = ['replace_forwarding_addresses']

View File

@ -0,0 +1,7 @@
class AddGroupModel:
name = 'AddGroup'
title = 'Add group'
def __init__(self):
self.name = ''
self.description = ''

View File

@ -0,0 +1,8 @@
class AddMemberToGroupModel:
name = 'AddMemberToGroup'
title = 'Add member to group'
def __init__(self):
self.name = ''
self.username = ''
self.subscribe_to_lists = True

View File

@ -0,0 +1,13 @@
class AddUserModel:
name = 'AddUser'
title = 'Add user'
def __init__(self):
self.membership_type = 'general_member'
self.username = ''
self.full_name = ''
self.first_name = ''
self.last_name = ''
self.program = ''
self.forwarding_address = ''
self.num_terms = 1

View File

@ -0,0 +1,8 @@
class ChangeLoginShellModel:
name = 'ChangeLoginShell'
title = 'Change login shell'
def __init__(self):
self.username = ''
self.login_shell = ''
self.resp_json = None

View File

@ -0,0 +1,12 @@
class CreateDatabaseModel:
name = 'CreateDatabase'
title = 'Create database'
def __init__(self):
self.db_type = 'mysql'
self.user_dict = None
self.resp_json = None
self.password = None
self.db_host = None
self.filename = None
self.wrote_to_file = False

View File

@ -0,0 +1,7 @@
class GetGroupModel:
name = 'GetGroup'
title = 'Get group members'
def __init__(self):
self.name = ''
self.resp_json = None

View File

@ -0,0 +1,4 @@
class GetPositionsModel:
name = 'GetPositions'
title = 'Get positions'
positions = {}

View File

@ -0,0 +1,7 @@
class GetUserModel:
name = 'GetUser'
title = 'Get user info'
def __init__(self):
self.username = ''
self.resp_json = None

View File

@ -0,0 +1,8 @@
class RemoveMemberFromGroupModel:
name = 'RemoveMemberFromGroup'
title = 'Remove member from group'
def __init__(self):
self.name = ''
self.username = ''
self.unsubscribe_from_lists = True

View File

@ -0,0 +1,10 @@
class RenewUserModel:
name = 'RenewUser'
title = 'Renew user'
def __init__(self):
self.membership_type = 'general_member'
self.username = ''
self.num_terms = 1
self.new_terms = None
self.resp_json = None

View File

@ -0,0 +1,11 @@
class ResetDatabasePasswordModel:
name = 'ResetDatabasePassword'
title = 'Reset database password'
def __init__(self):
self.db_type = 'mysql'
self.user_dict = None
self.resp_json = None
self.password = None
self.filename = None
self.wrote_to_file = False

View File

@ -0,0 +1,7 @@
class ResetPasswordModel:
name = 'ResetPassword'
title = 'Reset password'
def __init__(self):
self.username = ''
self.resp_json = None

View File

@ -0,0 +1,4 @@
class SetPositionsModel:
name = 'SetPositions'
title = 'Set positions'
positions = {}

View File

@ -0,0 +1,9 @@
class TransactionModel:
name = 'Transaction'
title = 'Running transaction'
def __init__(self, operations, http_verb, req_path, **req_kwargs):
self.operations = operations
self.http_verb = http_verb
self.req_path = req_path
self.req_kwargs = req_kwargs

View File

@ -0,0 +1,43 @@
from .AddUserModel import AddUserModel
from .RenewUserModel import RenewUserModel
from .GetUserModel import GetUserModel
from .ResetPasswordModel import ResetPasswordModel
from .ChangeLoginShellModel import ChangeLoginShellModel
from .AddGroupModel import AddGroupModel
from .GetGroupModel import GetGroupModel
from .AddMemberToGroupModel import AddMemberToGroupModel
from .RemoveMemberFromGroupModel import RemoveMemberFromGroupModel
from .CreateDatabaseModel import CreateDatabaseModel
from .ResetDatabasePasswordModel import ResetDatabasePasswordModel
from .GetPositionsModel import GetPositionsModel
from .SetPositionsModel import SetPositionsModel
class WelcomeModel:
name = 'Welcome'
title = 'CSC Electronic Office'
def __init__(self):
self.categories = {
'Members': [
AddUserModel,
RenewUserModel,
GetUserModel,
ResetPasswordModel,
ChangeLoginShellModel,
],
'Groups': [
AddGroupModel,
GetGroupModel,
AddMemberToGroupModel,
RemoveMemberFromGroupModel,
],
'Databases': [
CreateDatabaseModel,
ResetDatabasePasswordModel,
],
'Positions': [
GetPositionsModel,
SetPositionsModel,
],
}

View File

@ -0,0 +1,15 @@
from .WelcomeModel import WelcomeModel
from .AddUserModel import AddUserModel
from .RenewUserModel import RenewUserModel
from .GetUserModel import GetUserModel
from .ResetPasswordModel import ResetPasswordModel
from .ChangeLoginShellModel import ChangeLoginShellModel
from .AddGroupModel import AddGroupModel
from .GetGroupModel import GetGroupModel
from .AddMemberToGroupModel import AddMemberToGroupModel
from .RemoveMemberFromGroupModel import RemoveMemberFromGroupModel
from .CreateDatabaseModel import CreateDatabaseModel
from .ResetDatabasePasswordModel import ResetDatabasePasswordModel
from .GetPositionsModel import GetPositionsModel
from .SetPositionsModel import SetPositionsModel
from .TransactionModel import TransactionModel

View File

@ -1,80 +0,0 @@
from threading import Thread
from asciimatics.widgets import Layout, Label
from zope import component
from ...utils import http_get
from ..CeoFrame import CeoFrame
from ceo_common.interfaces import IConfig
position_names = {
'president': "President",
'vice-president': "Vice President",
'treasurer': "Treasurer",
'secretary': "Secretary",
'sysadmin': "Sysadmin",
'cro': "Chief Returning Officer",
'librarian': "Librarian",
'imapd': "IMAPD",
'webmaster': "Web Master",
'offsck': "Office Manager",
}
class GetPositionsView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'GetPositions',
escape_on_q=True,
on_load=self._on_load
)
self._position_widgets = {}
def _on_load(self):
cfg = component.getUtility(IConfig)
avail = cfg.get('positions_available')
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Label(''))
self._main_layout = Layout([10, 1, 10], fill_frame=True)
self.add_layout(self._main_layout)
for pos in avail:
self._position_widgets[pos] = self._add_pair(position_names[pos], '')
self.add_flash_message_layout()
self.add_buttons(back_btn=True)
self.fix()
def target():
self.flash_message('Looking up positions...')
try:
resp = http_get('/api/positions')
if not resp.ok:
return
positions = resp.json()
for pos, username in positions.items():
self._position_widgets[pos].text = username
finally:
self.clear_flash_message(force_update=True)
Thread(target=target).start()
def _add_blank_line(self):
self._main_layout.add_widget(Label(' ', 0))
self._main_layout.add_widget(Label(' ', 2))
def _add_pair(self, key: str, val: str):
key_widget = Label(key + ':', align='>')
value_widget = Label(val, align='<')
self._main_layout.add_widget(key_widget, 0)
self._main_layout.add_widget(value_widget, 2)
return value_widget
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
# clear the labels
for widget in self._position_widgets.values():
widget.text = ''

View File

@ -1,76 +0,0 @@
from threading import Thread
from asciimatics.widgets import Layout, Label, Text
from zope import component
from ...utils import defer, http_post, http_get
from ..CeoFrame import CeoFrame
from .GetPositionsView import position_names
from ceo_common.interfaces import IConfig
from ceod.transactions.members.UpdateMemberPositionsTransaction import UpdateMemberPositionsTransaction
class SetPositionsView(CeoFrame):
"""
Reset the positions to the currently set positions
"""
def reset_positions(self):
def target():
self.flash_message('Looking up positions...')
try:
resp = http_get('/api/positions')
if not resp.ok:
return
positions = resp.json()
for pos, username in positions.items():
self._widgets[pos].value = username
finally:
self.clear_flash_message(force_update=True)
Thread(target=target).start()
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'SetPositions',
)
cfg = component.getUtility(IConfig)
avail = cfg.get('positions_available')
required = cfg.get('positions_required')
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._widgets = {}
for pos in avail:
suffix = ' (*)' if pos in required else ''
widget = Text(position_names[pos] + suffix, pos)
self._widgets[pos] = widget
layout.add_widget(widget)
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Label('(*) Required'))
self.add_flash_message_layout()
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.reset_positions()
self.fix()
def _ceoframe_on_reset(self):
super()._ceoframe_on_reset()
self.reset_positions()
def _next(self):
self.save()
body = {pos: username for pos, username in self.data.items() if username}
self._model.deferred_req = defer(http_post, '/api/positions', json=body)
self._model.operations = UpdateMemberPositionsTransaction.operations
self._model.confirm_lines = [
"The positions will be updated as follows:",
'',
*self.data.items(),
'',
'Are you sure you want to continue?',
]

View File

@ -1,2 +0,0 @@
from .GetPositionsView import GetPositionsView
from .SetPositionsView import SetPositionsView

View File

@ -1,89 +1,42 @@
import os
import sys
import urwid
from asciimatics.exceptions import ResizeScreenError
from asciimatics.scene import Scene
from asciimatics.screen import Screen
from .ConfirmView import ConfirmView
from .ErrorView import ErrorView
from .Model import Model
from .ResultView import ResultView
from .TransactionView import TransactionView
from .WelcomeView import WelcomeView
from .databases.CreateDatabaseView import CreateDatabaseView
from .databases.CreateDatabaseResultView import CreateDatabaseResultView
from .databases.ResetDatabasePasswordView import ResetDatabasePasswordView
from .databases.ResetDatabasePasswordResultView import ResetDatabasePasswordResultView
from .groups.AddGroupView import AddGroupView
from .groups.AddMemberToGroupView import AddMemberToGroupView
from .groups.GetGroupView import GetGroupView
from .groups.GetGroupResultView import GetGroupResultView
from .groups.RemoveMemberFromGroupView import RemoveMemberFromGroupView
from .members.AddUserTransactionView import AddUserTransactionView
from .members.AddUserView import AddUserView
from .members.ChangeLoginShellView import ChangeLoginShellView
from .members.GetUserView import GetUserView
from .members.GetUserResultView import GetUserResultView
from .members.RenewUserView import RenewUserView
from .members.ResetPasswordView import ResetPasswordView
from .members.ResetPasswordResultView import ResetPasswordResultView
from .members.SetForwardingAddressesView import SetForwardingAddressesView
from .positions import GetPositionsView, SetPositionsView
from .app import App
from .utils import get_mvc
# tuples of (name, view)
views = []
def screen_wrapper(screen, last_scene, model):
global views
width = min(screen.width, 90)
height = min(screen.height, 24)
views = [
('Welcome', WelcomeView(screen, width, height, model)),
('Confirm', ConfirmView(screen, width, height, model)),
('Transaction', TransactionView(screen, width, height, model)),
('Result', ResultView(screen, width, height, model)),
('Error', ErrorView(screen, width, height, model)),
('AddUser', AddUserView(screen, width, height, model)),
('AddUserTransaction', AddUserTransactionView(screen, width, height, model)),
('RenewUser', RenewUserView(screen, width, height, model)),
('GetUser', GetUserView(screen, width, height, model)),
('GetUserResult', GetUserResultView(screen, width, height, model)),
('ResetPassword', ResetPasswordView(screen, width, height, model)),
('ResetPasswordResult', ResetPasswordResultView(screen, width, height, model)),
('ChangeLoginShell', ChangeLoginShellView(screen, width, height, model)),
('SetForwardingAddresses', SetForwardingAddressesView(screen, width, height, model)),
('AddGroup', AddGroupView(screen, width, height, model)),
('GetGroup', GetGroupView(screen, width, height, model)),
('GetGroupResult', GetGroupResultView(screen, width, height, model)),
('AddMemberToGroup', AddMemberToGroupView(screen, width, height, model)),
('RemoveMemberFromGroup', RemoveMemberFromGroupView(screen, width, height, model)),
('CreateDatabase', CreateDatabaseView(screen, width, height, model)),
('CreateDatabaseResult', CreateDatabaseResultView(screen, width, height, model)),
('ResetDatabasePassword', ResetDatabasePasswordView(screen, width, height, model)),
('ResetDatabasePasswordResult', ResetDatabasePasswordResultView(screen, width, height, model)),
('GetPositions', GetPositionsView(screen, width, height, model)),
('SetPositions', SetPositionsView(screen, width, height, model)),
]
scenes = [
Scene([view], -1, name=name) for name, view in views
]
model.screen = screen
model.views = [view for name, view in views]
screen.play(
scenes, stop_on_resize=True, start_scene=last_scene, allow_int=True,
)
def exit_on_special_chars(key):
if key in ('q', 'Q', 'esc'):
raise urwid.ExitMainLoop()
def main():
last_scene = None
model = Model()
try:
Screen.wrapper(screen_wrapper, arguments=[last_scene, model])
sys.exit(0)
except ResizeScreenError:
os.system('reset')
print('Unfortunately, ceo does not currently support dynamic resizing.')
sys.exit(1)
# Just put some empty placeholder in the main widget for now
# (will be replaced by the WelcomeView)
main_widget = urwid.Padding(urwid.Text(''), left=2, right=2)
top = urwid.Overlay(
main_widget,
urwid.AttrMap(urwid.SolidFill(' '), 'background'),
align='center',
width=('relative', App.REL_WIDTH_PCT),
valign='middle',
height=('relative', App.REL_HEIGHT_PCT),
min_width=App.WIDTH,
min_height=App.HEIGHT,
)
loop = urwid.MainLoop(
top,
palette=[
('reversed', 'standout', ''),
('bold', 'bold', ''),
('green', 'light green', ''),
('red', 'light red', ''),
('background', 'standout,light cyan', ''),
],
# Disable the mouse (makes it hard to copy text from the screen)
handle_mouse=False,
unhandled_input=exit_on_special_chars
)
app = App(loop, main_widget)
_, view, _ = get_mvc(app, 'Welcome')
view.activate()
loop.run()

View File

@ -1,10 +1,88 @@
from asciimatics.exceptions import NextScene
import json
import requests
from ceo.tui.controllers import *
from ceo.tui.models import *
from ceo.tui.views import *
def handle_sync_response(resp: requests.Response, model):
if resp.status_code != 200:
model.error_message = resp.text.rstrip()
raise NextScene('Error')
return resp.json()
def handle_sync_response(resp, controller):
if resp.ok:
if resp.headers.get('content-type') == 'application/json':
return resp.json()
# streaming response
return [json.loads(line) for line in resp.text.splitlines()]
def target():
view = ErrorView(controller.model, controller, controller.app)
controller.switch_to_view(view)
if resp.headers.get('content-type') == 'application/json':
err_msg = resp.json()['error']
else:
err_msg = resp.text.rstrip()
controller.model.error_message = err_msg
controller.app.run_in_main_loop(target)
raise Controller.RequestFailed()
def get_mvc(app, name):
if name == WelcomeModel.name:
model = WelcomeModel()
controller = WelcomeController(model, app)
view = WelcomeView(model, controller, app)
elif name == AddUserModel.name:
model = AddUserModel()
controller = AddUserController(model, app)
view = AddUserView(model, controller, app)
elif name == RenewUserModel.name:
model = RenewUserModel()
controller = RenewUserController(model, app)
view = RenewUserView(model, controller, app)
elif name == GetUserModel.name:
model = GetUserModel()
controller = GetUserController(model, app)
view = GetUserView(model, controller, app)
elif name == ResetPasswordModel.name:
model = ResetPasswordModel()
controller = ResetPasswordController(model, app)
view = ResetPasswordView(model, controller, app)
elif name == ChangeLoginShellModel.name:
model = ChangeLoginShellModel()
controller = ChangeLoginShellController(model, app)
view = ChangeLoginShellView(model, controller, app)
elif name == AddGroupModel.name:
model = AddGroupModel()
controller = AddGroupController(model, app)
view = AddGroupView(model, controller, app)
elif name == GetGroupModel.name:
model = GetGroupModel()
controller = GetGroupController(model, app)
view = GetGroupView(model, controller, app)
elif name == AddMemberToGroupModel.name:
model = AddMemberToGroupModel()
controller = AddMemberToGroupController(model, app)
view = AddMemberToGroupView(model, controller, app)
elif name == RemoveMemberFromGroupModel.name:
model = RemoveMemberFromGroupModel()
controller = RemoveMemberFromGroupController(model, app)
view = RemoveMemberFromGroupView(model, controller, app)
elif name == CreateDatabaseModel.name:
model = CreateDatabaseModel()
controller = CreateDatabaseController(model, app)
view = CreateDatabaseView(model, controller, app)
elif name == ResetDatabasePasswordModel.name:
model = ResetDatabasePasswordModel()
controller = ResetDatabasePasswordController(model, app)
view = ResetDatabasePasswordView(model, controller, app)
elif name == GetPositionsModel.name:
model = GetPositionsModel()
controller = GetPositionsController(model, app)
view = GetPositionsView(model, controller, app)
elif name == SetPositionsModel.name:
model = SetPositionsModel()
controller = SetPositionsController(model, app)
view = SetPositionsView(model, controller, app)
else:
raise NotImplementedError()
controller.view = view
return model, view, controller

View File

@ -0,0 +1,10 @@
from .ConfirmationView import ConfirmationView
class AddGroupConfirmationView(ConfirmationView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
lines = [
f"A new group '{self.model.name}' will be created."
]
self.set_lines(lines)

View File

@ -0,0 +1,21 @@
import urwid
from .ColumnView import ColumnView
class AddGroupView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
self.name_edit = urwid.Edit()
self.description_edit = urwid.Edit()
rows = [
(
urwid.Text('Name:', align='right'),
self.name_edit
),
(
urwid.Text('Description:', align='right'),
self.description_edit
)
]
self.set_rows(rows)

View File

@ -0,0 +1,10 @@
from .ConfirmationView import ConfirmationView
class AddMemberToGroupConfirmationView(ConfirmationView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
lines = [
f"User '{self.model.username}' will be added to the group '{self.model.name}'."
]
self.set_lines(lines)

View File

@ -0,0 +1,33 @@
import urwid
from .ColumnView import ColumnView
class AddMemberToGroupView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
self.name_edit = urwid.Edit()
self.username_edit = urwid.Edit()
rows = [
(
urwid.Text('Group name:', align='right'),
self.name_edit
),
(
urwid.Text('New group member:', align='right'),
self.username_edit
)
]
checkbox = urwid.CheckBox(
'Subscribe to auxiliary mailing lists',
state=True,
on_state_change=self.controller.on_list_subscribe_checkbox_change
)
# This is necessary to place the checkbox in the center of the page
# (urwid.Padding doesn't seem to have an effect on it)
checkbox = urwid.Columns([
('weight', 1, urwid.Text('')),
('weight', 3, checkbox)
])
extra_widgets = [urwid.Divider(), checkbox]
self.set_rows(rows, extra_widgets=extra_widgets)

View File

@ -0,0 +1,12 @@
from .ConfirmationView import ConfirmationView
class AddUserConfirmationView(ConfirmationView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
lines = ['Please make sure that:', '']
if self.model.membership_type == 'general_member':
lines.append(f'\N{BULLET} The new member has paid ${self.model.num_terms * 2} in club fees')
lines.append("\N{BULLET} You have verified the name on the new member's WatCard")
lines.append("\N{BULLET} The new member has signed the machine usage agreement")
self.set_lines(lines, align='left')

View File

@ -0,0 +1,74 @@
import urwid
from .ColumnView import ColumnView
class AddUserView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
membership_types_group = []
self.username_edit = urwid.Edit()
self.full_name_edit = urwid.Edit()
self.first_name_edit = urwid.Edit()
self.last_name_edit = urwid.Edit()
self.program_edit = urwid.Edit()
self.forwarding_address_edit = urwid.Edit()
self.num_terms_edit = urwid.IntEdit(default=1)
rows = [
(
urwid.Text('Membership type:', align='right'),
urwid.RadioButton(
membership_types_group,
'General membership ($2)',
on_state_change=self.controller.on_membership_type_changed,
user_data='general_member'
)
),
(
urwid.Divider(),
urwid.RadioButton(
membership_types_group,
'Club rep (free)',
on_state_change=self.controller.on_membership_type_changed,
user_data='club_rep'
)
),
(
urwid.Text('Username:', align='right'),
self.username_edit
),
(
urwid.Text('Full name:', align='right'),
self.full_name_edit
),
(
urwid.Text('First name:', align='right'),
self.first_name_edit
),
(
urwid.Text('Last name:', align='right'),
self.last_name_edit
),
(
urwid.Text('Program:', align='right'),
self.program_edit
),
(
urwid.Text('Number of terms:', align='right'),
self.num_terms_edit
),
]
self.set_rows(
rows,
# We want to know when the username field loses focus
notify_when_focus_changes=True,
right_col_weight=2
)
def update_fields(self):
self.full_name_edit.edit_text = self.model.full_name
self.first_name_edit.edit_text = self.model.first_name
self.last_name_edit.edit_text = self.model.last_name
self.program_edit.edit_text = self.model.program
self.forwarding_address_edit.edit_text = self.model.forwarding_address
self.num_terms_edit.edit_text = str(self.model.num_terms)

View File

@ -0,0 +1,10 @@
from .ConfirmationView import ConfirmationView
class ChangeLoginShellConfirmationView(ConfirmationView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
lines = [
f"{self.model.username}'s login shell will be set to {self.model.login_shell}."
]
self.set_lines(lines)

View File

@ -0,0 +1,27 @@
import urwid
from .ColumnView import ColumnView
class ChangeLoginShellView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
self.username_edit = urwid.Edit()
self.login_shell_edit = urwid.Edit()
rows = [
(
urwid.Text('Username:', align='right'),
self.username_edit
),
(
urwid.Text('Login shell:', align='right'),
self.login_shell_edit
)
]
self.set_rows(
rows,
notify_when_focus_changes=True
)
def update_fields(self):
self.login_shell_edit.edit_text = self.model.login_shell

View File

@ -0,0 +1,32 @@
import urwid
from .ColumnView import ColumnView
class ColumnResponseView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
def set_pairs(self, pairs, right_col_weight=1):
for i, (left, right) in enumerate(pairs):
if type(right) is list:
pairs[i] = (left, ','.join(map(str, right)))
else:
pairs[i] = (left, str(right))
rows = [
(
urwid.Text(
left + ':' if left != '' else '',
align='right'
),
urwid.Text(right)
)
for left, right in pairs
]
self.set_rows(
rows,
right_col_weight=right_col_weight,
disable_cols=True,
no_back_button=True,
on_next=self.controller.get_next_menu_callback('Welcome')
)

View File

@ -0,0 +1,59 @@
import urwid
from .View import View
from .utils import wrap_in_frame
class ColumnView(View):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
def set_rows(
self,
rows,
right_col_weight=1,
notify_when_focus_changes=False,
disable_cols=False,
extra_widgets=None,
no_back_button=False,
on_next=None,
no_next_button=False,
):
# Each item in the list is two columns
columns_list = [
urwid.Columns(
[('weight', 1, left), ('weight', right_col_weight, right)],
dividechars=3,
focus_column=1
)
for left, right in rows
]
if extra_widgets is not None:
columns_list.extend(extra_widgets)
listwalker = urwid.SimpleFocusListWalker(columns_list)
if notify_when_focus_changes:
# See https://stackoverflow.com/a/43125172
urwid.connect_signal(
listwalker, 'modified',
self.controller.on_row_focus_changed
)
# Keep a reference for the controller
self.listwalker = listwalker
cols = urwid.ListBox(listwalker)
if disable_cols:
cols = urwid.WidgetDisable(cols)
self.flash_text = urwid.Text('')
if no_back_button:
on_back = None
else:
on_back = self.controller.prev_menu_callback
if on_next is None and not no_next_button:
on_next = self.controller.on_next_button_pressed
body = cols
self.original_widget = wrap_in_frame(
body,
self.model.title,
on_back=on_back,
on_next=on_next,
flash_text=self.flash_text,
)

View File

@ -0,0 +1,18 @@
import urwid
from .PlainTextView import PlainTextView
class ConfirmationView(PlainTextView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
self.flash_text = urwid.Text('')
def set_lines(self, lines, align='center'):
super().set_lines(
lines,
align=align,
on_back=self.controller.prev_menu_callback,
on_next=self.controller.on_confirmation_button_pressed,
flash_text=self.flash_text,
)

View File

@ -0,0 +1,14 @@
from .ConfirmationView import ConfirmationView
import ceo.krb_check as krb
class CreateDatabaseConfirmationView(ConfirmationView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
db_type = self.model.db_type
db_type_name = 'MySQL' if db_type == 'mysql' else 'PostgreSQL'
username = krb.get_username()
lines = [
f"A new {db_type_name} database will be created for {username}."
]
self.set_lines(lines)

View File

@ -0,0 +1,33 @@
from .PlainTextView import PlainTextView
class CreateDatabaseResponseView(PlainTextView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
def activate(self):
self.controller.write_db_creds_to_file()
username = self.model.user_dict['uid']
password = self.model.password
db_host = self.model.db_host
filename = self.model.filename
wrote_to_file = self.model.wrote_to_file
lines = [
'Connection information:',
'',
f'Database: {username}',
f'Username: {username}',
f'Password: {password}',
f'Host: {db_host}',
''
]
if wrote_to_file:
lines.append(f"These settings have been written to {filename}.")
else:
lines.append(f"We were unable to write these settings to {filename}.")
self.set_lines(
lines,
align='left',
on_next=self.controller.get_next_menu_callback('Welcome'),
)
super().activate()

View File

@ -0,0 +1,30 @@
import urwid
from .ColumnView import ColumnView
class CreateDatabaseView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
db_types_group = []
rows = [
(
urwid.Text('Database type:', align='right'),
urwid.RadioButton(
db_types_group,
'MySQL',
on_state_change=self.controller.on_db_type_changed,
user_data='mysql'
)
),
(
urwid.Divider(),
urwid.RadioButton(
db_types_group,
'PostgreSQL',
on_state_change=self.controller.on_db_type_changed,
user_data='postgresql'
)
),
]
self.set_rows(rows)

View File

@ -0,0 +1,15 @@
from .PlainTextView import PlainTextView
class ErrorView(PlainTextView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
lines = [
'An error occurred:',
'',
*model.error_message.split('\n')
]
self.set_lines(
lines,
on_next=self.controller.get_next_menu_callback('Welcome'),
)

View File

@ -0,0 +1,24 @@
from .PlainTextView import PlainTextView
class GetGroupResponseView(PlainTextView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
d = self.model.resp_json
if 'description' in d:
desc = d['description'] + ' (' + d['cn'] + ')'
else:
desc = d['cn']
lines = [
'Members of ' + desc + ':',
''
]
lines.extend([
member['cn'] + ' (' + member['uid'] + ')'
for member in self.model.resp_json['members']
])
self.set_lines(
lines,
scrollable=True,
on_next=self.controller.get_next_menu_callback('Welcome'),
)

View File

@ -0,0 +1,16 @@
import urwid
from .ColumnView import ColumnView
class GetGroupView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
self.name_edit = urwid.Edit()
rows = [
(
urwid.Text('Group name:', align='right'),
self.name_edit
)
]
self.set_rows(rows)

View File

@ -0,0 +1,30 @@
from zope import component
import urwid
from .ColumnView import ColumnView
from .position_names import position_names
from ceo_common.interfaces import IConfig
class GetPositionsView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
self.position_fields = {}
cfg = component.getUtility(IConfig)
avail = cfg.get('positions_available')
rows = []
for pos in avail:
name = position_names[pos]
field = urwid.Text('...')
self.position_fields[pos] = field
self.model.positions[pos] = ''
rows.append((urwid.Text(name, align='right'), field))
self.set_rows(rows, disable_cols=True, no_next_button=True)
def activate(self):
self.controller.lookup_positions_async()
super().activate()
def update_fields(self):
for pos, field in self.position_fields.items():
field.set_text(self.model.positions[pos])

View File

@ -0,0 +1,31 @@
from ...utils import user_dict_kv
from .ColumnResponseView import ColumnResponseView
class GetUserResponseView(ColumnResponseView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
d = self.model.resp_json.copy()
# We don't have a lot of vertical space, so it's best to
# omit unnecessary fields
cols_to_omit = [
'given_name',
'sn',
'is_club',
'home_directory',
]
for key in cols_to_omit:
del d[key]
pairs = user_dict_kv(d)
num_terms = max(map(len, [
d.get('terms', []),
d.get('non_member_terms', [])
]))
if num_terms < 6:
right_col_weight = 1
elif num_terms < 12:
right_col_weight = 2
else:
right_col_weight = 3
self.set_pairs(pairs, right_col_weight=right_col_weight)

View File

@ -0,0 +1,16 @@
import urwid
from .ColumnView import ColumnView
class GetUserView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
self.username_edit = urwid.Edit()
rows = [
(
urwid.Text('Username:', align='right'),
self.username_edit
)
]
self.set_rows(rows)

View File

@ -0,0 +1,47 @@
import urwid
from .View import View
from .utils import wrap_in_frame
from ceo.tui.app import App
class PlainTextView(View):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
def set_lines(
self,
lines,
align='center',
scrollable=False,
min_width=None,
on_back=None,
on_next=None,
flash_text=None,
):
if min_width is None:
if align == 'center':
min_width = App.WIDTH
else:
min_width = max(map(len, lines))
if scrollable:
body = urwid.ListBox(urwid.SimpleListWalker([
urwid.Text(line, align=align)
for line in lines
]))
else:
body = urwid.Filler(
urwid.Text('\n'.join(lines), align=align)
)
self.original_widget = wrap_in_frame(
urwid.Padding(
body,
align='center',
width=('relative', App.REL_WIDTH_PCT),
min_width=min_width,
),
self.model.title,
on_back=on_back,
on_next=on_next,
flash_text=flash_text,
)

View File

@ -0,0 +1,10 @@
from .ConfirmationView import ConfirmationView
class RemoveMemberFromGroupConfirmationView(ConfirmationView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
lines = [
f"User '{self.model.username}' will be removed from the group '{self.model.name}'."
]
self.set_lines(lines)

View File

@ -0,0 +1,33 @@
import urwid
from .ColumnView import ColumnView
class RemoveMemberFromGroupView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
self.name_edit = urwid.Edit()
self.username_edit = urwid.Edit()
rows = [
(
urwid.Text('Group name:', align='right'),
self.name_edit
),
(
urwid.Text('Member to remove:', align='right'),
self.username_edit
)
]
checkbox = urwid.CheckBox(
'Unsubscribe from auxiliary mailing lists',
state=True,
on_state_change=self.controller.on_list_unsubscribe_checkbox_change
)
# This is necessary to place the checkbox in the center of the page
# (urwid.Padding doesn't seem to have an effect on it)
checkbox = urwid.Columns([
('weight', 1, urwid.Text('')),
('weight', 3, checkbox)
])
extra_widgets = [urwid.Divider(), checkbox]
self.set_rows(rows, extra_widgets=extra_widgets)

View File

@ -0,0 +1,18 @@
from .ConfirmationView import ConfirmationView
class RenewUserConfirmationView(ConfirmationView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
membership_str = 'member'
if model.membership_type == 'club_rep':
membership_str = 'non-member'
lines = [
f"{model.username} will be renewed for the following {membership_str} terms:",
'',
', '.join(self.model.new_terms)
]
if model.membership_type == 'general_member':
lines.append('')
lines.append(f'Please make sure they have paid ${model.num_terms * 2} in club fees.')
self.set_lines(lines)

View File

@ -0,0 +1,40 @@
import urwid
from .ColumnView import ColumnView
class RenewUserView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
membership_types_group = []
self.username_edit = urwid.Edit()
self.num_terms_edit = urwid.IntEdit(default=1)
rows = [
(
urwid.Text('Membership type:', align='right'),
urwid.RadioButton(
membership_types_group,
'General membership ($2)',
on_state_change=self.controller.on_membership_type_changed,
user_data='general_member'
)
),
(
urwid.Divider(),
urwid.RadioButton(
membership_types_group,
'Club rep (free)',
on_state_change=self.controller.on_membership_type_changed,
user_data='club_rep'
)
),
(
urwid.Text('Username:', align='right'),
self.username_edit
),
(
urwid.Text('Number of terms:', align='right'),
self.num_terms_edit
),
]
self.set_rows(rows, right_col_weight=2)

View File

@ -0,0 +1,14 @@
from .ConfirmationView import ConfirmationView
import ceo.krb_check as krb
class ResetDatabasePasswordConfirmationView(ConfirmationView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
db_type = self.model.db_type
db_type_name = 'MySQL' if db_type == 'mysql' else 'PostgreSQL'
username = krb.get_username()
lines = [
f"The {db_type_name} password for {username} will be reset."
]
self.set_lines(lines)

View File

@ -0,0 +1,31 @@
from .PlainTextView import PlainTextView
class ResetDatabasePasswordResponseView(PlainTextView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
def activate(self):
self.controller.write_db_creds_to_file()
db_type = self.model.db_type
db_type_name = 'MySQL' if db_type == 'mysql' else 'PostgreSQL'
username = self.model.user_dict['uid']
password = self.model.password
filename = self.model.filename
wrote_to_file = self.model.wrote_to_file
lines = [
f'The new {db_type_name} password for {username} is:'
'',
password,
''
]
if wrote_to_file:
lines.append(f"The settings in {filename} have been updated.")
else:
lines.append(f"We were unable to update the settings in {filename}.")
self.set_lines(
lines,
align='left',
on_next=self.controller.get_next_menu_callback('Welcome'),
)
super().activate()

View File

@ -0,0 +1,30 @@
import urwid
from .ColumnView import ColumnView
class ResetDatabasePasswordView(ColumnView):
def __init__(self, model, controller, app):
super().__init__(model, controller, app)
db_types_group = []
rows = [
(
urwid.Text('Database type:', align='right'),
urwid.RadioButton(
db_types_group,
'MySQL',
on_state_change=self.controller.on_db_type_changed,
user_data='mysql'
)
),
(
urwid.Divider(),
urwid.RadioButton(
db_types_group,
'PostgreSQL',
on_state_change=self.controller.on_db_type_changed,
user_data='postgresql'
)
),
]
self.set_rows(rows)

Some files were not shown because too many files have changed in this diff Show More