"""API tests for the PostgreSQL database endpoints under ``/api/db/postgresql``."""
from contextlib import closing, contextmanager

import pytest

from ceod.model import User
from psycopg2 import connect, OperationalError, ProgrammingError


@contextmanager
def _second_ldap_user(g_admin_ctx):
    """Add the ``someone_else`` user to LDAP and always remove it afterwards.

    The removal runs in a ``finally`` so that a failing assertion in one test
    does not leave the entry behind for the next test that adds the same uid.
    """
    with g_admin_ctx():
        user = User(
            uid='someone_else',
            cn='Some Name',
            given_name='Some',
            sn='Name',
            terms=['s2021'],
        )
        user.add_to_ldap()
    try:
        yield user
    finally:
        with g_admin_ctx():
            user.remove_from_ldap()


def _connect(cfg, user, password):
    """Open a psycopg2 connection to the configured PostgreSQL host.

    The caller owns the returned connection and is responsible for closing it.
    """
    return connect(
        host=cfg.get('postgresql_host'),
        user=user,
        password=password,
    )


def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
    """Exercise database creation and deletion through the API.

    Checks the status codes for the allowed and rejected creation requests,
    what the new database user can see and do, and that both the user and the
    database are gone after the DELETE request.
    """
    uid = ldap_user.uid

    with _second_ldap_user(g_admin_ctx):
        # user should be able to create db for themselves
        status, data = client.post(f"/api/db/postgresql/{uid}", json={}, principal=uid)
        assert status == 200
        assert 'password' in data
        passwd = data['password']

        # conflict if attempting to create db when already has one
        status, data = client.post(f"/api/db/postgresql/{uid}", json={}, principal=uid)
        assert status == 409

        # normal user cannot create db for others
        status, data = client.post("/api/db/postgresql/someone_else", json={}, principal=uid)
        assert status == 403

        # cannot create db for user not in ldap
        status, data = client.post("/api/db/postgresql/user_not_found", json={})
        assert status == 404

        # cannot create db when username contains symbols
        status, data = client.post("/api/db/postgresql/!invalid", json={})
        assert status == 400

        # closing() guarantees the connection is released even when one of
        # the assertions below fails.
        with closing(_connect(cfg, uid, passwd)) as con:
            con.autocommit = True
            with con.cursor() as cur:
                cur.execute("SELECT datname FROM pg_database")
                response = cur.fetchall()
                # 3 of the 4 are postgres, template0, template1
                assert len(response) == 4

                with pytest.raises(ProgrammingError):
                    cur.execute("CREATE DATABASE new_db")

        status, data = client.delete(f"/api/db/postgresql/{uid}", json={})
        assert status == 200

        # user should be deleted
        with pytest.raises(OperationalError):
            # If the login unexpectedly succeeds, close the connection right
            # away; pytest.raises then reports the missing exception.
            _connect(cfg, uid, passwd).close()

        # db should be deleted
        # psycopg2's own `with connection:` only ends the transaction and
        # leaves the connection open, so wrap it in closing() instead.
        admin_con = _connect(
            cfg,
            cfg.get('postgresql_username'),
            cfg.get('postgresql_password'),
        )
        with closing(admin_con), admin_con.cursor() as cur:
            # Bind the name as a query parameter rather than formatting it
            # into the SQL text.
            cur.execute(
                "SELECT datname FROM pg_database WHERE datname = %s",
                (uid,),
            )
            response = cur.fetchall()
            assert not response


def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_user, krb_user):
    """Exercise the password-reset endpoint for PostgreSQL databases.

    Checks that a user can reset their own password and log in with the new
    one, that resetting another user's password is forbidden, and that a
    reset for a user without a database returns 404.
    """
    uid = ldap_user.uid

    with _second_ldap_user(g_admin_ctx):
        status, data = client.post(f"/api/db/postgresql/{uid}", json={})
        assert status == 200
        assert 'password' in data
        old_passwd = data['password']

        # the initial password must allow a login
        _connect(cfg, uid, old_passwd).close()

        # normal user can get a password reset for themselves
        status, data = client.post(f"/api/db/postgresql/{uid}/pwreset", json={}, principal=uid)
        assert status == 200
        assert 'password' in data
        new_passwd = data['password']
        assert old_passwd != new_passwd

        # normal user cannot reset password for others
        status, data = client.post("/api/db/postgresql/someone_else/pwreset", json={}, principal=uid)
        assert status == 403

        # cannot password reset a user that does not have a database
        status, data = client.post("/api/db/postgresql/someone_else/pwreset", json={})
        assert status == 404

        # the new password must allow a login
        _connect(cfg, uid, new_passwd).close()

        status, data = client.delete(f"/api/db/postgresql/{uid}", json={})
        assert status == 200