# pyceo/tests/ceod/api/test_db_mysql.py (121 lines, 3.5 KiB, Python)

import pytest
from ceod.model import User
from mysql.connector import connect
from mysql.connector.errors import ProgrammingError
def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
    """Check creation and deletion of a MySQL database through the HTTP API.

    Covers the status codes returned by ``POST /api/db/mysql/<uid>`` (success,
    conflict, forbidden, not found, invalid username), logs in to MySQL with
    the returned password, and verifies that ``DELETE /api/db/mysql/<uid>``
    removes both the MySQL account and its database.

    ``krb_user`` is not referenced in the body; it is requested only so that
    the fixture is set up (presumably the Kerberos principal for ``ldap_user``
    -- confirm against the fixture definition).
    """
    uid = ldap_user.uid

    # A second LDAP user acts as the target of the "create a database for
    # somebody else" request below.
    with g_admin_ctx():
        user = User(uid='someone_else', cn='Some Name', terms=['s2021'])
        user.add_to_ldap()

    # Everything after the LDAP insert runs inside try/finally so that the
    # extra user is removed even when an assertion fails; otherwise the entry
    # would leak into later tests that add the same uid.
    try:
        # user should be able to create db for themselves
        status, data = client.post(
            f"/api/db/mysql/{uid}", json={}, principal=uid)
        assert status == 200
        assert 'password' in data
        passwd = data['password']

        # conflict if attempting to create db when already has one
        status, data = client.post(
            f"/api/db/mysql/{uid}", json={}, principal=uid)
        assert status == 409

        # normal user cannot create db for others
        status, data = client.post(
            "/api/db/mysql/someone_else", json={}, principal=uid)
        assert status == 403

        # The next requests pass no principal, so the client's default
        # identity is used (presumably a privileged one -- confirm against
        # the `client` fixture).

        # cannot create db for user not in ldap
        status, data = client.post("/api/db/mysql/user_not_found", json={})
        assert status == 404

        # cannot create db when username contains symbols
        status, data = client.post("/api/db/mysql/!invalid", json={})
        assert status == 400

        # Log in as the new MySQL account with the password from the API.
        with connect(
            host=cfg.get('mysql_host'),
            user=uid,
            password=passwd,
        ) as con, con.cursor() as cur:
            cur.execute("SHOW DATABASES")
            response = cur.fetchall()
            # Exactly two databases are visible to the account (presumably
            # information_schema plus the user's own database -- confirm).
            assert len(response) == 2

            # The account must not be allowed to create further databases.
            with pytest.raises(ProgrammingError):
                cur.execute("CREATE DATABASE new_db")

        status, data = client.delete(f"/api/db/mysql/{uid}", json={})
        assert status == 200

        # user should be deleted
        with pytest.raises(ProgrammingError):
            connect(
                host=cfg.get('mysql_host'),
                user=uid,
                password=passwd,
            )

        # db should be deleted
        with connect(
            host=cfg.get('mysql_host'),
            user=cfg.get('mysql_username'),
            password=cfg.get('mysql_password'),
        ) as con, con.cursor() as cur:
            cur.execute(f"SHOW DATABASES LIKE '{uid}'")
            response = cur.fetchall()
            assert len(response) == 0
    finally:
        with g_admin_ctx():
            user.remove_from_ldap()
def test_api_passwd_reset_mysql(cfg, client, g_admin_ctx, ldap_user, krb_user):
    """Check the MySQL password-reset endpoint of the HTTP API.

    Creates a database for ``ldap_user``, resets its password through
    ``POST /api/db/mysql/<uid>/pwreset``, checks that a new password is
    returned and accepted by MySQL, and checks the status codes for resetting
    somebody else's password and for a user without a database.

    ``krb_user`` is not referenced in the body; it is requested only so that
    the fixture is set up (presumably the Kerberos principal for ``ldap_user``
    -- confirm against the fixture definition).
    """
    uid = ldap_user.uid

    # A second LDAP user, without a database, is the target of the
    # forbidden / not-found reset requests below.
    with g_admin_ctx():
        user = User(uid='someone_else', cn='Some Name', terms=['s2021'])
        user.add_to_ldap()

    # Everything after the LDAP insert runs inside try/finally so that the
    # extra user is removed even when an assertion fails; otherwise the entry
    # would leak into later tests that add the same uid.
    try:
        # No principal is passed here, so the client's default identity is
        # used (presumably a privileged one -- confirm against the `client`
        # fixture).
        status, data = client.post(f"/api/db/mysql/{uid}", json={})
        assert status == 200
        assert 'password' in data
        old_passwd = data['password']

        # Logging in with the initial password must succeed; connect()
        # raises otherwise.
        connect(
            host=cfg.get('mysql_host'),
            user=uid,
            password=old_passwd,
        ).close()

        # normal user can get a password reset for themselves
        status, data = client.post(
            f"/api/db/mysql/{uid}/pwreset", json={}, principal=uid)
        assert status == 200
        assert 'password' in data
        new_passwd = data['password']
        assert old_passwd != new_passwd

        # normal user cannot reset password for others
        status, data = client.post(
            "/api/db/mysql/someone_else/pwreset", json={}, principal=uid)
        assert status == 403

        # cannot password reset a user that does not have a database
        status, data = client.post(
            "/api/db/mysql/someone_else/pwreset", json={})
        assert status == 404

        # Logging in with the freshly issued password must succeed.
        connect(
            host=cfg.get('mysql_host'),
            user=uid,
            password=new_passwd,
        ).close()

        status, data = client.delete(f"/api/db/mysql/{uid}", json={})
        assert status == 200
    finally:
        with g_admin_ctx():
            user.remove_from_ldap()