forked from public/pyceo
59 lines
1.7 KiB
Python
59 lines
1.7 KiB
Python
from typing import List, Dict
|
|
|
|
import requests
|
|
from zope import component
|
|
|
|
from ceo_common.interfaces import IHTTPClient, IConfig
|
|
|
|
|
|
def http_request(method: str, path: str, **kwargs) -> requests.Response:
    """
    Send an HTTP request to the appropriate ceod host.

    Paths beginning with '/api/db' are sent to the host configured as
    'ceod_database_host'; every other path is sent to the host configured
    as 'ceod_admin_host'. The request is always issued with stream=True,
    and any extra keyword arguments are handed to the HTTP client as-is.

    :param method: HTTP method name, e.g. 'GET' or 'POST'
    :param path: request path, e.g. '/api/db/...'
    :returns: whatever the registered IHTTPClient's request() returns
    """
    http_client = component.getUtility(IHTTPClient)
    config = component.getUtility(IConfig)

    is_db_endpoint = path.startswith('/api/db')
    host_key = 'ceod_database_host' if is_db_endpoint else 'ceod_admin_host'

    # The forwarded TGT is only needed for endpoints which write to LDAP,
    # so database endpoints and GET requests never ask for delegation.
    needs_delegation = (not is_db_endpoint) and method != 'GET'

    return http_client.request(
        config.get(host_key),
        path,
        method,
        delegate=needs_delegation,
        stream=True,
        **kwargs,
    )
|
|
|
|
|
|
def http_get(path: str, **kwargs) -> requests.Response:
    """
    Issue a GET request for `path` through http_request.

    Extra keyword arguments are passed through unchanged.
    """
    return http_request(method='GET', path=path, **kwargs)
|
|
|
|
|
|
def http_post(path: str, **kwargs) -> requests.Response:
    """
    Issue a POST request for `path` through http_request.

    Extra keyword arguments are passed through unchanged.
    """
    return http_request(method='POST', path=path, **kwargs)
|
|
|
|
|
|
def http_patch(path: str, **kwargs) -> requests.Response:
    """
    Issue a PATCH request for `path` through http_request.

    Extra keyword arguments are passed through unchanged.
    """
    return http_request(method='PATCH', path=path, **kwargs)
|
|
|
|
|
|
def http_delete(path: str, **kwargs) -> requests.Response:
    """
    Issue a DELETE request for `path` through http_request.

    Extra keyword arguments are passed through unchanged.
    """
    return http_request(method='DELETE', path=path, **kwargs)
|
|
|
|
|
|
def get_failed_operations(data: List[Dict]) -> List[str]:
    """
    Collect the names of the operations which failed, given the JSON
    objects streamed from the server.

    An object counts as a failure when it has an 'operation' key whose
    value starts with 'failed_to_'. The returned name is that value with
    the 'failed_to_' prefix removed and anything from the first ':'
    onwards dropped. Objects without an 'operation' key are ignored.
    Names are returned in the order they appear in `data`.
    """
    marker = 'failed_to_'
    skip = len(marker)
    # Only objects which actually report an operation are of interest.
    reported = (entry['operation'] for entry in data if 'operation' in entry)
    return [
        # sometimes the operation looks like
        # "failed_to_do_something: error message",
        # so keep only the part before the first colon
        name[skip:].partition(':')[0]
        for name in reported
        if name.startswith(marker)
    ]
|