124 lines
3.8 KiB
Python
124 lines
3.8 KiB
Python
|
import pytest
|
||
|
|
||
|
from ceod.model import User
|
||
|
from psycopg2 import connect, OperationalError, ProgrammingError
|
||
|
|
||
|
|
||
|
def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
    """Exercise creating and deleting a PostgreSQL database through the API.

    The test checks, in order, that:

    * a user can create a database for themselves (200, password returned);
    * a second creation attempt for the same user is a conflict (409);
    * a user cannot create a database for another user (403);
    * creation for an unknown user is 404 and for an invalid name is 400;
    * the returned credentials allow a login, four databases are listed,
      and ``CREATE DATABASE`` is rejected for that role;
    * after DELETE the role can no longer log in and no database named
      after the user remains.

    ``krb_user`` is requested but never referenced in the body; presumably
    the fixture is needed for its side effects -- confirm against conftest.
    """
    uid = ldap_user.uid
    endpoint = f"/api/db/postgresql/{uid}"

    with g_admin_ctx():
        user = User(uid='someone_else', cn='Some Name', terms=['s2021'])
        user.add_to_ldap()

    # Everything after the LDAP entry exists runs under try/finally so that
    # a failing assertion does not leave 'someone_else' behind; the other
    # test in this module adds the same uid.
    try:
        # user should be able to create db for themselves
        status, data = client.post(endpoint, json={}, principal=uid)
        assert status == 200
        assert 'password' in data
        passwd = data['password']

        # conflict if attempting to create db when already has one
        status, _ = client.post(endpoint, json={}, principal=uid)
        assert status == 409

        # normal user cannot create db for others
        status, _ = client.post(
            "/api/db/postgresql/someone_else", json={}, principal=uid)
        assert status == 403

        # NOTE(review): the requests below pass no principal, so they run as
        # whatever identity the client fixture uses by default -- confirm.

        # cannot create db for user not in ldap
        status, _ = client.post("/api/db/postgresql/user_not_found", json={})
        assert status == 404

        # cannot create db when username contains symbols
        status, _ = client.post("/api/db/postgresql/!invalid", json={})
        assert status == 400

        con = connect(
            host=cfg.get('postgresql_host'),
            user=uid,
            password=passwd,
        )
        # Close the connection even when an assertion below fails.
        try:
            con.autocommit = True
            with con.cursor() as cur:
                cur.execute("SELECT datname FROM pg_database")
                response = cur.fetchall()
                # 3 of the 4 are postgres, template0, template1
                assert len(response) == 4
                with pytest.raises(ProgrammingError):
                    cur.execute("CREATE DATABASE new_db")
        finally:
            con.close()

        status, _ = client.delete(endpoint, json={})
        assert status == 200

        # user should be deleted
        with pytest.raises(OperationalError):
            connect(
                host=cfg.get('postgresql_host'),
                user=uid,
                password=passwd,
            )

        # db should be deleted
        # psycopg2's ``with connection`` only ends the transaction and does
        # not close the connection, so close it explicitly.
        admin_con = connect(
            host=cfg.get('postgresql_host'),
            user=cfg.get('postgresql_username'),
            password=cfg.get('postgresql_password'),
        )
        try:
            with admin_con.cursor() as cur:
                # Bind uid as a parameter instead of formatting it into SQL.
                cur.execute(
                    "SELECT datname FROM pg_database WHERE datname = %s",
                    (uid,),
                )
                response = cur.fetchall()
        finally:
            admin_con.close()
        assert len(response) == 0
    finally:
        with g_admin_ctx():
            user.remove_from_ldap()
|
||
|
|
||
|
|
||
|
def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_user, krb_user):
    """Exercise the PostgreSQL password-reset endpoint.

    The test creates a database for the fixture user, then checks that:

    * the initially returned password allows a login;
    * the user can reset their own password (200) and receives a password
      different from the original one;
    * the user cannot reset another user's password (403);
    * a reset for a user without a database is 404;
    * the new password allows a login.

    The database is deleted through the API at the end.

    ``krb_user`` is requested but never referenced in the body; presumably
    the fixture is needed for its side effects -- confirm against conftest.
    """
    uid = ldap_user.uid
    endpoint = f"/api/db/postgresql/{uid}"
    other_reset = "/api/db/postgresql/someone_else/pwreset"

    with g_admin_ctx():
        user = User(uid='someone_else', cn='Some Name', terms=['s2021'])
        user.add_to_ldap()

    # Guarantee removal of the 'someone_else' LDAP entry even when an
    # assertion fails; the other test in this module adds the same uid.
    try:
        # NOTE(review): no principal is passed here, so this runs as
        # whatever identity the client fixture uses by default -- confirm.
        status, data = client.post(endpoint, json={})
        assert status == 200
        assert 'password' in data
        old_passwd = data['password']

        # the initial password must allow a login
        con = connect(
            host=cfg.get('postgresql_host'),
            user=uid,
            password=old_passwd,
        )
        con.close()

        # normal user can get a password reset for themselves
        status, data = client.post(
            f"{endpoint}/pwreset", json={}, principal=uid)
        assert status == 200
        assert 'password' in data
        new_passwd = data['password']

        assert old_passwd != new_passwd

        # normal user cannot reset password for others
        status, _ = client.post(other_reset, json={}, principal=uid)
        assert status == 403

        # cannot password reset a user that does not have a database
        status, _ = client.post(other_reset, json={})
        assert status == 404

        # the new password must allow a login
        con = connect(
            host=cfg.get('postgresql_host'),
            user=uid,
            password=new_passwd,
        )
        con.close()

        status, _ = client.delete(endpoint, json={})
        assert status == 200
    finally:
        with g_admin_ctx():
            user.remove_from_ldap()
|