pyceo/ceo/cli/utils.py

69 lines
1.8 KiB
Python

import socket
import os
from typing import List, Tuple, Dict
import click
import requests
from ..utils import space_colon_kv, generic_handle_stream_response
from .CLIStreamResponseHandler import CLIStreamResponseHandler
class Abort(click.ClickException):
"""Abort silently."""
def __init__(self, exit_code=1):
super().__init__('')
self.exit_code = exit_code
def show(self):
pass
def print_lines(lines: List[str]):
"""Print multiple lines to stdout."""
for line in lines:
click.echo(line)
def print_colon_kv(pairs: List[Tuple[str, str]]):
"""
Pretty-print a list of key-value pairs.
"""
for line in space_colon_kv(pairs):
click.echo(line)
def handle_stream_response(resp: requests.Response, operations: List[str]) -> List[Dict]:
handler = CLIStreamResponseHandler(operations)
return generic_handle_stream_response(resp, operations, handler)
def handle_sync_response(resp: requests.Response):
"""
Exit the program if the request was not successful.
Returns the parsed JSON response.
"""
if resp.status_code != 200:
click.echo('An error occurred:')
click.echo(resp.text.rstrip())
raise Abort()
return resp.json()
def check_file_path(file):
if os.path.isfile(file):
click.echo(f"{file} will be overwritten")
click.confirm('Do you want to continue?', abort=True)
elif os.path.isdir(file):
click.echo(f"Error: there exists a directory at {file}")
raise Abort()
def check_if_in_development() -> bool:
"""Aborts if we are not currently in the dev environment."""
if not socket.getfqdn().endswith('.csclub.internal'):
click.echo('This command may only be called during development.')
raise Abort()