pyceo/ceod/db/PostgreSQLService.py

62 lines
2.8 KiB
Python

from psycopg2 import connect, Error
from psycopg2 import sql
from zope import component
from zope.interface import implementer

from ceo_common.errors import DatabaseConnectionError
from ceo_common.interfaces import IDatabaseService, ILDAPService, IConfig
import ceod.utils as utils


@implementer(IDatabaseService)
class PostgreSQLService:
    """Database service that provisions per-user PostgreSQL roles and databases.

    Connection settings are read from the registered ``IConfig`` utility
    (keys ``postgresql_host``, ``postgresql_username`` and
    ``postgresql_password``).
    """

    def __init__(self):
        self.type = 'postgresql'
        config = component.getUtility(IConfig)
        self.host = config.get('postgresql_host')
        self.auth_username = config.get('postgresql_username')
        self.auth_password = config.get('postgresql_password')

    # Server-side configuration notes:
    #   https://www.postgresql.org/docs/9.1/auth-pg-hba-conf.html
    #
    # pg_hba.conf should only listen on localhost and only allow users to log
    # in to the database that has the same name as the user:
    #     local sameuser all localhost md5
    # A different line is needed for syscom.
    #
    # Allow only postgres to create objects in the schema public:
    #     REVOKE ALL ON SCHEMA public FROM PUBLIC;
    #     GRANT ALL ON SCHEMA public TO postgres;
    #
    # By default every database that is created is open to connections from
    # anyone; only the owner (and superusers) can ever drop a database.
    # Note that pg_catalog exposes the list of database and user names to
    # everyone, and that cannot be disabled without breaking some things.

    def create_db(self, username: str) -> str:
        """Create a role and database for *username*, or reset its password.

        If no role named *username* exists, a non-privileged role and a
        database of the same name are created, PUBLIC access to that database
        is revoked, and the role is granted all privileges on it. If the role
        already exists, only its password is reset.

        :param username: name of the user; also used as the role and
            database name.
        :returns: the newly generated password for the role.
        :raises DatabaseConnectionError: if connecting to the server or
            executing any statement fails with a psycopg2 error.
        """
        # Make sure the user exists (the LDAP lookup is presumably expected
        # to raise if it does not -- confirm against ILDAPService).
        component.getUtility(ILDAPService).get_user(username)
        password = utils.gen_password()

        # Role/database names are identifiers, not values: they cannot be
        # sent as bound parameters (which render as string literals) and must
        # be composed with sql.Identifier so they are quoted correctly.
        # Values (the password, the rolname comparison) use plain %s
        # placeholders with no surrounding quotes -- psycopg2 adds the quoting.
        role = sql.Identifier(username)
        search_for_user = "SELECT 1 FROM pg_roles WHERE rolname = %s"
        create_user = sql.SQL(
            "CREATE USER {} WITH NOSUPERUSER NOCREATEDB NOCREATEROLE "
            "PASSWORD %s"
        ).format(role)
        create_database = sql.SQL("CREATE DATABASE {}").format(role)
        set_db_perms = sql.SQL(
            "REVOKE ALL ON DATABASE {} FROM PUBLIC"
        ).format(role)
        set_user_perms = sql.SQL(
            "GRANT ALL ON DATABASE {} TO {}"
        ).format(role, role)
        reset_password = sql.SQL("ALTER USER {} WITH PASSWORD %s").format(role)

        con = None
        try:
            # The connection is deliberately not used as a context manager:
            # that would wrap the statements in a transaction (and would not
            # close the connection on exit).
            con = connect(
                host=self.host,
                user=self.auth_username,
                password=self.auth_password,
            )
            # CREATE DATABASE cannot run inside a transaction block.
            con.autocommit = True
            # TODO: limit access to localhost?
            with con.cursor() as cursor:
                cursor.execute(search_for_user, (username,))
                if cursor.fetchone() is None:
                    cursor.execute(create_user, (password,))
                    cursor.execute(create_database)
                    cursor.execute(set_db_perms)
                    cursor.execute(set_user_perms)
                else:
                    cursor.execute(reset_password, (password,))
            return password
        except Error as err:
            raise DatabaseConnectionError() from err
        finally:
            if con is not None:
                con.close()