pyceo/ceo/utils.py

196 lines
6.0 KiB
Python

import functools
import json
from typing import List, Dict, Tuple, Callable
import requests
from zope import component
from .StreamResponseHandler import StreamResponseHandler
from ceo_common.interfaces import IHTTPClient, IConfig
from ceod.transactions.members import AddMemberTransaction
def http_request(method: str, path: str, **kwargs) -> requests.Response:
    """
    Send an HTTP request to the appropriate ceod host.

    Paths beginning with '/api/db' are sent to the host configured as
    'ceod_database_host'; every other path is sent to the host configured
    as 'ceod_admin_host'. The request is always made with stream=True, and
    any extra keyword arguments are forwarded to the HTTP client.
    """
    client = component.getUtility(IHTTPClient)
    cfg = component.getUtility(IConfig)
    is_db_path = path.startswith('/api/db')
    host_key = 'ceod_database_host' if is_db_path else 'ceod_admin_host'
    host = cfg.get(host_key)
    return client.request(method, host, path, stream=True, **kwargs)
def http_get(path: str, **kwargs) -> requests.Response:
    """Issue a GET request for `path` via http_request()."""
    response = http_request('GET', path, **kwargs)
    return response
def http_post(path: str, **kwargs) -> requests.Response:
    """Issue a POST request for `path` via http_request()."""
    response = http_request('POST', path, **kwargs)
    return response
def http_patch(path: str, **kwargs) -> requests.Response:
    """Issue a PATCH request for `path` via http_request()."""
    response = http_request('PATCH', path, **kwargs)
    return response
def http_delete(path: str, **kwargs) -> requests.Response:
    """Issue a DELETE request for `path` via http_request()."""
    response = http_request('DELETE', path, **kwargs)
    return response
def get_failed_operations(data: List[Dict]) -> List[str]:
    """
    Extract the names of the failed operations from the JSON objects
    streamed from the server.

    Objects without an 'operation' key are ignored, as are operations
    which do not start with 'failed_to_'. The returned names have the
    'failed_to_' prefix removed, along with anything from the first
    colon onwards.
    """
    marker = 'failed_to_'
    reported = (entry['operation'] for entry in data if 'operation' in entry)
    return [
        # sometimes the operation looks like
        # "failed_to_do_something: error message", so keep only the
        # part before the first colon
        name[len(marker):].split(':', 1)[0]
        for name in reported
        if name.startswith(marker)
    ]
def space_colon_kv(pairs: List[Tuple[str, str]]) -> List[str]:
    """
    Pretty-format the lines so that the keys and values
    are aligned into columns.

    A pair whose key is the empty string is treated as a continuation
    of the previous pair: no key or colon is printed, but the value is
    still placed in the value column.

    Example:
        key1:    val1
        key2:    val2
        key1000: val3
                 val4

    :param pairs: the (key, value) pairs to format, in display order
    :returns: one formatted line per pair (an empty list if there are
              no pairs)
    """
    if not pairs:
        return []
    maxlen = max(len(key) for key, _ in pairs)
    lines = []
    for key, val in pairs:
        # A continuation line has no label at all; it must still be padded
        # to the full label width so that its value lines up with the rest.
        label = key + ':' if key != '' else ''
        # The widest label is maxlen + 1 characters (key plus colon), and
        # a single space separates the label column from the value column.
        lines.append(label.ljust(maxlen + 1) + ' ' + str(val))
    return lines
def user_dict_kv(d: Dict) -> List[Tuple[str, str]]:
    """
    Pretty-format a serialized User as (key, value) pairs.

    'uid' and 'cn' are required; 'program' defaults to 'Unknown' when it
    is absent. Every other field is only included if it is present in d.
    Values are taken from d as-is, except for 'is_club' (converted with
    str()) and the term lists (joined with commas).

    Forwarding addresses are emitted one per pair: the first pair carries
    the 'forwarding addresses' key and each additional address uses an
    empty key, which space_colon_kv() renders as a continuation line.

    :param d: the serialized User
    :returns: the (key, value) pairs, in display order
    :raises KeyError: if 'uid' or 'cn' is missing from d
    """
    pairs = [
        ('uid', d['uid']),
        ('cn', d['cn']),
        ('program', d.get('program', 'Unknown')),
    ]
    # Optional fields which are displayed verbatim, as (field, label).
    verbatim_fields = (
        ('uid_number', 'UID number'),
        ('gid_number', 'GID number'),
        ('login_shell', 'login shell'),
        ('home_directory', 'home directory'),
    )
    pairs.extend(
        (label, d[field]) for field, label in verbatim_fields if field in d
    )
    if 'is_club' in d:
        pairs.append(('is a club', str(d['is_club'])))
    if 'forwarding_addresses' in d:
        # An empty (or missing-valued) list is still displayed, as a
        # single pair with an empty value.
        addresses = d['forwarding_addresses'] or ['']
        pairs.append(('forwarding addresses', addresses[0]))
        pairs.extend(('', address) for address in addresses[1:])
    if 'terms' in d:
        pairs.append(('member terms', ','.join(d['terms'])))
    if 'non_member_terms' in d:
        pairs.append(('non-member terms', ','.join(d['non_member_terms'])))
    if 'password' in d:
        pairs.append(('password', d['password']))
    return pairs
def user_dict_lines(d: Dict) -> List[str]:
    """Pretty-format a serialized User as a list of aligned lines."""
    pairs = user_dict_kv(d)
    return space_colon_kv(pairs)
def get_adduser_operations(body: Dict):
    """
    Get the list of operations to display for an add-user request.

    Starts from a copy of AddMemberTransaction.operations and drops
    'set_forwarding_addresses' when the request body has no (or empty)
    'forwarding_addresses'.
    """
    ops = AddMemberTransaction.operations.copy()
    if body.get('forwarding_addresses'):
        return ops
    # no forwarding addresses were requested, so this operation won't be
    # run and there is no point in displaying it
    ops.remove('set_forwarding_addresses')
    return ops
def generic_handle_stream_response(
    resp: requests.Response,
    operations: List[str],
    handler: StreamResponseHandler,
) -> List[Dict]:
    """
    Report progress via the handler while operations are being streamed
    from the server over HTTP.

    Each non-empty line of the response body is parsed as a JSON object
    and matched against the expected operations, in order. Expected
    operations which the server never mentions are reported as skipped.

    :param resp: the streamed HTTP response
    :param operations: the names of the operations which are expected,
                       in the order in which they should be reported
    :param handler: receives a callback for each event (begin, successful,
                    failed, skipped and unrecognized operations, completed,
                    aborted, non-200 response)
    :returns: the parsed JSON data streamed from the server once a
              'completed' status is seen. None is returned if the response
              status code is not 200 or if the server reports an 'aborted'
              status.
    :raises Exception: if the stream ends without a 'completed' or
                       'aborted' status
    """
    if resp.status_code != 200:
        handler.handle_non_200(resp)
        return None
    handler.begin()
    idx = 0
    data = []
    prefix = 'failed_to_'
    for line in resp.iter_lines(decode_unicode=True, chunk_size=1):
        if not line:
            # iter_lines() can yield empty keep-alive lines, which
            # are not valid JSON
            continue
        d = json.loads(line)
        data.append(d)
        if d['status'] == 'aborted':
            handler.handle_aborted(d['error'])
            return None
        elif d['status'] == 'completed':
            # anything which the server never reported was skipped
            while idx < len(operations):
                handler.handle_skipped_operation()
                idx += 1
            handler.handle_completed()
            return data
        operation = d['operation']
        oper_failed = False
        err_msg = None
        if operation.startswith(prefix):
            operation = operation[len(prefix):]
            oper_failed = True
            # sometimes the operation looks like
            # "failed_to_do_something: error message"
            if ':' in operation:
                # Cut at the first colon (the same rule which
                # get_failed_operations() uses) so that a message without
                # a space after the colon does not blow up.
                operation, _, err_msg = operation.partition(':')
                err_msg = err_msg.strip()
        # Operations are expected in order, so everything before the one
        # just reported must have been skipped.
        while idx < len(operations) and operations[idx] != operation:
            handler.handle_skipped_operation()
            idx += 1
        if idx == len(operations):
            handler.handle_unrecognized_operation(operation)
            continue
        if oper_failed:
            handler.handle_failed_operation(err_msg)
        else:
            handler.handle_successful_operation()
        idx += 1
    raise Exception('server response ended abruptly')
def defer(f: Callable, *args, **kwargs):
    """
    Defer a function's execution.

    Returns a zero-argument callable which, when invoked, calls f with
    the arguments given here and returns its result. The returned
    callable carries f's metadata (name, docstring, etc.).
    """
    return functools.wraps(f)(lambda: f(*args, **kwargs))