from zope.interface import implementer
from zope import component
from contextlib import contextmanager

from ceo_common.interfaces import IDatabaseService, IConfig
from ceo_common.errors import DatabaseConnectionError, DatabasePermissionError, UserAlreadyExistsError, \
    UserNotFoundError
from ceo_common.logger_factory import logger_factory
from ceod.utils import gen_password
from ceod.db.utils import response_is_empty

from mysql.connector import connect
from mysql.connector.errors import InterfaceError, ProgrammingError

logger = logger_factory(__name__)


@implementer(IDatabaseService)
class MySQLService:
    """Manage per-user MySQL accounts and databases through an admin login.

    The admin credentials and host are read from the registered ``IConfig``
    utility (keys ``mysql_username``, ``mysql_password``, ``mysql_host``).

    NOTE(review): ``username`` arguments are interpolated directly into SQL
    text (identifiers cannot be bound as query parameters). Callers are
    presumably expected to pass only validated usernames -- confirm.
    """

    type = 'mysql'

    def __init__(self):
        """Load the connection settings and verify admin access.

        Creates and then deletes a throwaway user/database, so any
        connection or permission error surfaces at construction time.
        """
        config = component.getUtility(IConfig)
        self.auth_username = config.get('mysql_username')
        self.auth_password = config.get('mysql_password')
        self.host = config.get('mysql_host')

        # check that database is up and that we have admin rights
        test_user = "test_user_64559"
        self.create_db(test_user)
        self.delete_db(test_user)

    @contextmanager
    def mysql_connection(self):
        """Yield a connection opened with the configured admin credentials.

        Because the ``yield`` sits inside the ``try``, errors raised by the
        caller's ``with`` body are translated as well, not only errors
        raised while connecting.

        Raises:
            DatabaseConnectionError: on ``InterfaceError``.
            DatabasePermissionError: on ``ProgrammingError``.
        """
        try:
            with connect(
                host=self.host,
                user=self.auth_username,
                password=self.auth_password,
            ) as con:
                yield con
        except InterfaceError as e:
            logger.error(e)
            # Keep the driver error attached as the explicit cause.
            raise DatabaseConnectionError() from e
        except ProgrammingError as e:
            logger.error(e)
            raise DatabasePermissionError() from e

    def create_db(self, username: str) -> str:
        """Create a MySQL account and a same-named database for ``username``.

        The account ``'<username>'@'%'`` is created only if no row for the
        user is found; the database is created, and all privileges on it
        granted to that account, only if no matching database is found.

        NOTE(review): if the account already exists but the database does
        not, the returned password has not been applied to the account.

        Returns:
            The newly generated password.

        Raises:
            UserAlreadyExistsError: if a matching database already exists.
        """
        password = gen_password()
        # presumably response_is_empty runs the query on the connection and
        # reports whether it produced no rows -- verify against ceod.db.utils
        search_for_user = f"SELECT user FROM mysql.user WHERE user='{username}'"
        search_for_db = f"SHOW DATABASES LIKE '{username}'"
        # The password is bound as a query parameter rather than interpolated.
        create_user = f"CREATE USER '{username}'@'%' IDENTIFIED BY %(password)s"
        # One statement per execute() call, so the result does not depend on
        # how the connector version in use treats multi-statement strings.
        create_database = f"CREATE DATABASE {username}"
        grant_privileges = (
            f"GRANT ALL PRIVILEGES ON {username}.* TO '{username}'@'%'"
        )

        with self.mysql_connection() as con, con.cursor() as cursor:
            if response_is_empty(search_for_user, con):
                cursor.execute(create_user, {'password': password})
            if not response_is_empty(search_for_db, con):
                raise UserAlreadyExistsError()
            cursor.execute(create_database)
            cursor.execute(grant_privileges)
        return password

    def reset_db_passwd(self, username: str) -> str:
        """Set a freshly generated password on ``'<username>'@'%'``.

        Returns:
            The new password.

        Raises:
            UserNotFoundError: if no row for the user is found.
        """
        password = gen_password()
        search_for_user = f"SELECT user FROM mysql.user WHERE user='{username}'"
        reset_password = f"ALTER USER '{username}'@'%' IDENTIFIED BY %(password)s"

        with self.mysql_connection() as con, con.cursor() as cursor:
            if response_is_empty(search_for_user, con):
                raise UserNotFoundError(username)
            cursor.execute(reset_password, {'password': password})
        return password

    def delete_db(self, username: str):
        """Drop the database and the ``'<username>'@'%'`` account.

        Both statements use ``IF EXISTS``.
        """
        drop_db = f"DROP DATABASE IF EXISTS {username}"
        drop_user = f"DROP USER IF EXISTS '{username}'@'%'"

        with self.mysql_connection() as con, con.cursor() as cursor:
            cursor.execute(drop_db)
            cursor.execute(drop_user)