# Source: pyceo/ceod/db/MySQLService.py (Python)

from zope.interface import implementer
from zope import component
from contextlib import contextmanager
from ceo_common.interfaces import IDatabaseService, IConfig
from ceo_common.errors import DatabaseConnectionError, DatabasePermissionError, UserAlreadyExistsError, \
UserNotFoundError
from ceod.utils import gen_password
from ceod.db.utils import response_is_empty
from mysql.connector import connect
from mysql.connector.errors import InterfaceError, ProgrammingError
@implementer(IDatabaseService)
class MySQLService:
    """IDatabaseService implementation backed by a MySQL server on localhost.

    Each operation opens a fresh connection using the administrative
    credentials read from the IConfig utility (``mysql_username`` and
    ``mysql_password``).

    NOTE(review): ``username`` values are interpolated directly into the SQL
    text, because database and account identifiers cannot be bound as query
    parameters. This assumes callers only pass validated usernames - confirm
    against the callers.
    """

    # Identifier of this database backend.
    type = 'mysql'

    def __init__(self):
        """Read the MySQL admin credentials from the registered IConfig."""
        config = component.getUtility(IConfig)
        self.auth_username = config.get('mysql_username')
        self.auth_password = config.get('mysql_password')

    @contextmanager
    def mysql_connection(self):
        """Yield an open connection to the local MySQL server.

        The ``yield`` sits inside the ``try`` block, so driver errors raised
        by statements the caller runs while holding the connection are
        translated as well, not only errors raised while connecting.

        Raises:
            DatabaseConnectionError: if the driver raises InterfaceError.
            DatabasePermissionError: if the driver raises ProgrammingError.
        """
        try:
            with connect(
                host='localhost',
                user=self.auth_username,
                password=self.auth_password,
            ) as con:
                yield con
        except InterfaceError as err:
            # Chain the driver error so the root cause stays in the traceback.
            raise DatabaseConnectionError() from err
        except ProgrammingError as err:
            raise DatabasePermissionError() from err

    def create_db(self, username: str) -> str:
        """Create the MySQL accounts and the database named after *username*.

        The accounts (``@'localhost'`` and ``@'%'``) are created only when no
        account with that name exists yet. The database is created, and all
        privileges on it are granted to both accounts, only when no database
        with that name exists yet.

        Returns:
            The newly generated password for the accounts.

        Raises:
            UserAlreadyExistsError: if the database already exists.
        """
        password = gen_password()
        search_for_user = f"SELECT user FROM mysql.user WHERE user='{username}'"
        search_for_db = f"SHOW DATABASES LIKE '{username}'"
        # One statement per execute() call: sending several ';'-separated
        # statements through a single execute() depends on the connector's
        # multi-statement support, which is not enabled here.
        create_user_statements = (
            f"CREATE USER '{username}'@'localhost' IDENTIFIED BY %(password)s",
            f"CREATE USER '{username}'@'%' IDENTIFIED BY %(password)s",
        )
        create_database_statements = (
            f"CREATE DATABASE {username}",
            f"GRANT ALL PRIVILEGES ON {username}.* TO '{username}'@'localhost'",
            f"GRANT ALL PRIVILEGES ON {username}.* TO '{username}'@'%'",
        )
        with self.mysql_connection() as con:
            with con.cursor() as cursor:
                if response_is_empty(search_for_user, con):
                    for statement in create_user_statements:
                        cursor.execute(statement, {'password': password})
                if response_is_empty(search_for_db, con):
                    for statement in create_database_statements:
                        cursor.execute(statement)
                else:
                    raise UserAlreadyExistsError()
        return password

    def reset_db_passwd(self, username: str) -> str:
        """Set a freshly generated password on both accounts of *username*.

        Returns:
            The newly generated password.

        Raises:
            UserNotFoundError: if no MySQL account with that name exists.
        """
        password = gen_password()
        search_for_user = f"SELECT user FROM mysql.user WHERE user='{username}'"
        # The two ALTER USER statements used to be sent as one string with no
        # separator between them, which is not valid SQL. They are now
        # executed one at a time.
        reset_password_statements = (
            f"ALTER USER '{username}'@'localhost' IDENTIFIED BY %(password)s",
            f"ALTER USER '{username}'@'%' IDENTIFIED BY %(password)s",
        )
        with self.mysql_connection() as con:
            with con.cursor() as cursor:
                if response_is_empty(search_for_user, con):
                    raise UserNotFoundError(username)
                for statement in reset_password_statements:
                    cursor.execute(statement, {'password': password})
        return password

    def delete_db(self, username: str):
        """Drop the database and both MySQL accounts named after *username*.

        Every statement uses ``IF EXISTS``, so nothing is raised when the
        database or the accounts are already absent.
        """
        # The database is dropped first, then the accounts, one statement per
        # execute() call.
        drop_statements = (
            f"DROP DATABASE IF EXISTS {username}",
            f"DROP USER IF EXISTS '{username}'@'localhost'",
            f"DROP USER IF EXISTS '{username}'@'%'",
        )
        with self.mysql_connection() as con:
            with con.cursor() as cursor:
                for statement in drop_statements:
                    cursor.execute(statement)