postgres/mysql service updates

updated to use sql safe format strings
This commit is contained in:
Sean Zhang 2023-05-22 18:35:12 -04:00
parent 0c125de1d7
commit 0985ed5d3a
3 changed files with 67 additions and 63 deletions

View File

@ -46,58 +46,49 @@ class MySQLService:
def create_db(self, username: str) -> str:
password = gen_password()
search_for_user = "SELECT user FROM mysql.user WHERE user='%(username)s'"
search_for_db = "SHOW DATABASES LIKE '%(username)s'"
search_for_user = f"SELECT user FROM mysql.user WHERE user='{username}'"
search_for_db = f"SHOW DATABASES LIKE '{username}'"
# CREATE USER can't be used in a query with multiple statements
create_local_user_cmd = "CREATE USER '%(username)s'@'localhost' IDENTIFIED VIA unix_socket"
create_user_cmd = "CREATE USER '%(username)s'@'%' IDENTIFIED BY %(password)s"
create_database = """
CREATE DATABASE %(username)s;
GRANT ALL PRIVILEGES ON %(username)s.* TO '%(username)s'@'localhost' IDENTIFIED VIA unix_socket;
GRANT ALL PRIVILEGES ON %(username)s.* TO '%(username)s'@'%';
create_local_user_cmd = f"CREATE USER '{username}'@'localhost' IDENTIFIED VIA unix_socket"
create_user_cmd = f"CREATE USER '{username}'@'%' IDENTIFIED BY %(password)s"
create_database = f"""
CREATE DATABASE {username};
GRANT ALL PRIVILEGES ON {username}.* TO '{username}'@'localhost' IDENTIFIED VIA unix_socket;
GRANT ALL PRIVILEGES ON {username}.* TO '{username}'@'%';
"""
args_dict = {
'username': username
}
with self.mysql_connection() as con, con.cursor() as cursor:
if not response_is_empty(search_for_user, args_dict, con):
if not response_is_empty(search_for_user, con):
raise UserAlreadyExistsError()
cursor.execute(create_local_user_cmd, args_dict)
cursor.execute(create_user_cmd, {
'password': password, 'username': username
})
if response_is_empty(search_for_db, args_dict, con):
cursor.execute(create_database)
cursor.execute(create_local_user_cmd)
cursor.execute(create_user_cmd, {'password': password})
if response_is_empty(search_for_db, con):
cursor.execute(create_database, multi=True)
return password
def reset_db_passwd(self, username: str) -> str:
password = gen_password()
search_for_user = "SELECT user FROM mysql.user WHERE user='%(username)s'"
reset_password = """
ALTER USER '%(username)s'@'%' IDENTIFIED BY %(password)s;
search_for_user = f"SELECT user FROM mysql.user WHERE user='{username}'"
reset_password = f"""
ALTER USER '{username}'@'%' IDENTIFIED BY %(password)s;
"""
args_dict = {
'username': username
}
with self.mysql_connection() as con, con.cursor() as cursor:
if response_is_empty(search_for_user, args_dict, con):
if response_is_empty(search_for_user, con):
raise UserNotFoundError(username)
cursor.execute(reset_password,
{'password': password, 'username':username})
cursor.execute(reset_password, {'password': password})
return password
def delete_db(self, username: str):
drop_db = "DROP DATABASE IF EXISTS %(username)s"
drop_user = """
DROP USER IF EXISTS '%(username)s'@'localhost';
DROP USER IF EXISTS '%(username)s'@'%';
"""
drop_db = f"DROP DATABASE IF EXISTS {username}"
drop_user = f"DROP USER IF EXISTS %(username)s@'localhost'"
drop_user_2 = f"DROP USER IF EXISTS %(username)s@'%'"
args_dict = {
'username': username
}
with self.mysql_connection() as con, con.cursor() as cursor:
cursor.execute(drop_db, args_dict)
cursor.execute(drop_db)
cursor.execute(drop_user, args_dict)
cursor.execute(drop_user_2, args_dict)

View File

@ -9,7 +9,7 @@ from ceo_common.logger_factory import logger_factory
from ceod.utils import gen_password
from ceod.db.utils import response_is_empty
from psycopg2 import connect, OperationalError, ProgrammingError
from psycopg2 import connect, OperationalError, ProgrammingError, sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
logger = logger_factory(__name__)
@ -53,47 +53,54 @@ class PostgreSQLService:
def create_db(self, username: str) -> str:
password = gen_password()
search_for_user = "SELECT FROM pg_roles WHERE rolname='%(username)s'"
search_for_db = "SELECT FROM pg_database WHERE datname='%(username)s'"
create_user = "CREATE USER %(username)s WITH PASSWORD %(password)s"
create_database = "CREATE DATABASE %(username)s OWNER %(username)s"
revoke_perms = "REVOKE ALL ON DATABASE %(username)s FROM PUBLIC"
args_dict = {
'username': username
}
search_for_user = sql.SQL(
"SELECT FROM pg_roles WHERE rolname={username}"
).format(username=sql.Literal(username))
search_for_db = sql.SQL(
"SELECT FROM pg_database WHERE datname={username}"
).format(username=sql.Literal(username))
create_user = sql.SQL(
"CREATE USER {username} WITH PASSWORD %(password)s"
).format(username=sql.Identifier(username))
create_database = sql.SQL(
"CREATE DATABASE {username} OWNER {username}"
).format(username=sql.Identifier(username))
revoke_perms = sql.SQL(
"REVOKE ALL ON DATABASE {username} FROM PUBLIC"
).format(username=sql.Identifier(username))
with self.psql_connection() as con, con.cursor() as cursor:
if not response_is_empty(search_for_user, con):
raise UserAlreadyExistsError()
cursor.execute(create_user, {'password': password,
'username': username})
if response_is_empty(search_for_db, args_dict, con):
cursor.execute(create_database, args_dict)
cursor.execute(revoke_perms, args_dict)
cursor.execute(create_user, {'password': password})
if response_is_empty(search_for_db, con):
cursor.execute(create_database)
cursor.execute(revoke_perms)
return password
def reset_db_passwd(self, username: str) -> str:
password = gen_password()
search_for_user = "SELECT FROM pg_roles WHERE rolname='%(username)s'"
reset_password = "ALTER USER %(username)s WITH PASSWORD %(password)s"
args_dict = {
'username': username
}
search_for_user = sql.SQL(
"SELECT FROM pg_roles WHERE rolname={username}"
).format(username=sql.Literal(username))
reset_password = sql.SQL(
"ALTER USER {username} WITH PASSWORD %(password)s"
).format(username=sql.Identifier(username))
with self.psql_connection() as con, con.cursor() as cursor:
if response_is_empty(search_for_user, args_dict, con):
if response_is_empty(search_for_user, con):
raise UserNotFoundError(username)
cursor.execute(reset_password, {'password': password,
'username': username})
cursor.execute(reset_password, {'password': password})
return password
def delete_db(self, username: str):
drop_db = "DROP DATABASE IF EXISTS %(username)s"
drop_user = "DROP USER IF EXISTS %(username)s"
args_dict = {
'username': username
}
drop_db = sql.SQL(
"DROP DATABASE IF EXISTS {username}"
).format(username=sql.Identifier(username))
drop_user = sql.SQL(
"DROP USER IF EXISTS {username}"
).format(username=sql.Identifier(username))
with self.psql_connection() as con, con.cursor() as cursor:
cursor.execute(drop_db, args_dict)
cursor.execute(drop_user, args_dict)
cursor.execute(drop_db)
cursor.execute(drop_user)

View File

@ -1,4 +1,10 @@
def response_is_empty(query: str, args_dict, connection) -> bool:
def response_is_empty(query: str, connection) -> bool:
with connection.cursor() as cursor:
cursor.execute(query)
response = cursor.fetchall()
return len(response) == 0
def mysql_response_is_empty(query: str, args_dict, connection) -> bool:
with connection.cursor() as cursor:
cursor.execute(query, args_dict)
response = cursor.fetchall()