import functools import json import sys from typing import List, Dict, Tuple, Callable import requests from zope import component from .StreamResponseHandler import StreamResponseHandler from ceo_common.interfaces import IHTTPClient, IConfig from ceo_common.model import Term from ceod.transactions.members import AddMemberTransaction def http_request(method: str, path: str, **kwargs) -> requests.Response: client = component.getUtility(IHTTPClient) cfg = component.getUtility(IConfig) if path.startswith('/api/db'): host = cfg.get('ceod_database_host') else: host = cfg.get('ceod_admin_host') return client.request( method, host, path, stream=True, **kwargs) def http_get(path: str, **kwargs) -> requests.Response: return http_request('GET', path, **kwargs) def http_post(path: str, **kwargs) -> requests.Response: return http_request('POST', path, **kwargs) def http_patch(path: str, **kwargs) -> requests.Response: return http_request('PATCH', path, **kwargs) def http_delete(path: str, **kwargs) -> requests.Response: return http_request('DELETE', path, **kwargs) def get_failed_operations(data: List[Dict]) -> List[str]: """ Get a list of the failed operations using the JSON objects streamed from the server. """ prefix = 'failed_to_' failed = [] for d in data: if 'operation' not in d: continue operation = d['operation'] if not operation.startswith(prefix): continue operation = operation[len(prefix):] if ':' in operation: # sometimes the operation looks like # "failed_to_do_something: error message" operation = operation[:operation.index(':')] failed.append(operation) return failed def space_colon_kv(pairs: List[Tuple[str, str]]) -> List[str]: """ Pretty-format the lines so that the keys and values are aligned into columns. Example: key1: val1 key2: val2 key1000: val3 val4 """ lines = [] maxlen = max(len(key) for key, val in pairs) for key, val in pairs: if key != '': prefix = key + ': ' else: # assume this is a continuation from the previous line prefix = ' ' extra_space = ' ' * (maxlen - len(key)) line = prefix + extra_space + str(val) lines.append(line) return lines def get_terms_for_new_user(num_terms: int) -> List[str]: current_term = Term.current() terms = [current_term + i for i in range(num_terms)] return list(map(str, terms)) def user_dict_kv(d: Dict) -> List[Tuple[str]]: """Pretty-format a serialized User as (key, value) pairs.""" pairs = [ ('uid', d['uid']), ('cn', d['cn']), ('program', d.get('program', 'Unknown')), ] if 'uid_number' in d: pairs.append(('UID number', d['uid_number'])) if 'gid_number' in d: pairs.append(('GID number', d['gid_number'])) if 'login_shell' in d: pairs.append(('login shell', d['login_shell'])) if 'home_directory' in d: pairs.append(('home directory', d['home_directory'])) if 'is_club' in d: pairs.append(('is a club', str(d['is_club']))) if 'forwarding_addresses' in d: if len(d['forwarding_addresses']) > 0: pairs.append(('forwarding addresses', d['forwarding_addresses'][0])) for address in d['forwarding_addresses'][1:]: pairs.append(('', address)) else: pairs.append(('forwarding addresses', '')) if 'terms' in d: pairs.append(('member terms', ','.join(d['terms']))) if 'non_member_terms' in d: pairs.append(('non-member terms', ','.join(d['non_member_terms']))) if 'password' in d: pairs.append(('password', d['password'])) return pairs def user_dict_lines(d: Dict) -> List[str]: """Pretty-format a serialized User.""" return space_colon_kv(user_dict_kv(d)) def get_adduser_operations(body: Dict): operations = AddMemberTransaction.operations.copy() if not body.get('forwarding_addresses'): # don't bother displaying this because it won't be run operations.remove('set_forwarding_addresses') return operations def generic_handle_stream_response( resp: requests.Response, operations: List[str], handler: StreamResponseHandler, ) -> List[Dict]: """ Print output to the console while operations are being streamed from the server over HTTP. Returns the parsed JSON data streamed from the server. """ if resp.status_code != 200: handler.handle_non_200(resp) handler.begin() idx = 0 data = [] for line in resp.iter_lines(decode_unicode=True, chunk_size=8): d = json.loads(line) data.append(d) if d['status'] == 'aborted': handler.handle_aborted(d['error']) sys.exit(1) elif d['status'] == 'completed': while idx < len(operations): handler.handle_skipped_operation() idx += 1 handler.handle_completed() return data operation = d['operation'] oper_failed = False err_msg = None prefix = 'failed_to_' if operation.startswith(prefix): operation = operation[len(prefix):] oper_failed = True # sometimes the operation looks like # "failed_to_do_something: error message" if ':' in operation: operation, err_msg = operation.split(': ', 1) while idx < len(operations) and operations[idx] != operation: handler.handle_skipped_operation() idx += 1 if idx == len(operations): handler.handle_unrecognized_operation(operation) continue if oper_failed: handler.handle_failed_operation(err_msg) else: handler.handle_successful_operation() idx += 1 raise Exception('server response ended abruptly') def defer(f: Callable, *args, **kwargs): """Defer a function's execution.""" @functools.wraps(f) def wrapper(): return f(*args, **kwargs) return wrapper