pyceo/ceo/cli/CLIStreamResponseHandler.py

71 lines
2.0 KiB
Python

import sys
from typing import List, Union
import click
import requests
from ..StreamResponseHandler import StreamResponseHandler
from ..operation_strings import descriptions as op_desc
class Abort(click.ClickException):
    """Abort silently.

    A ``ClickException`` with an empty message whose ``show`` prints
    nothing, so that when Click handles it the command exits with
    ``exit_code`` without an error line being displayed.
    """

    def __init__(self, exit_code=1):
        """Create the exception.

        :param exit_code: value stored in ``self.exit_code`` (defaults to 1).
        """
        super().__init__('')
        self.exit_code = exit_code

    def show(self, file=None):
        """Print nothing.

        :param file: ignored; accepted only so the override stays
            call-compatible with ``ClickException.show(file=None)``.
        """
        # Intentionally empty: suppresses Click's usual error output.
class CLIStreamResponseHandler(StreamResponseHandler):
    """Print the progress of a sequence of operations to the terminal.

    The description of each operation (looked up in ``op_desc``) is
    printed followed by ``'... '`` and no trailing newline; the outcome
    reported through one of the ``handle_*_operation`` methods is then
    printed after it, and the next operation's description follows.
    """

    def __init__(self, operations: List[str]):
        """Remember the operation names and start at the first one.

        :param operations: operation names, used as keys into ``op_desc``.
        """
        super().__init__()
        self.operations = operations
        # Index of the operation whose result is currently awaited.
        self.idx = 0

    def handle_non_200(self, resp: requests.Response):
        """Print the response body as an error and abort silently.

        :param resp: the response to report; its text is printed with
            trailing whitespace stripped.
        :raises Abort: always.
        """
        click.echo('An error occurred:')
        click.echo(resp.text.rstrip())
        raise Abort()

    def begin(self):
        """Print the description of the first operation, if there is one."""
        # Bounds-checked so an empty operation list prints nothing instead
        # of raising IndexError.
        self._announce_op(0)

    def handle_aborted(self, err_msg: str):
        """Report that the transaction was aborted and exit with status 1.

        :param err_msg: error text appended to the report.
        :raises SystemExit: always, via ``sys.exit(1)``.
        """
        click.echo(click.style('ABORTED', fg='red'))
        click.echo('The transaction was rolled back.')
        click.echo('The error was: ' + err_msg)
        click.echo('Please check the ceod logs.')
        sys.exit(1)

    def handle_completed(self):
        """Print the final success message."""
        click.echo('Transaction successfully completed.')

    def _announce_op(self, index: int):
        """Print the description of ``operations[index]`` without a newline.

        Does nothing when ``index`` is past the end of the list.
        """
        if index < len(self.operations):
            click.echo(op_desc[self.operations[index]] + '... ', nl=False)

    def _go_to_next_op(self):
        """
        Increment the operation index and print the next operation, if
        there is one.
        """
        self.idx += 1
        self._announce_op(self.idx)

    def handle_successful_operation(self):
        """Mark the current operation as done and move to the next one."""
        click.echo(click.style('Done', fg='green'))
        self._go_to_next_op()

    def handle_failed_operation(self, err_msg: Union[str, None]):
        """Mark the current operation as failed and move to the next one.

        :param err_msg: optional error text; printed on its own line
            when it is not ``None``.
        """
        click.echo(click.style('Failed', fg='red'))
        if err_msg is not None:
            click.echo('  Error message: ' + err_msg)
        self._go_to_next_op()

    def handle_skipped_operation(self):
        """Mark the current operation as skipped and move to the next one."""
        click.echo('Skipped')
        self._go_to_next_op()

    def handle_unrecognized_operation(self, operation: str):
        """Print the name of an unrecognized operation.

        The operation index is not advanced.

        :param operation: the unrecognized operation name.
        """
        click.echo('Unrecognized operation: ' + operation)