fixes to logic

pull/10/head
Andrew Wang 1 year ago
parent 10f7aab3ed
commit af87d6a7e6
  1. 3
      README.md
  2. 8
      ceo_common/errors.py
  3. 10
      ceo_common/interfaces/IDatabaseService.py
  4. 43
      ceod/api/database.py
  5. 46
      ceod/db/MySQLService.py
  6. 54
      ceod/db/PostgreSQLService.py
  7. 5
      ceod/db/utils.py

@ -1,9 +1,6 @@
### TODO before merge
- testing and tests
- need someone to test isolation of PostgreSQL users
- handling for improper (missing values, unable to connect) config
- make MySQLService and PostgreSQLService look nicer to read
- check database.py
# pyceo
[![Build Status](https://ci.csclub.uwaterloo.ca/api/badges/public/pyceo/status.svg?ref=refs/heads/v1)](https://ci.csclub.uwaterloo.ca/public/pyceo)

@ -52,4 +52,10 @@ class NoSuchListError(Exception):
class DatabaseConnectionError(Exception):
pass
def __init__(self):
super().__init__('unable to connect or authenticate to sql service')
class DatabasePermissionError(Exception):
def __init__(self):
super().__init__('unable to perform action due to lack of permissions')

@ -5,10 +5,10 @@ from .IUser import IUser
class IDatabaseService(Interface):
"""Interface to create databases for users."""
type = Attribute('the type of database')
host = Attribute('the database address')
auth_username = Attribute('username of user creating connection')
auth_password = Attribute('password of user creating connection')
type = Attribute('the type of databases that will be created')
host = Attribute('the database host')
auth_username = Attribute('username to a privileged user on the database host')
auth_password = Attribute('password to a privileged user on the database host')
def create_db(username: str) -> str:
"""create a database for user and return its password"""
"""try to create a database and user and return its password"""

@ -3,30 +3,38 @@ from zope import component
from ceod.api.utils import authz_restrict_to_staff, authz_restrict_to_syscom, \
user_is_in_group, requires_authentication_no_realm, \
create_streaming_response, development_only
from ceo_common.errors import UserNotFoundError, DatabaseConnectionError
from ceo_common.interfaces import IDatabaseService
from ceo_common.errors import UserNotFoundError, DatabaseConnectionError, DatabasePermissionError
from ceo_common.interfaces import ILDAPService, IDatabaseService
bp = Blueprint('db', __name__)
# could combine create_mysql_db and create_postgresql_db into one function
# catch other less expected errors (mysql or psql error)
# handle if user somehow dropped their database
@bp.route('/mysql/<username>', methods=['POST'])
@requires_authentication_no_realm
def create_mysql_db(auth_user: str, username: str):
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')):
return {'error': "not authorized to create databases for others"}, 403
def create_db_from_type(db_type: str, username: str):
try:
db_srv = component.getUtility(IDatabaseService, 'mysql')
if not username.isalnum(): # username should not contain symbols
raise UserNotFoundError
ldap_srv = component.getUtility(ILDAPService)
ldap_srv.get_user(username) # make sure user exists
db_srv = component.getUtility(IDatabaseService, db_type)
password = db_srv.create_db(username)
return {'password': password}
except UserNotFoundError:
return {'error': 'user not found'}, 404
except DatabaseConnectionError:
return {'error': 'unable to connect to mysql server'}, 400
return {'error': 'unable to connect or authenticate to sql server'}, 400
except DatabasePermissionError:
return {'error': 'unable to perform action due to permissions'}, 502
except:
return {'error': 'Unexpected error'}, 500
@bp.route('/mysql/<username>', methods=['POST'])
@requires_authentication_no_realm
def create_mysql_db(auth_user: str, username: str):
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')):
return {'error': "not authorized to create databases for others"}, 403
return create_db_from_type('mysql', username)
@bp.route('/postgresql/<username>', methods=['POST'])
@ -34,11 +42,4 @@ def create_mysql_db(auth_user: str, username: str):
def create_postgresql_db(auth_user: str, username: str):
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')):
return {'error': "not authorized to create databases for others"}, 403
try:
db_srv = component.getUtility(IDatabaseService, 'postgresql')
password = db_srv.create_db(username)
return {'password': password}
except UserNotFoundError:
return {'error': 'user not found'}, 404
except DatabaseConnectionError:
return {'error': 'unable to connect to postgresql server'}, 400
return create_db_from_type('postgresql', username)

@ -1,9 +1,11 @@
from zope.interface import implementer
from zope import component
from ceo_common.interfaces import IDatabaseService, ILDAPService, IConfig
from ceo_common.errors import DatabaseConnectionError
from mysql.connector import connect, Error
import ceod.utils as utils
from ceo_common.interfaces import IDatabaseService, IConfig
from ceo_common.errors import DatabaseConnectionError, DatabasePermissionError
from ceod.utils import gen_password
from ceod.db.utils import response_is_empty
from mysql.connector import connect
from mysql.connector.errors import InterfaceError, ProgrammingError
@implementer(IDatabaseService)
@ -16,31 +18,31 @@ class MySQLService:
self.auth_password = config.get('mysql_password')
def create_db(self, username: str) -> str:
component.getUtility(ILDAPService).get_user(username) # make sure user exists
password = utils.gen_password()
user = {'username': username, 'password': password}
password = gen_password()
search_for_user = f"SELECT user FROM mysql.user WHERE user='{username}'"
search_for_db = f"SHOW DATABASES LIKE '{username}'"
create_user = f"CREATE USER '{username}'@'localhost' IDENTIFIED BY %(password)s"
reset_password = f"ALTER USER '{username}' IDENTIFIED BY %(password)s"
create_database = f"""
CREATE DATABASE {username};
GRANT ALL PRIVILEGES ON {username}.* TO '{username}'@'localhost';
"""
try:
with connect(
host=self.host,
user=self.auth_username,
password=self.auth_password,
) as con:
search_for_user = "SELECT user FROM mysql.user WHERE user='%(username)s'"
create_user = "CREATE USER '%(username)s'@'localhost' IDENTIFIED BY '%(password)s'"
create_database = "CREATE DATABASE %(username)s"
set_user_perms = "GRANT ALL PRIVILEGES ON %(username)s.* TO '%(username)s'@'localhost'"
flush_privileges = "FLUSH PRIVILEGES"
reset_password = "ALTER USER '%(username)s' IDENTIFIED BY '%(password)s'"
with con.cursor() as cursor:
cursor.execute(search_for_user, user)
response = cursor.fetchall()
if len(response) == 0:
cursor.execute(create_user, user)
cursor.execute(create_database, user)
cursor.execute(set_user_perms, user)
cursor.execute(flush_privileges)
if response_is_empty(search_for_user, con):
cursor.execute(create_user, {password: password})
else:
cursor.execute(reset_password, user)
cursor.execute(reset_password, {password: password})
if response_is_empty(search_for_db, con):
cursor.execute(create_database)
con.commit()
return password
except Error:
except InterfaceError:
raise DatabaseConnectionError()
except ProgrammingError:
raise DatabasePermissionError()

@ -1,9 +1,11 @@
from zope.interface import implementer
from zope import component
from ceo_common.interfaces import IDatabaseService, ILDAPService, IConfig
from ceo_common.errors import DatabaseConnectionError
import ceod.utils as utils
from psycopg2 import connect, Error
from ceo_common.interfaces import IDatabaseService, IConfig
from ceo_common.errors import DatabaseConnectionError, DatabasePermissionError
from ceod.utils import gen_password
from ceod.db.utils import response_is_empty
from psycopg2 import connect, OperationalError, ProgrammingError
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
@implementer(IDatabaseService)
@ -16,32 +18,32 @@ class PostgreSQLService:
self.auth_password = config.get('postgresql_password')
def create_db(self, username: str) -> str:
component.getUtility(ILDAPService).get_user(username) # make sure user exists
password = utils.gen_password()
user = {'username': username, 'password': password}
password = gen_password()
search_for_user = f"SELECT FROM pg_roles WHERE rolname='{username}'"
search_for_db = f"SELECT FROM pg_database WHERE datname='{username}'"
create_user = f"CREATE USER {username} WITH PASSWORD %(password)s"
reset_password = f"ALTER USER {username} WITH PASSWORD %(password)s)"
create_database = f"""
CREATE DATABASE {username} OWNER {username};
REVOKE ALL ON DATABASE {username} FROM PUBLIC;
"""
try:
with connect(
host=self.host,
user=self.auth_username,
password=self.auth_password,
host=self.host,
user=self.auth_username,
password=self.auth_password,
) as con:
# only the owner (and superusers) can ever drop a database
search_for_user = "SELECT FROM pg_roles WHERE rolname='%(username)s'"
create_user = "CREATE USER %(username)s WITH NOSUPERUSER NOCREATEDB NOCREATEROLE PASSWORD '%(password)s'"
create_database = "CREATE DATABASE %(username)s"
set_db_perms = "REVOKE ALL ON DATABASE %(username)s FROM PUBLIC"
set_user_perms = "GRANT ALL ON DATABASE %(username)s TO %(username)s"
reset_password = "ALTER USER '%(username)s' WITH PASSWORD '%(password)s'"
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with con.cursor() as cursor:
cursor.execute(search_for_user, user)
response = cursor.fetchall()
if len(response) == 0:
cursor.execute(create_user, user)
cursor.execute(create_database, user)
cursor.execute(set_db_perms, user)
cursor.execute(set_user_perms, user)
if response_is_empty(search_for_user, con):
cursor.execute(create_user, {password: password})
else:
cursor.execute(reset_password, user)
cursor.execute(reset_password, {password: password})
if response_is_empty(search_for_db, con):
cursor.execute(create_database)
return password
except Error:
except OperationalError:
raise DatabaseConnectionError()
except ProgrammingError:
raise DatabasePermissionError()

@ -0,0 +1,5 @@
def response_is_empty(query: str, connection) -> bool:
with connection.cursor() as cursor:
cursor.execute(query)
response = cursor.fetchall()
return len(response) == 0
Loading…
Cancel
Save