122 lines
3.6 KiB
Python
122 lines
3.6 KiB
Python
import pytest
|
|
|
|
from ceod.model import User
|
|
from psycopg2 import connect, OperationalError, ProgrammingError
|
|
|
|
|
|
def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user):
    """Exercise creation and deletion of a PostgreSQL database through the API.

    Covers self-service creation, the conflict on a second creation, refusal
    to create a database for another user, unknown and malformed usernames,
    the fact that the new role cannot create further databases, and removal
    of both the role and the database on delete.
    """
    uid = ldap_user.uid
    db_host = cfg.get('ceod_database_host')

    with g_admin_ctx():
        user = User(uid='someone_else', cn='Some Name', terms=['s2021'])
        user.add_to_ldap()

    # From here on the extra LDAP entry exists, so make sure it is removed
    # even when one of the assertions below fails.
    try:
        # user should be able to create db for themselves
        status, data = client.post(f"/api/postgresql/{uid}", json={}, principal=uid)
        assert status == 200
        assert 'password' in data
        passwd = data['password']

        # conflict if attempting to create db when already has one
        status, data = client.post(f"/api/postgresql/{uid}", json={}, principal=uid)
        assert status == 409

        # normal user cannot create db for others
        status, data = client.post("/api/postgresql/someone_else", json={}, principal=uid)
        assert status == 403

        # cannot create db for user not in ldap
        status, data = client.post("/api/postgresql/user_not_found", json={})
        assert status == 404

        # cannot create db when username contains symbols
        status, data = client.post("/api/postgresql/#invalid", json={})
        assert status == 400

        con = connect(host=db_host, user=uid, password=passwd)
        try:
            # psycopg2 opens a transaction implicitly and PostgreSQL refuses
            # CREATE DATABASE inside a transaction block; autocommit lets the
            # statement reach the privilege check instead.
            con.autocommit = True
            with con.cursor() as cur:
                # PostgreSQL has no SHOW DATABASES; the catalog lists every
                # database in the cluster, so check for the user's own entry
                # rather than an exact count.
                cur.execute("SELECT datname FROM pg_database")
                databases = {row[0] for row in cur.fetchall()}
                assert uid in databases

                with pytest.raises(ProgrammingError):
                    cur.execute("CREATE DATABASE new_db")
        finally:
            # `with connection:` in psycopg2 only ends the transaction; close
            # explicitly so no session is left open on the database that the
            # API is about to drop.
            con.close()

        status, data = client.delete(f"/api/postgresql/{uid}", json={})
        assert status == 200

        # user should be deleted
        with pytest.raises(OperationalError):
            # close immediately if the login unexpectedly succeeds
            connect(host=db_host, user=uid, password=passwd).close()

        # db should be deleted
        con = connect(
            host=db_host,
            user=cfg.get('postgresql_username'),
            password=cfg.get('postgresql_password'),
        )
        try:
            with con.cursor() as cur:
                cur.execute(
                    "SELECT datname FROM pg_database WHERE datname = %s",
                    (uid,),
                )
                assert not cur.fetchall()
        finally:
            con.close()
    finally:
        with g_admin_ctx():
            user.remove_from_ldap()
|
|
|
|
|
|
def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_user):
    """Exercise the PostgreSQL password-reset endpoint.

    Creates a database for the fixture user, checks that the user can reset
    their own password and log in with the new one, that another user cannot
    reset it, and that resetting for a user without a database returns 404.
    """
    uid = ldap_user.uid
    db_host = cfg.get('ceod_database_host')

    with g_admin_ctx():
        user = User(uid='someone_else', cn='Some Name', terms=['s2021'])
        user.add_to_ldap()

    # The extra LDAP entry must be removed even when an assertion fails,
    # otherwise it is left behind for any later test that adds the same uid.
    try:
        status, data = client.post(f"/api/postgresql/{uid}", json={})
        assert status == 200
        assert 'password' in data
        old_passwd = data['password']

        # the initial password must allow a login
        connect(host=db_host, user=uid, password=old_passwd).close()

        # normal user can get a password reset for themselves
        status, data = client.post(f"/api/postgresql/{uid}/pwreset", json={}, principal=uid)
        assert status == 200
        assert 'password' in data
        new_passwd = data['password']

        assert old_passwd != new_passwd

        # normal user cannot reset password for others
        status, data = client.post(f"/api/postgresql/{uid}/pwreset", json={}, principal='someone_else')
        assert status == 403

        # cannot password reset a user that does not have a database
        status, data = client.post("/api/postgresql/someone_else/pwreset", json={})
        assert status == 404

        # the new password must allow a login
        connect(host=db_host, user=uid, password=new_passwd).close()

        status, data = client.delete(f"/api/postgresql/{uid}", json={})
        assert status == 200
    finally:
        with g_admin_ctx():
            user.remove_from_ldap()
|