pyceo/ceod/db/PostgreSQLService.py

68 lines
2.7 KiB
Python

from zope.interface import implementer
from zope import component
from ceo_common.interfaces import IDatabaseService, IConfig
from ceo_common.errors import DatabaseConnectionError, DatabasePermissionError
from ceod.utils import gen_password
from ceod.db.utils import response_is_empty
from psycopg2 import connect, OperationalError, ProgrammingError
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
@implementer(IDatabaseService)
class PostgreSQLService:
    """Create and delete per-user PostgreSQL roles and databases.

    Every operation connects to the server on ``localhost`` using the
    ``postgresql_username`` / ``postgresql_password`` values read from the
    registered ``IConfig`` utility. Each user gets a role and a database
    that are both named after the username.
    """

    def __init__(self):
        """Load the administrative credentials from the ``IConfig`` utility."""
        self.type = 'postgresql'
        config = component.getUtility(IConfig)
        self.auth_username = config.get('postgresql_username')
        self.auth_password = config.get('postgresql_password')

    def _connect(self):
        """Open a connection to the local server; the caller must close it."""
        return connect(
            host='localhost',
            user=self.auth_username,
            password=self.auth_password,
        )

    def create_db(self, username: str) -> str:
        """Ensure a role and a database named ``username`` exist.

        If the role is missing it is created; otherwise its password is
        reset. If the database is missing it is created with the role as
        owner and all privileges on it are revoked from PUBLIC.

        :param username: name used for both the role and the database.
        :returns: the newly generated password now set on the role.
        :raises DatabaseConnectionError: on ``psycopg2.OperationalError``.
        :raises DatabasePermissionError: on ``psycopg2.ProgrammingError``.
        """
        # Local import: keeps this block self-contained (psycopg2 is already
        # a dependency of this module).
        from psycopg2 import sql

        password = gen_password()

        # The username is never pasted into SQL text directly. Identifiers
        # cannot be bound as query parameters, so they are quoted with
        # sql.Identifier. Quoted identifiers are matched exactly, which is
        # consistent with the exact-match catalog lookups below.
        role = sql.Identifier(username)
        name = sql.Literal(username)
        secret = sql.Literal(password)

        search_for_user = sql.SQL(
            "SELECT FROM pg_roles WHERE rolname={}"
        ).format(name)
        search_for_db = sql.SQL(
            "SELECT FROM pg_database WHERE datname={}"
        ).format(name)
        create_user = sql.SQL(
            "CREATE USER {} WITH PASSWORD {}"
        ).format(role, secret)
        reset_password = sql.SQL(
            "ALTER USER {} WITH PASSWORD {}"
        ).format(role, secret)
        # Sent as two separate statements: a multi-statement string is run
        # by the server as one implicit transaction, and CREATE DATABASE
        # cannot run inside a transaction block.
        create_database = sql.SQL(
            "CREATE DATABASE {} OWNER {}"
        ).format(role, role)
        revoke_public = sql.SQL(
            "REVOKE ALL ON DATABASE {} FROM PUBLIC"
        ).format(role)

        try:
            con = self._connect()
            # The connection is closed explicitly: psycopg2's ``with con:``
            # only ends the current transaction, it does not close the
            # connection.
            try:
                con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
                with con.cursor() as cursor:
                    # response_is_empty is handed a plain SQL string, as
                    # before; presumably it reports whether the query
                    # returned no rows (helper not visible here).
                    if response_is_empty(search_for_user.as_string(con), con):
                        cursor.execute(create_user)
                    else:
                        cursor.execute(reset_password)
                    if response_is_empty(search_for_db.as_string(con), con):
                        cursor.execute(create_database)
                        cursor.execute(revoke_public)
            finally:
                con.close()
        except OperationalError as err:
            raise DatabaseConnectionError() from err
        except ProgrammingError as err:
            raise DatabasePermissionError() from err
        return password

    def delete_db(self, username: str):
        """Drop the database and the role named ``username`` if they exist.

        :param username: name of both the database and the role to remove.
        :raises DatabaseConnectionError: on ``psycopg2.OperationalError``.
        :raises DatabasePermissionError: on ``psycopg2.ProgrammingError``.
        """
        # Local import: keeps this block self-contained (psycopg2 is already
        # a dependency of this module).
        from psycopg2 import sql

        target = sql.Identifier(username)
        drop_db = sql.SQL("DROP DATABASE IF EXISTS {}").format(target)
        drop_user = sql.SQL("DROP USER IF EXISTS {}").format(target)

        try:
            con = self._connect()
            # Closed explicitly; ``with con:`` would leave the connection open.
            try:
                con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
                with con.cursor() as cursor:
                    # The database must go first: PostgreSQL refuses to drop
                    # a role that still owns a database.
                    cursor.execute(drop_db)
                    cursor.execute(drop_user)
            finally:
                con.close()
        except OperationalError as err:
            raise DatabaseConnectionError() from err
        except ProgrammingError as err:
            raise DatabasePermissionError() from err