pyceo/ceo/cli/utils.py

58 lines
1.5 KiB
Python
Raw Permalink Normal View History

2021-08-23 19:01:24 -04:00
from typing import List, Tuple, Dict
import click
import requests
from ceo_common.utils import is_in_development
2021-08-28 23:09:02 -04:00
from ..utils import space_colon_kv, generic_handle_stream_response
from .CLIStreamResponseHandler import CLIStreamResponseHandler
2021-08-23 19:01:24 -04:00
class Abort(click.ClickException):
"""Abort silently."""
def __init__(self, exit_code=1):
super().__init__('')
self.exit_code = exit_code
def show(self):
pass
2021-08-28 23:09:02 -04:00
def print_lines(lines: List[str]):
"""Print multiple lines to stdout."""
for line in lines:
click.echo(line)
2021-08-23 19:01:24 -04:00
def print_colon_kv(pairs: List[Tuple[str, str]]):
"""
2021-08-28 23:09:02 -04:00
Pretty-print a list of key-value pairs.
2021-08-23 19:01:24 -04:00
"""
2021-08-28 23:09:02 -04:00
for line in space_colon_kv(pairs):
click.echo(line)
2021-08-23 19:01:24 -04:00
def handle_stream_response(resp: requests.Response, operations: List[str]) -> List[Dict]:
2021-08-28 23:09:02 -04:00
handler = CLIStreamResponseHandler(operations)
return generic_handle_stream_response(resp, operations, handler)
2021-08-23 19:01:24 -04:00
def handle_sync_response(resp: requests.Response):
"""
Exit the program if the request was not successful.
Returns the parsed JSON response.
"""
if resp.status_code != 200:
click.echo('An error occurred:')
click.echo(resp.text.rstrip())
raise Abort()
return resp.json()
2021-08-24 01:48:55 -04:00
def check_if_in_development() -> bool:
"""Aborts if we are not currently in the dev environment."""
if not is_in_development():
2021-08-24 01:48:55 -04:00
click.echo('This command may only be called during development.')
raise Abort()