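# NOTE(review): file-viewer residue, not part of the module: "60 lines"
# NOTE(review): file-viewer residue, not part of the module: "1.7 KiB"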

from typing import List, Dict
import requests
from zope import component
from ceo_common.interfaces import IHTTPClient, IConfig
def http_request(method: str, path: str, **kwargs) -> requests.Response:
    """
    Send an HTTP request to the ceod host responsible for `path`.

    Paths starting with '/api/db' are sent to the host stored under the
    'ceod_db_host' config key with need_cred=False. Every other path is
    sent to the host stored under 'ceod_admin_host', and need_cred is set
    for any method other than 'GET'.

    :param method: HTTP method name, e.g. 'GET' or 'POST'.
    :param path: request path; its prefix selects the target host.
    :param kwargs: forwarded unchanged to the IHTTPClient utility's
        request() method.
    :returns: whatever the IHTTPClient utility's request() returns; the
        request is always made with stream=True and principal=None.
    """
    client = component.getUtility(IHTTPClient)
    cfg = component.getUtility(IConfig)
    if path.startswith('/api/db'):
        host = cfg.get('ceod_db_host')
        need_cred = False
    else:
        # Without this branch the admin host would unconditionally
        # overwrite the database host chosen above.
        host = cfg.get('ceod_admin_host')
        # The forwarded TGT is only needed for endpoints which write to LDAP
        need_cred = method != 'GET'
    return client.request(
        host, path, method, principal=None, need_cred=need_cred,
        stream=True, **kwargs)
def http_get(path: str, **kwargs) -> requests.Response:
    """Shortcut for http_request() using the 'GET' method."""
    return http_request(method='GET', path=path, **kwargs)
def http_post(path: str, **kwargs) -> requests.Response:
    """Shortcut for http_request() using the 'POST' method."""
    return http_request(method='POST', path=path, **kwargs)
def http_patch(path: str, **kwargs) -> requests.Response:
    """Shortcut for http_request() using the 'PATCH' method."""
    return http_request(method='PATCH', path=path, **kwargs)
def http_delete(path: str, **kwargs) -> requests.Response:
    """Shortcut for http_request() using the 'DELETE' method."""
    return http_request(method='DELETE', path=path, **kwargs)
def get_failed_operations(data: List[Dict]) -> List[str]:
    """
    Get a list of the failed operations using the JSON objects
    streamed from the server.

    An object counts as a failure when it has an 'operation' key whose
    value starts with 'failed_to_'. The returned name has that prefix
    removed, along with anything from the first ':' onwards.

    :param data: the decoded JSON objects, in the order received.
    :returns: the names of the failed operations, in input order; empty
        if nothing failed.
    """
    prefix = 'failed_to_'
    failed = []
    for d in data:
        # Objects without an 'operation' key carry no operation to inspect.
        if 'operation' not in d:
            continue
        operation = d['operation']
        # Only operations carrying the failure prefix are reported.
        if not operation.startswith(prefix):
            continue
        operation = operation[len(prefix):]
        if ':' in operation:
            # sometimes the operation looks like
            # "failed_to_do_something: error message"
            operation = operation[:operation.index(':')]
        failed.append(operation)
    return failed