implement membership renewals in TUI

This commit is contained in:
Max Erenberg 2021-09-06 16:40:05 +00:00
parent cce920d6ba
commit ee21873ad7
15 changed files with 369 additions and 120 deletions

View File

@ -4,13 +4,12 @@ from typing import Dict
import click
from zope import component
from ..term_utils import get_terms_for_new_user, get_terms_for_renewal
from ..utils import http_post, http_get, http_patch, http_delete, \
get_failed_operations, get_terms_for_new_user, user_dict_lines, \
get_adduser_operations
get_failed_operations, user_dict_lines, get_adduser_operations
from .utils import handle_stream_response, handle_sync_response, print_lines, \
check_if_in_development
from ceo_common.interfaces import IConfig
from ceo_common.model import Term
from ceod.transactions.members import DeleteMemberTransaction
@ -24,7 +23,7 @@ def members():
@click.option('--cn', help='Full name', prompt='Full name')
@click.option('--program', required=False, help='Academic program')
@click.option('--terms', 'num_terms', type=click.IntRange(1, 100),
help='Number of terms to add', prompt='Number of terms')
help='Number of terms to add', default=1)
@click.option('--clubrep', is_flag=True, default=False,
help='Add non-member terms instead of member terms')
@click.option('--forwarding-address', required=False,
@ -133,22 +132,7 @@ def modify(username, login_shell, forwarding_addresses):
@click.option('--clubrep', is_flag=True, default=False,
help='Add non-member terms instead of member terms')
def renew(username, num_terms, clubrep):
resp = http_get('/api/members/' + username)
result = handle_sync_response(resp)
max_term = None
current_term = Term.current()
if clubrep and 'non_member_terms' in result:
max_term = max(Term(s) for s in result['non_member_terms'])
elif not clubrep and 'terms' in result:
max_term = max(Term(s) for s in result['terms'])
if max_term is not None and max_term >= current_term:
next_term = max_term + 1
else:
next_term = Term.current()
terms = [next_term + i for i in range(num_terms)]
terms = list(map(str, terms))
terms = get_terms_for_renewal(username, num_terms, clubrep)
if clubrep:
body = {'non_member_terms': terms}

View File

@ -1,12 +1,9 @@
import json
import socket
import sys
from typing import List, Tuple, Dict
import click
import requests
from ..operation_strings import descriptions as op_desc
from ..utils import space_colon_kv, generic_handle_stream_response
from .CLIStreamResponseHandler import CLIStreamResponseHandler

38
ceo/term_utils.py Normal file
View File

@ -0,0 +1,38 @@
from typing import List
from .utils import http_get
from ceo_common.model import Term
import ceo.cli.utils as cli_utils
import ceo.tui.utils as tui_utils
# Had to put these in a separate file to avoid a circular import.
def get_terms_for_new_user(num_terms: int) -> List[str]:
current_term = Term.current()
terms = [current_term + i for i in range(num_terms)]
return list(map(str, terms))
def get_terms_for_renewal(
username: str, num_terms: int, clubrep: bool, tui_model=None,
) -> List[str]:
resp = http_get('/api/members/' + username)
if tui_model is None:
result = cli_utils.handle_sync_response(resp)
else:
result = tui_utils.handle_sync_response(resp, tui_model)
max_term = None
current_term = Term.current()
if clubrep and 'non_member_terms' in result:
max_term = max(Term(s) for s in result['non_member_terms'])
elif not clubrep and 'terms' in result:
max_term = max(Term(s) for s in result['terms'])
if max_term is not None and max_term >= current_term:
next_term = max_term + 1
else:
next_term = Term.current()
terms = [next_term + i for i in range(num_terms)]
return list(map(str, terms))

92
ceo/tui/CeoFrame.py Normal file
View File

@ -0,0 +1,92 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Frame, Layout, Divider, Button
class CeoFrame(Frame):
def __init__(
self,
screen,
height,
width,
model,
name, # key in model.viewdata
on_load=None,
title=None,
save_data=False, # whether to save widget state for resizing
):
super().__init__(
screen,
height,
width,
name=name,
can_scroll=False,
title=title,
on_load=self._ceoframe_on_load,
)
self._save_data = save_data
self._extra_on_load = on_load
self._model = model
self._name = name
self._loaded = False
def _ceoframe_on_load(self):
# We usually don't want _on_load() to be called multiple times
# e.g. when switching back to a scene
if self._loaded:
return
self._loaded = True
if self._model.title is not None:
self.title = self._model.title
self._model.title = None
if self._save_data:
# restore the saved input fields' values
self.data = self._model.viewdata[self._name]
if self._extra_on_load is not None:
self._extra_on_load()
def _on_unload(self):
if not self._save_data:
return
# save the input fields' values so that they don't disappear when
# the window gets resized
self.save()
self._model.viewdata[self._name] = self.data
def add_buttons(
self, back_btn=False, back_btn_text='Back',
next_scene=None, next_scene_text='Next', on_next=None,
on_next_excl=None,
):
"""
Add a new layout at the bottom of the frame with buttons.
If back_btn is True, a Back button is added.
If next_scene is set to the name of the next scene, or on_next_excl
is set, a Next button will be added.
If on_next is set to a function, it will be called when the Next
button is pressed, and the screen will switch to the next scene.
If on_next_excl is set to a function, it will be called when the Next
button is pressed, and the scene will not be switched.
If both on_next and on_next_excl are set, on_next will be ignored.
"""
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
def _back():
raise NextScene(self._model.scene_stack.pop())
def _next():
if on_next_excl is not None:
on_next_excl()
return
if on_next is not None:
on_next()
self._model.scene_stack.append(self._name)
raise NextScene(next_scene)
layout = Layout([1, 1])
self.add_layout(layout)
if back_btn:
layout.add_widget(Button(back_btn_text, _back), 0)
if next_scene is not None or on_next_excl is not None:
layout.add_widget(Button(next_scene_text, _next), 1)

View File

@ -1,28 +1,14 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Frame, Layout, Button, Divider, Label
from asciimatics.widgets import Layout, Label
from .CeoFrame import CeoFrame
class ConfirmView(Frame):
class ConfirmView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen,
height,
width,
can_scroll=False,
on_load=self._on_load,
title='Confirmation',
screen, height, width, model, 'Confirm',
on_load=self._confirmview_on_load, title='Confirmation',
)
self._model = model
def _add_buttons(self):
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
layout = Layout([1, 1])
self.add_layout(layout)
layout.add_widget(Button('No', self._back), 0)
layout.add_widget(Button('Yes', self._next), 1)
def _add_line(self, text: str = ''):
layout = Layout([100])
@ -35,7 +21,7 @@ class ConfirmView(Frame):
layout.add_widget(Label(key + ':', align='>'), 0)
layout.add_widget(Label(val, align='<'), 2)
def _on_load(self):
def _confirmview_on_load(self):
for _ in range(2):
self._add_line()
for line in self._model.confirm_lines:
@ -48,12 +34,11 @@ class ConfirmView(Frame):
# fill the rest of the space
self.add_layout(Layout([100], fill_frame=True))
self._add_buttons()
if self._model.operations is not None:
next_scene = 'Transaction'
else:
next_scene = 'Result'
self.add_buttons(
back_btn=True, back_btn_text='No',
next_scene=next_scene, next_scene_text='Yes')
self.fix()
def _back(self):
raise NextScene(self._model.scene_stack.pop())
def _next(self):
self._model.scene_stack.append('Confirm')
raise NextScene('Transaction')

29
ceo/tui/ErrorView.py Normal file
View File

@ -0,0 +1,29 @@
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Label
from .CeoFrame import CeoFrame
class ErrorView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Error',
on_load=self._errorview_on_load, title='Error',
)
def _errorview_on_load(self):
layout = Layout([1, 10], fill_frame=True)
self.add_layout(layout)
for _ in range(2):
layout.add_widget(Label(''), 1)
layout.add_widget(Label('An error occurred:'), 1)
layout.add_widget(Label(''), 1)
for line in self._model.error_message.splitlines():
layout.add_widget(Label(line), 1)
self.add_buttons(on_next_excl=self._next)
self.fix()
def _next(self):
self._model.reset()
raise NextScene('Welcome')

View File

@ -1,35 +1,48 @@
from copy import deepcopy
class Model:
"""A convenient place to store View data persistently."""
def __init__(self):
# simple key-value pairs
self.screen = None
self.title = None
self.scene_stack = []
self.deferred_req = None
self.error_message = None
# view-specific data, to be used when e.g. resizing the window
self._initial_viewdata = {
'adduser': {
'AddUser': {
'uid': '',
'cn': '',
'program': '',
'forwarding_address': '',
'num_terms': '1',
},
'transaction': {
'RenewUser': {
'uid': '',
'num_terms': '1',
},
'Transaction': {
'op_layout': None,
'msg_layout': None,
'labels': {},
'status': 'not started',
},
'Result': {},
}
self.viewdata = deepcopy(self._initial_viewdata)
# data which is shared between multiple views
self.for_member = True
self.is_club_rep = False
self.confirm_lines = None
self.operations = None
self.deferred_req = None
def reset_viewdata(self):
def reset(self):
self.viewdata = deepcopy(self._initial_viewdata)
self.is_club_rep = False
self.confirm_lines = None
self.operations = None
self.deferred_req = None
self.title = None
self.error_message = None
self.scene_stack.clear()

66
ceo/tui/ResultView.py Normal file
View File

@ -0,0 +1,66 @@
from threading import Thread
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Layout, Label, Button, Divider
import requests
from .CeoFrame import CeoFrame
class ResultView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'Result',
on_load=self._resultview_on_load, title='Result',
)
layout = Layout([1, 10])
self.add_layout(layout)
layout.add_widget(Label(''), 1)
self._status_label = Label('Sending request... ')
layout.add_widget(self._status_label, 1)
layout.add_widget(Label(''), 1)
self._summary_layout = Layout([1, 10], fill_frame=True)
self.add_layout(self._summary_layout)
self._add_buttons()
self.fix()
def _add_buttons(self):
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
layout = Layout([1, 1])
self.add_layout(layout)
self._next_btn = Button('Next', self._next)
self._next_btn.disabled = True
layout.add_widget(self._next_btn, 1)
def _show_msg(self, text: str = ''):
for line in text.splitlines():
self._summary_layout.add_widget(Label(line), 1)
# override this method in child classes if desired
def show_result(self, resp: requests.Response):
self._show_msg('The operation was successfully performed.')
def _resultview_on_load(self):
def target():
try:
resp = self._model.deferred_req()
self._status_label.text += 'Done.'
self._next_btn.disabled = False
if resp.status_code != 200:
self._show_msg('An error occurred:')
self._show_msg(resp.text.rstrip())
return
self.show_result(resp)
finally:
self.fix()
self.reset()
self._model.screen.force_update()
Thread(target=target).start()
def _next(self):
self._model.reset()
raise NextScene('Welcome')

View File

@ -15,12 +15,12 @@ class TransactionView(Frame):
height,
width,
can_scroll=False,
on_load=self._on_load,
on_load=self._txnview_on_load,
title='Running Transaction',
)
self._model = model
# map operation names to label widgets
self._labels = model.viewdata['transaction']['labels']
self._labels = model.viewdata['Transaction']['labels']
# this is an ugly hack to get around the fact that _on_load()
# will be called again when we reset() in enable_next_btn.
self._loaded = False
@ -35,7 +35,7 @@ class TransactionView(Frame):
self._next_btn = Button('Next', self._next)
# we don't want to disable the button if the txn completed
# and the user just resized the window
if self._model.viewdata['transaction']['status'] != 'completed':
if self._model.viewdata['Transaction']['status'] != 'completed':
self._next_btn.disabled = True
layout.add_widget(self._next_btn, 1)
@ -43,15 +43,15 @@ class TransactionView(Frame):
self._op_layout.add_widget(Label(''), 0)
self._op_layout.add_widget(Label(''), 2)
def _on_load(self):
def _txnview_on_load(self):
if self._loaded:
return
self._loaded = True
d = self._model.viewdata['transaction']
d = self._model.viewdata['Transaction']
first_time = True
if d['op_layout'] is None:
self._op_layout = Layout([10, 1, 10])
self._op_layout = Layout([12, 1, 10])
self.add_layout(self._op_layout)
# store the layouts so that we can re-use them when the screen
# gets resized
@ -88,7 +88,7 @@ class TransactionView(Frame):
Thread(target=self._do_txn).start()
def _do_txn(self):
self._model.viewdata['transaction']['status'] = 'in progress'
self._model.viewdata['Transaction']['status'] = 'in progress'
resp = self._model.deferred_req()
handler = TUIStreamResponseHandler(
model=self._model,
@ -104,9 +104,8 @@ class TransactionView(Frame):
# enabled it
self.reset()
# save the fact that the transaction is completed
self._model.viewdata['transaction']['status'] = 'completed'
self._model.viewdata['Transaction']['status'] = 'completed'
def _next(self):
self._model.reset_viewdata()
self._model.scene_stack.clear()
self._model.reset()
raise NextScene('Welcome')

View File

@ -47,7 +47,7 @@ class WelcomeView(Frame):
item_id = self.data['members']
desc, view = self._members_menu_items[item_id]
if desc.endswith('club rep'):
self._model.for_member = False
self._model.is_club_rep = True
self._model.title = desc
self._model.scene_stack.append('Welcome')
raise NextScene(view)

View File

@ -1,22 +1,19 @@
from threading import Thread
from asciimatics.exceptions import NextScene
from asciimatics.widgets import Frame, Layout, Text, Button, Divider, Label
from asciimatics.widgets import Layout, Text, Label
from ...term_utils import get_terms_for_new_user
from ...utils import http_get, http_post, defer, user_dict_kv, \
get_terms_for_new_user, get_adduser_operations
get_adduser_operations
from ..CeoFrame import CeoFrame
class AddUserView(Frame):
class AddUserView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen,
height,
width,
can_scroll=False,
on_load=self._on_load,
screen, height, width, model, 'AddUser',
save_data=True,
)
self._model = model
self._username_changed = False
layout = Layout([100], fill_frame=True)
@ -43,27 +40,11 @@ class AddUserView(Frame):
self._status_label = Label('')
layout.add_widget(self._status_label)
layout = Layout([100])
self.add_layout(layout)
layout.add_widget(Divider())
layout = Layout([1, 1])
self.add_layout(layout)
layout.add_widget(Button('Back', self._back), 0)
layout.add_widget(Button("Next", self._next), 1)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _on_load(self):
self.title = self._model.title
# restore the saved input fields' values
self.data = self._model.viewdata['adduser']
def _on_unload(self):
# save the input fields' values so that they don't disappear when
# the window gets resized
self.save()
self._model.viewdata['adduser'] = self.data
def _on_username_change(self):
self._username_changed = True
@ -75,10 +56,9 @@ class AddUserView(Frame):
if username == '':
return
Thread(target=self._get_uwldap_info, args=[username]).start()
#self._get_uwldap_info(username)
def _get_uwldap_info(self, username):
self._status_label.text = 'Searching for user...'
self._status_label.text = 'Looking up user...'
try:
resp = http_get('/api/uwldap/' + username)
if resp.status_code != 200:
@ -92,9 +72,6 @@ class AddUserView(Frame):
finally:
self._status_label.text = ''
def _back(self):
raise NextScene(self._model.scene_stack.pop())
def _next(self):
body = {
'uid': self._username.value,
@ -105,10 +82,10 @@ class AddUserView(Frame):
if self._forwarding_address.value:
body['forwarding_addresses'] = [self._forwarding_address.value]
new_terms = get_terms_for_new_user(int(self._num_terms.value))
if self._model.for_member:
body['terms'] = new_terms
else:
if self._model.is_club_rep:
body['non_member_terms'] = new_terms
else:
body['terms'] = new_terms
pairs = user_dict_kv(body)
self._model.confirm_lines = [
'The following user will be created:',
@ -120,6 +97,3 @@ class AddUserView(Frame):
self._model.deferred_req = defer(http_post, '/api/members', json=body)
self._model.operations = get_adduser_operations(body)
self._model.scene_stack.append('AddUser')
raise NextScene('Confirm')

View File

@ -0,0 +1,64 @@
from asciimatics.widgets import Layout, Text, Label
from ...term_utils import get_terms_for_renewal
from ...utils import http_post, defer
from ..CeoFrame import CeoFrame
class RenewUserView(CeoFrame):
def __init__(self, screen, width, height, model):
super().__init__(
screen, height, width, model, 'RenewUser',
save_data=True,
)
self._model = model
layout = Layout([100], fill_frame=True)
self.add_layout(layout)
self._username = Text("Username:", "uid")
layout.add_widget(self._username)
self._num_terms = Text(
"Number of terms:", "num_terms",
validator=lambda s: s.isdigit() and s[0] != '0')
layout.add_widget(self._num_terms)
layout = Layout([100])
self.add_layout(layout)
self._status_label = Label('')
layout.add_widget(self._status_label)
self.add_buttons(
back_btn=True,
next_scene='Confirm', on_next=self._next)
self.fix()
def _next(self):
uid = self._username.value
self._status_label.text = 'Looking up user...'
self._model.screen.force_update()
self._model.screen.draw_next_frame()
new_terms = get_terms_for_renewal(
uid,
int(self._num_terms.value),
self._model.is_club_rep,
self._model,
)
self._status_label.text = ''
body = {'uid': uid}
if self._model.is_club_rep:
body['non_member_terms'] = new_terms
terms_str = 'non-member terms'
else:
body['terms'] = new_terms
terms_str = 'member terms'
self._model.confirm_lines = [
'The following ' + terms_str + ' will be added:',
'',
','.join(new_terms),
'',
'Are you sure you want to continue?',
]
self._model.deferred_req = defer(
http_post, f'/api/members/{uid}/renew', json=body)

View File

@ -6,10 +6,13 @@ from asciimatics.scene import Scene
from asciimatics.screen import Screen
from .ConfirmView import ConfirmView
from .ErrorView import ErrorView
from .Model import Model
from .ResultView import ResultView
from .TransactionView import TransactionView
from .WelcomeView import WelcomeView
from .members.AddUserView import AddUserView
from .members.RenewUserView import RenewUserView
def unhandled(event):
@ -35,9 +38,12 @@ def screen_wrapper(screen, last_scene, model):
height = min(screen.height, 24)
views = [
('Welcome', WelcomeView(screen, width, height, model)),
('AddUser', AddUserView(screen, width, height, model)),
('Confirm', ConfirmView(screen, width, height, model)),
('Transaction', TransactionView(screen, width, height, model)),
('Result', ResultView(screen, width, height, model)),
('Error', ErrorView(screen, width, height, model)),
('AddUser', AddUserView(screen, width, height, model)),
('RenewUser', RenewUserView(screen, width, height, model)),
]
scenes = [
Scene([view], -1, name=name) for name, view in views

10
ceo/tui/utils.py Normal file
View File

@ -0,0 +1,10 @@
from asciimatics.exceptions import NextScene
import requests
def handle_sync_response(resp: requests.Response, model):
if resp.status_code != 200:
model.error_message = resp.text.rstrip()
raise NextScene('Error')
return resp.json()

View File

@ -1,6 +1,5 @@
import functools
import json
import sys
from typing import List, Dict, Tuple, Callable
import requests
@ -8,7 +7,6 @@ from zope import component
from .StreamResponseHandler import StreamResponseHandler
from ceo_common.interfaces import IHTTPClient, IConfig
from ceo_common.model import Term
from ceod.transactions.members import AddMemberTransaction
@ -85,12 +83,6 @@ def space_colon_kv(pairs: List[Tuple[str, str]]) -> List[str]:
return lines
def get_terms_for_new_user(num_terms: int) -> List[str]:
current_term = Term.current()
terms = [current_term + i for i in range(num_terms)]
return list(map(str, terms))
def user_dict_kv(d: Dict) -> List[Tuple[str]]:
"""Pretty-format a serialized User as (key, value) pairs."""
pairs = [