pyceo/ceod/api/database.py

101 lines
3.8 KiB
Python
Raw Normal View History

2021-08-27 23:01:35 -04:00
from flask import Blueprint
2021-08-21 00:34:10 -04:00
from zope import component
2021-08-26 15:42:18 -04:00
from functools import wraps
2021-08-27 23:01:35 -04:00
from ceod.api.utils import authz_restrict_to_syscom, user_is_in_group, requires_authentication_no_realm, \
development_only
2021-08-26 15:42:18 -04:00
from ceo_common.errors import UserNotFoundError, DatabaseConnectionError, DatabasePermissionError, \
InvalidUsernameError, UserAlreadyExistsError
2021-08-24 02:14:28 -04:00
from ceo_common.interfaces import ILDAPService, IDatabaseService
2021-08-21 00:34:10 -04:00
bp = Blueprint('db', __name__)
2021-08-26 15:42:18 -04:00
def db_exception_handler(func):
@wraps(func)
def function(db_type: str, username: str):
try:
if not username.isalnum(): # username should not contain symbols
raise InvalidUsernameError()
ldap_srv = component.getUtility(ILDAPService)
ldap_srv.get_user(username) # make sure user exists
return func(db_type, username)
except UserNotFoundError:
return {'error': 'user not found'}, 404
except UserAlreadyExistsError:
return {'error': 'database user is already created'}, 409
except InvalidUsernameError:
return {'error': 'username contains invalid characters'}, 400
except DatabaseConnectionError:
return {'error': 'unable to connect or authenticate to sql server'}, 500
except DatabasePermissionError:
return {'error': 'unable to perform action due to permissions'}, 500
return function
@db_exception_handler
2021-08-24 02:14:28 -04:00
def create_db_from_type(db_type: str, username: str):
2021-08-26 15:42:18 -04:00
db_srv = component.getUtility(IDatabaseService, db_type)
password = db_srv.create_db(username)
return {'password': password}
2021-08-24 02:14:28 -04:00
2021-08-26 15:42:18 -04:00
@db_exception_handler
def reset_db_passwd_from_type(db_type: str, username: str):
db_srv = component.getUtility(IDatabaseService, db_type)
password = db_srv.reset_passwd(username)
return {'password': password}
2021-08-24 02:14:28 -04:00
2021-08-26 15:42:18 -04:00
@db_exception_handler
2021-08-24 22:31:50 -04:00
def delete_db_from_type(db_type: str, username: str):
2021-08-26 15:42:18 -04:00
db_srv = component.getUtility(IDatabaseService, db_type)
db_srv.delete_db(username)
2021-08-24 22:31:50 -04:00
2021-08-24 02:14:28 -04:00
@bp.route('/mysql/<username>', methods=['POST'])
@requires_authentication_no_realm
def create_mysql_db(auth_user: str, username: str):
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')):
return {'error': "not authorized to create databases for others"}, 403
return create_db_from_type('mysql', username)
2021-08-21 00:34:10 -04:00
@bp.route('/postgresql/<username>', methods=['POST'])
@requires_authentication_no_realm
def create_postgresql_db(auth_user: str, username: str):
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')):
return {'error': "not authorized to create databases for others"}, 403
2021-08-24 02:14:28 -04:00
return create_db_from_type('postgresql', username)
2021-08-24 22:31:50 -04:00
2021-08-26 15:42:18 -04:00
@bp.route('/mysql/<username>/pwreset', methods=['POST'])
@requires_authentication_no_realm
def reset_mysql_db_passwd(auth_user: str, username: str):
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')):
return {'error': "not authorized to request password reset for others"}, 403
return reset_db_passwd_from_type('mysql', username)
@bp.route('/postgresql/<username>/pwreset', methods=['POST'])
@requires_authentication_no_realm
def reset_postgresql_db_passwd(auth_user: str, username: str):
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')):
return {'error': "not authorized to request password reset for others"}, 403
return reset_db_passwd_from_type('postgresql', username)
2021-08-25 21:49:51 -04:00
@bp.route('/mysql/<username>', methods=['DELETE'])
@authz_restrict_to_syscom
@development_only
def delete_mysql_db(username: str):
delete_db_from_type('mysql', username)
2021-08-24 22:31:50 -04:00
@bp.route('/postgresql/<username>', methods=['DELETE'])
@authz_restrict_to_syscom
@development_only
def delete_postgresql_db(username: str):
delete_db_from_type('postgresl', username)