from typing import List, Dict

import requests
from zope import component

from ceo_common.interfaces import IHTTPClient, IConfig


def http_request(method: str, path: str, **kwargs) -> requests.Response:
    """
    Send an HTTP request for `path` to the appropriate configured host.

    Paths starting with '/api/db' are sent to the host configured under
    'ceod_database_host' and never use delegation. All other paths are sent
    to the host configured under 'ceod_admin_host', with delegation enabled
    for every method except GET.

    The request is always made with stream=True; any extra keyword
    arguments are passed through to the IHTTPClient utility's `request`.
    """
    client = component.getUtility(IHTTPClient)
    cfg = component.getUtility(IConfig)
    if path.startswith('/api/db'):
        host = cfg.get('ceod_database_host')
        delegate = False
    else:
        host = cfg.get('ceod_admin_host')
        # The forwarded TGT is only needed for endpoints which write to LDAP
        delegate = method != 'GET'
    return client.request(
        host, path, method, delegate=delegate, stream=True, **kwargs)


def http_get(path: str, **kwargs) -> requests.Response:
    """Shortcut for http_request('GET', path, **kwargs)."""
    return http_request('GET', path, **kwargs)


def http_post(path: str, **kwargs) -> requests.Response:
    """Shortcut for http_request('POST', path, **kwargs)."""
    return http_request('POST', path, **kwargs)


def http_patch(path: str, **kwargs) -> requests.Response:
    """Shortcut for http_request('PATCH', path, **kwargs)."""
    return http_request('PATCH', path, **kwargs)


def http_delete(path: str, **kwargs) -> requests.Response:
    """Shortcut for http_request('DELETE', path, **kwargs)."""
    return http_request('DELETE', path, **kwargs)


def get_failed_operations(data: List[Dict]) -> List[str]:
    """
    Get a list of the failed operations using the JSON objects
    streamed from the server.

    Only objects which have an 'operation' key whose value starts with
    'failed_to_' are considered. The returned names have that prefix
    removed, as well as anything from the first ':' onwards. The order of
    `data` is preserved.
    """
    prefix = 'failed_to_'
    failed = []
    for d in data:
        if 'operation' not in d:
            continue
        operation = d['operation']
        if not operation.startswith(prefix):
            continue
        operation = operation[len(prefix):]
        # sometimes the operation looks like
        # "failed_to_do_something: error message"
        # partition() leaves the string untouched when there is no ':'
        operation = operation.partition(':')[0]
        failed.append(operation)
    return failed