db-api (#10)
Implement DB endpoints Co-authored-by: Andrew Wang <someone.zip@gmail.com> Co-authored-by: Max Erenberg <merenber@csclub.uwaterloo.ca> Reviewed-on: #10 Co-authored-by: Andrew Wang <a268wang@localhost> Co-committed-by: Andrew Wang <a268wang@localhost>pull/13/head
parent
7d23fd690f
commit
eb5d632606
@ -0,0 +1,48 @@ |
||||
#!/bin/bash |
||||
|
||||
set -ex |
||||
|
||||
. .drone/common.sh |
||||
|
||||
# set FQDN in /etc/hosts |
||||
add_fqdn_to_hosts $(get_ip_addr $(hostname)) coffee |
||||
|
||||
export DEBIAN_FRONTEND=noninteractive |
||||
apt update |
||||
|
||||
apt install --no-install-recommends -y default-mysql-server postgresql |
||||
|
||||
service mysql stop |
||||
sed -E -i 's/^(bind-address[[:space:]]+= 127.0.0.1)$/#\1/' /etc/mysql/mariadb.conf.d/50-server.cnf |
||||
service mysql start |
||||
cat <<EOF | mysql |
||||
CREATE USER 'mysql' IDENTIFIED BY 'mysql'; |
||||
GRANT ALL PRIVILEGES ON *.* TO 'mysql' WITH GRANT OPTION; |
||||
EOF |
||||
|
||||
service postgresql stop |
||||
POSTGRES_DIR=/etc/postgresql/11/main |
||||
cat <<EOF > $POSTGRES_DIR/pg_hba.conf |
||||
# TYPE DATABASE USER ADDRESS METHOD |
||||
local all postgres peer |
||||
host all postgres 0.0.0.0/0 md5 |
||||
|
||||
local all all peer |
||||
host all all localhost md5 |
||||
|
||||
local sameuser all md5 |
||||
host sameuser all 0.0.0.0/0 md5 |
||||
EOF |
||||
grep -Eq "^listen_addresses = '*'$" $POSTGRES_DIR/postgresql.conf || \ |
||||
echo "listen_addresses = '*'" >> $POSTGRES_DIR/postgresql.conf |
||||
service postgresql start |
||||
su -c " |
||||
cat <<EOF | psql |
||||
ALTER USER postgres WITH PASSWORD 'postgres'; |
||||
REVOKE ALL ON SCHEMA public FROM public; |
||||
GRANT ALL ON SCHEMA public TO postgres; |
||||
EOF" postgres |
||||
|
||||
# sync with phosphoric-acid |
||||
apt install -y netcat-openbsd |
||||
nc -l 0.0.0.0 9000 |
@ -0,0 +1,17 @@ |
||||
# don't resolve container names to *real* CSC machines |
||||
sed -E '/^(domain|search)[[:space:]]+csclub.uwaterloo.ca/d' /etc/resolv.conf > /tmp/resolv.conf |
||||
cp /tmp/resolv.conf /etc/resolv.conf |
||||
rm /tmp/resolv.conf |
||||
|
||||
get_ip_addr() { |
||||
getent hosts $1 | cut -d' ' -f1 |
||||
} |
||||
|
||||
add_fqdn_to_hosts() { |
||||
ip_addr=$1 |
||||
hostname=$2 |
||||
sed -E "/${ip_addr}.*\\b${hostname}\\b/d" /etc/hosts > /tmp/hosts |
||||
cp /tmp/hosts /etc/hosts |
||||
rm /tmp/hosts |
||||
echo "$ip_addr $hostname.csclub.internal $hostname" >> /etc/hosts |
||||
} |
@ -0,0 +1,18 @@ |
||||
from zope.interface import Attribute, Interface |
||||
|
||||
|
||||
class IDatabaseService(Interface): |
||||
"""Interface to create databases for users.""" |
||||
|
||||
type = Attribute('the type of databases that will be created') |
||||
auth_username = Attribute('username to a privileged user on the database host') |
||||
auth_password = Attribute('password to a privileged user on the database host') |
||||
|
||||
def create_db(username: str) -> str: |
||||
"""create a user and database and return the password""" |
||||
|
||||
def reset_passwd(username: str) -> str: |
||||
"""reset user password and return it""" |
||||
|
||||
def delete_db(username: str): |
||||
"""remove user and delete their database""" |
@ -0,0 +1,104 @@ |
||||
from flask import Blueprint |
||||
from zope import component |
||||
from functools import wraps |
||||
|
||||
from ceod.api.utils import authz_restrict_to_syscom, user_is_in_group, \ |
||||
requires_authentication_no_realm, development_only |
||||
from ceo_common.errors import UserNotFoundError, DatabaseConnectionError, DatabasePermissionError, \ |
||||
InvalidUsernameError, UserAlreadyExistsError |
||||
from ceo_common.interfaces import ILDAPService, IDatabaseService |
||||
|
||||
|
||||
bp = Blueprint('db', __name__) |
||||
|
||||
|
||||
def db_exception_handler(func): |
||||
@wraps(func) |
||||
def function(db_type: str, username: str): |
||||
try: |
||||
# Username should not contain symbols. |
||||
# Underscores are allowed. |
||||
for c in username: |
||||
if not (c.isalnum() or c == '_'): |
||||
raise InvalidUsernameError() |
||||
ldap_srv = component.getUtility(ILDAPService) |
||||
ldap_srv.get_user(username) # make sure user exists |
||||
return func(db_type, username) |
||||
except UserNotFoundError: |
||||
return {'error': 'user not found'}, 404 |
||||
except UserAlreadyExistsError: |
||||
return {'error': 'database user is already created'}, 409 |
||||
except InvalidUsernameError: |
||||
return {'error': 'username contains invalid characters'}, 400 |
||||
except DatabaseConnectionError: |
||||
return {'error': 'unable to connect or authenticate to sql server'}, 500 |
||||
except DatabasePermissionError: |
||||
return {'error': 'unable to perform action due to permissions'}, 500 |
||||
return function |
||||
|
||||
|
||||
@db_exception_handler |
||||
def create_db_from_type(db_type: str, username: str): |
||||
db_srv = component.getUtility(IDatabaseService, db_type) |
||||
password = db_srv.create_db(username) |
||||
return {'password': password} |
||||
|
||||
|
||||
@db_exception_handler |
||||
def reset_db_passwd_from_type(db_type: str, username: str): |
||||
db_srv = component.getUtility(IDatabaseService, db_type) |
||||
password = db_srv.reset_db_passwd(username) |
||||
return {'password': password} |
||||
|
||||
|
||||
@db_exception_handler |
||||
def delete_db_from_type(db_type: str, username: str): |
||||
db_srv = component.getUtility(IDatabaseService, db_type) |
||||
db_srv.delete_db(username) |
||||
return {'status': 'OK'} |
||||
|
||||
|
||||
@bp.route('/mysql/<username>', methods=['POST']) |
||||
@requires_authentication_no_realm |
||||
def create_mysql_db(auth_user: str, username: str): |
||||
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')): |
||||
return {'error': "not authorized to create databases for others"}, 403 |
||||
return create_db_from_type('mysql', username) |
||||
|
||||
|
||||
@bp.route('/postgresql/<username>', methods=['POST']) |
||||
@requires_authentication_no_realm |
||||
def create_postgresql_db(auth_user: str, username: str): |
||||
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')): |
||||
return {'error': "not authorized to create databases for others"}, 403 |
||||
return create_db_from_type('postgresql', username) |
||||
|
||||
|
||||
@bp.route('/mysql/<username>/pwreset', methods=['POST']) |
||||
@requires_authentication_no_realm |
||||
def reset_mysql_db_passwd(auth_user: str, username: str): |
||||
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')): |
||||
return {'error': "not authorized to request password reset for others"}, 403 |
||||
return reset_db_passwd_from_type('mysql', username) |
||||
|
||||
|
||||
@bp.route('/postgresql/<username>/pwreset', methods=['POST']) |
||||
@requires_authentication_no_realm |
||||
def reset_postgresql_db_passwd(auth_user: str, username: str): |
||||
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')): |
||||
return {'error': "not authorized to request password reset for others"}, 403 |
||||
return reset_db_passwd_from_type('postgresql', username) |
||||
|
||||
|
||||
@bp.route('/mysql/<username>', methods=['DELETE']) |
||||
@authz_restrict_to_syscom |
||||
@development_only |
||||
def delete_mysql_db(username: str): |
||||
return delete_db_from_type('mysql', username) |
||||
|
||||
|
||||
@bp.route('/postgresql/<username>', methods=['DELETE']) |
||||
@authz_restrict_to_syscom |
||||
@development_only |
||||
def delete_postgresql_db(username: str): |
||||
return delete_db_from_type('postgresql', username) |
@ -0,0 +1,88 @@ |
||||
from zope.interface import implementer |
||||
from zope import component |
||||
from contextlib import contextmanager |
||||
|
||||
from ceo_common.interfaces import IDatabaseService, IConfig |
||||
from ceo_common.errors import DatabaseConnectionError, DatabasePermissionError, UserAlreadyExistsError, \ |
||||
UserNotFoundError |
||||
from ceo_common.logger_factory import logger_factory |
||||
from ceod.utils import gen_password |
||||
from ceod.db.utils import response_is_empty |
||||
|
||||
from mysql.connector import connect |
||||
from mysql.connector.errors import InterfaceError, ProgrammingError |
||||
|
||||
logger = logger_factory(__name__) |
||||
|
||||
|
||||
@implementer(IDatabaseService) |
||||
class MySQLService: |
||||
|
||||
type = 'mysql' |
||||
|
||||
def __init__(self): |
||||
config = component.getUtility(IConfig) |
||||
self.auth_username = config.get('mysql_username') |
||||
self.auth_password = config.get('mysql_password') |
||||
self.host = config.get('mysql_host') |
||||
|
||||
@contextmanager |
||||
def mysql_connection(self): |
||||
try: |
||||
with connect( |
||||
host=self.host, |
||||
user=self.auth_username, |
||||
password=self.auth_password, |
||||
) as con: |
||||
yield con |
||||
except InterfaceError as e: |
||||
logger.error(e) |
||||
raise DatabaseConnectionError() |
||||
except ProgrammingError as e: |
||||
logger.error(e) |
||||
raise DatabasePermissionError() |
||||
|
||||
def create_db(self, username: str) -> str: |
||||
password = gen_password() |
||||
search_for_user = f"SELECT user FROM mysql.user WHERE user='{username}'" |
||||
search_for_db = f"SHOW DATABASES LIKE '{username}'" |
||||
create_user = f""" |
||||
CREATE USER '{username}'@'%' IDENTIFIED BY %(password)s; |
||||
""" |
||||
create_database = f""" |
||||
CREATE DATABASE {username}; |
||||
GRANT ALL PRIVILEGES ON {username}.* TO '{username}'@'%'; |
||||
""" |
||||
|
||||
with self.mysql_connection() as con, con.cursor() as cursor: |
||||
if response_is_empty(search_for_user, con): |
||||
cursor.execute(create_user, {'password': password}) |
||||
if response_is_empty(search_for_db, con): |
||||
cursor.execute(create_database) |
||||
else: |
||||
raise UserAlreadyExistsError() |
||||
return password |
||||
|
||||
def reset_db_passwd(self, username: str) -> str: |
||||
password = gen_password() |
||||
search_for_user = f"SELECT user FROM mysql.user WHERE user='{username}'" |
||||
reset_password = f""" |
||||
ALTER USER '{username}'@'%' IDENTIFIED BY %(password)s |
||||
""" |
||||
|
||||
with self.mysql_connection() as con, con.cursor() as cursor: |
||||
if not response_is_empty(search_for_user, con): |
||||
cursor.execute(reset_password, {'password': password}) |
||||
else: |
||||
raise UserNotFoundError(username) |
||||
return password |
||||
|
||||
def delete_db(self, username: str): |
||||
drop_db = f"DROP DATABASE IF EXISTS {username}" |
||||
drop_user = f""" |
||||
DROP USER IF EXISTS '{username}'@'%'; |
||||
""" |
||||
|
||||
with self.mysql_connection() as con, con.cursor() as cursor: |
||||
cursor.execute(drop_db) |
||||
cursor.execute(drop_user) |
@ -0,0 +1,86 @@ |
||||
from zope.interface import implementer |
||||
from zope import component |
||||
from contextlib import contextmanager |
||||
|
||||
from ceo_common.interfaces import IDatabaseService, IConfig |
||||
from ceo_common.errors import DatabaseConnectionError, DatabasePermissionError, \ |
||||
UserAlreadyExistsError, UserNotFoundError |
||||
from ceo_common.logger_factory import logger_factory |
||||
from ceod.utils import gen_password |
||||
from ceod.db.utils import response_is_empty |
||||
|
||||
from psycopg2 import connect, OperationalError, ProgrammingError |
||||
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT |
||||
|
||||
logger = logger_factory(__name__) |
||||
|
||||
|
||||
@implementer(IDatabaseService) |
||||
class PostgreSQLService: |
||||
|
||||
type = 'postgresql' |
||||
|
||||
def __init__(self): |
||||
config = component.getUtility(IConfig) |
||||
self.auth_username = config.get('postgresql_username') |
||||
self.auth_password = config.get('postgresql_password') |
||||
self.host = config.get('postgresql_host') |
||||
|
||||
@contextmanager |
||||
def psql_connection(self): |
||||
con = None |
||||
try: |
||||
# Don't use the connection as a context manager, because that |
||||
# creates a new transaction. |
||||
con = connect( |
||||
host=self.host, |
||||
user=self.auth_username, |
||||
password=self.auth_password, |
||||
) |
||||
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) |
||||
yield con |
||||
except OperationalError as e: |
||||
logger.error(e) |
||||
raise DatabaseConnectionError() |
||||
except ProgrammingError as e: |
||||
logger.error(e) |
||||
raise DatabasePermissionError() |
||||
finally: |
||||
if con is not None: |
||||
con.close() |
||||
|
||||
def create_db(self, username: str) -> str: |
||||
password = gen_password() |
||||
search_for_user = f"SELECT FROM pg_roles WHERE rolname='{username}'" |
||||
search_for_db = f"SELECT FROM pg_database WHERE datname='{username}'" |
||||
create_user = f"CREATE USER {username} WITH PASSWORD %(password)s" |
||||
create_database = f"CREATE DATABASE {username} OWNER {username}" |
||||
revoke_perms = f"REVOKE ALL ON DATABASE {username} FROM PUBLIC" |
||||
|
||||
with self.psql_connection() as con, con.cursor() as cursor: |
||||
if not response_is_empty(search_for_user, con): |
||||
raise UserAlreadyExistsError() |
||||
cursor.execute(create_user, {'password': password}) |
||||
if response_is_empty(search_for_db, con): |
||||
cursor.execute(create_database) |
||||
cursor.execute(revoke_perms) |
||||
return password |
||||
|
||||
def reset_db_passwd(self, username: str) -> str: |
||||
password = gen_password() |
||||
search_for_user = f"SELECT FROM pg_roles WHERE rolname='{username}'" |
||||
reset_password = f"ALTER USER {username} WITH PASSWORD %(password)s" |
||||
|
||||
with self.psql_connection() as con, con.cursor() as cursor: |
||||
if response_is_empty(search_for_user, con): |
||||
raise UserNotFoundError(username) |
||||
cursor.execute(reset_password, {'password': password}) |
||||
return password |
||||
|
||||
def delete_db(self, username: str): |
||||
drop_db = f"DROP DATABASE IF EXISTS {username}" |
||||
drop_user = f"DROP USER IF EXISTS {username}" |
||||
|
||||
with self.psql_connection() as con, con.cursor() as cursor: |
||||
cursor.execute(drop_db) |
||||
cursor.execute(drop_user) |
@ -0,0 +1,2 @@ |
||||
from .MySQLService import MySQLService |
||||
from .PostgreSQLService import PostgreSQLService |
@ -0,0 +1,5 @@ |
||||
def response_is_empty(query: str, connection) -> bool: |
||||
with connection.cursor() as cursor: |
||||
cursor.execute(query) |
||||
response = cursor.fetchall() |
||||
return len(response) == 0 |
@ -0,0 +1,120 @@ |
||||
import pytest |
||||
|
||||
from ceod.model import User |
||||
from mysql.connector import connect |
||||
from mysql.connector.errors import ProgrammingError |
||||
|
||||
|
||||
def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_user, krb_user): |
||||
uid = ldap_user.uid |
||||
|
||||
with g_admin_ctx(): |
||||
user = User(uid='someone_else', cn='Some Name', terms=['s2021']) |
||||
user.add_to_ldap() |
||||
|
||||
# user should be able to create db for themselves |
||||
status, data = client.post(f"/api/db/mysql/{uid}", json={}, principal=uid) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
passwd = data['password'] |
||||
|
||||
# conflict if attempting to create db when already has one |
||||
status, data = client.post(f"/api/db/mysql/{uid}", json={}, principal=uid) |
||||
assert status == 409 |
||||
|
||||
# normal user cannot create db for others |
||||
status, data = client.post("/api/db/mysql/someone_else", json={}, principal=uid) |
||||
assert status == 403 |
||||
|
||||
# cannot create db for user not in ldap |
||||
status, data = client.post("/api/db/mysql/user_not_found", json={}) |
||||
assert status == 404 |
||||
|
||||
# cannot create db when username contains symbols |
||||
status, data = client.post("/api/db/mysql/!invalid", json={}) |
||||
assert status == 400 |
||||
|
||||
with connect( |
||||
host=cfg.get('mysql_host'), |
||||
user=uid, |
||||
password=passwd, |
||||
) as con, con.cursor() as cur: |
||||
cur.execute("SHOW DATABASES") |
||||
response = cur.fetchall() |
||||
assert len(response) == 2 |
||||
|
||||
with pytest.raises(ProgrammingError): |
||||
cur.execute("CREATE DATABASE new_db") |
||||
|
||||
status, data = client.delete(f"/api/db/mysql/{uid}", json={}) |
||||
assert status == 200 |
||||
|
||||
# user should be deleted |
||||
with pytest.raises(ProgrammingError): |
||||
con = connect( |
||||
host=cfg.get('mysql_host'), |
||||
user=uid, |
||||
password=passwd, |
||||
) |
||||
|
||||
# db should be deleted |
||||
with connect( |
||||
host=cfg.get('mysql_host'), |
||||
user=cfg.get('mysql_username'), |
||||
password=cfg.get('mysql_password'), |
||||
) as con, con.cursor() as cur: |
||||
cur.execute(f"SHOW DATABASES LIKE '{uid}'") |
||||
response = cur.fetchall() |
||||
assert len(response) == 0 |
||||
|
||||
with g_admin_ctx(): |
||||
user.remove_from_ldap() |
||||
|
||||
|
||||
def test_api_passwd_reset_mysql(cfg, client, g_admin_ctx, ldap_user, krb_user): |
||||
uid = ldap_user.uid |
||||
|
||||
with g_admin_ctx(): |
||||
user = User(uid='someone_else', cn='Some Name', terms=['s2021']) |
||||
user.add_to_ldap() |
||||
|
||||
status, data = client.post(f"/api/db/mysql/{uid}", json={}) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
old_passwd = data['password'] |
||||
|
||||
con = connect( |
||||
host=cfg.get('mysql_host'), |
||||
user=uid, |
||||
password=old_passwd, |
||||
) |
||||
con.close() |
||||
|
||||
# normal user can get a password reset for themselves |
||||
status, data = client.post(f"/api/db/mysql/{uid}/pwreset", json={}, principal=uid) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
new_passwd = data['password'] |
||||
|
||||
assert old_passwd != new_passwd |
||||
|
||||
# normal user cannot reset password for others |
||||
status, data = client.post("/api/db/mysql/someone_else/pwreset", json={}, principal=uid) |
||||
assert status == 403 |
||||
|
||||
# cannot password reset a user that does not have a database |
||||
status, data = client.post("/api/db/mysql/someone_else/pwreset", json={}) |
||||
assert status == 404 |
||||
|
||||
con = connect( |
||||
host=cfg.get('mysql_host'), |
||||
user=uid, |
||||
password=new_passwd, |
||||
) |
||||
con.close() |
||||
|
||||
status, data = client.delete(f"/api/db/mysql/{uid}", json={}) |
||||
assert status == 200 |
||||
|
||||
with g_admin_ctx(): |
||||
user.remove_from_ldap() |
@ -0,0 +1,123 @@ |
||||
import pytest |
||||
|
||||
from ceod.model import User |
||||
from psycopg2 import connect, OperationalError, ProgrammingError |
||||
|
||||
|
||||
def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user, krb_user): |
||||
uid = ldap_user.uid |
||||
|
||||
with g_admin_ctx(): |
||||
user = User(uid='someone_else', cn='Some Name', terms=['s2021']) |
||||
user.add_to_ldap() |
||||
|
||||
# user should be able to create db for themselves |
||||
status, data = client.post(f"/api/db/postgresql/{uid}", json={}, principal=uid) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
passwd = data['password'] |
||||
|
||||
# conflict if attempting to create db when already has one |
||||
status, data = client.post(f"/api/db/postgresql/{uid}", json={}, principal=uid) |
||||
assert status == 409 |
||||
|
||||
# normal user cannot create db for others |
||||
status, data = client.post("/api/db/postgresql/someone_else", json={}, principal=uid) |
||||
assert status == 403 |
||||
|
||||
# cannot create db for user not in ldap |
||||
status, data = client.post("/api/db/postgresql/user_not_found", json={}) |
||||
assert status == 404 |
||||
|
||||
# cannot create db when username contains symbols |
||||
status, data = client.post("/api/db/postgresql/!invalid", json={}) |
||||
assert status == 400 |
||||
|
||||
con = connect( |
||||
host=cfg.get('postgresql_host'), |
||||
user=uid, |
||||
password=passwd, |
||||
) |
||||
con.autocommit = True |
||||
with con.cursor() as cur: |
||||
cur.execute("SELECT datname FROM pg_database") |
||||
response = cur.fetchall() |
||||
# 3 of the 4 are postgres, template0, template1 |
||||
assert len(response) == 4 |
||||
with pytest.raises(ProgrammingError): |
||||
cur.execute("CREATE DATABASE new_db") |
||||
con.close() |
||||
|
||||
status, data = client.delete(f"/api/db/postgresql/{uid}", json={}) |
||||
assert status == 200 |
||||
|
||||
# user should be deleted |
||||
with pytest.raises(OperationalError): |
||||
con = connect( |
||||
host=cfg.get('postgresql_host'), |
||||
user=uid, |
||||
password=passwd, |
||||
) |
||||
|
||||
# db should be deleted |
||||
with connect( |
||||
host=cfg.get('postgresql_host'), |
||||
user=cfg.get('postgresql_username'), |
||||
password=cfg.get('postgresql_password'), |
||||
) as con, con.cursor() as cur: |
||||
cur.execute(f"SELECT datname FROM pg_database WHERE datname = '{uid}'") |
||||
response = cur.fetchall() |
||||
assert len(response) == 0 |
||||
|
||||
with g_admin_ctx(): |
||||
user.remove_from_ldap() |
||||
|
||||
|
||||
def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_user, krb_user): |
||||
uid = ldap_user.uid |
||||
|
||||
with g_admin_ctx(): |
||||
user = User(uid='someone_else', cn='Some Name', terms=['s2021']) |
||||
user.add_to_ldap() |
||||
|
||||
status, data = client.post(f"/api/db/postgresql/{uid}", json={}) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
old_passwd = data['password'] |
||||
|
||||
con = connect( |
||||
host=cfg.get('postgresql_host'), |
||||
user=uid, |
||||
password=old_passwd, |
||||
) |
||||
con.close() |
||||
|
||||
# normal user can get a password reset for themselves |
||||
status, data = client.post(f"/api/db/postgresql/{uid}/pwreset", json={}, principal=uid) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
new_passwd = data['password'] |
||||
|
||||
assert old_passwd != new_passwd |
||||
|
||||
# normal user cannot reset password for others |
||||
status, data = client.post("/api/db/postgresql/someone_else/pwreset", |
||||
json={}, principal=uid) |
||||
assert status == 403 |
||||
|
||||
# cannot password reset a user that does not have a database |
||||
status, data = client.post("/api/db/postgresql/someone_else/pwreset", json={}) |
||||
assert status == 404 |
||||
|
||||
con = connect( |
||||
host=cfg.get('postgresql_host'), |
||||
user=uid, |
||||
password=new_passwd, |
||||
) |
||||
con.close() |
||||
|
||||
status, data = client.delete(f"/api/db/postgresql/{uid}", json={}) |
||||
assert status == 200 |
||||
|
||||
with g_admin_ctx(): |
||||
user.remove_from_ldap() |
Loading…
Reference in new issue