parent
29305168c3
commit
d6dbfd5d3f
@ -0,0 +1,121 @@ |
||||
import pytest |
||||
|
||||
from ceod.model import User |
||||
from mysql.connector import connect |
||||
from mysql.connector.errors import InterfaceError, ProgrammingError |
||||
|
||||
|
||||
def test_api_create_mysql_db(cfg, client, g_admin_ctx, create_user_result): |
||||
uid = create_user_result['uid'] |
||||
with g_admin_ctx(): |
||||
user = User(uid='someone_else', cn='Some Name', terms=['s2021']) |
||||
user.add_to_ldap() |
||||
|
||||
# user should be able to create db for themselves |
||||
status, data = client.post(f"/api/mysql/{uid}", json={}, principal=uid) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
passwd = data['password'] |
||||
|
||||
# conflict if attempting to create db when already has one |
||||
status, data = client.post(f"/api/mysql/{uid}", json={}, principal=uid) |
||||
assert status == 409 |
||||
|
||||
# normal user cannot create db for others |
||||
status, data = client.post(f"/api/mysql/someone_else", json={}, principal=uid) |
||||
assert status == 403 |
||||
|
||||
# cannot create db for user not in ldap |
||||
status, data = client.post("/api/mysql/user_not_found", json={}) |
||||
assert status == 404 |
||||
|
||||
# cannot create db when username contains symbols |
||||
status, data = client.post("/api/mysql/#invalid", json={}) |
||||
assert status == 400 |
||||
|
||||
with connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=uid, |
||||
password=passwd, |
||||
) as con: |
||||
with con.cursor() as cur: |
||||
cur.execute("SHOW DATABASES") |
||||
response = cur.fetchall() |
||||
assert len(response) == 2 |
||||
|
||||
with pytest.raises(ProgrammingError): |
||||
cur.execute("CREATE DATABASE new_db") |
||||
|
||||
status, data = client.delete(f"/api/mysql/{uid}", json={}) |
||||
assert status == 200 |
||||
|
||||
# user should be deleted |
||||
with pytest.raises(InterfaceError): |
||||
con = connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=uid, |
||||
password=passwd, |
||||
) |
||||
|
||||
# db should be deleted |
||||
with connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=cfg.get('mysql_username'), |
||||
password=cfg.get('mysql_password'), |
||||
) as con: |
||||
with con.cursor() as cur: |
||||
cur.execute(f"SHOW DATABASES LIKE '{uid}'") |
||||
response = cur.fetchall() |
||||
assert len(response) == 0 |
||||
|
||||
with g_admin_ctx(): |
||||
user.remove_from_ldap() |
||||
|
||||
|
||||
def test_api_passwd_reset_mysql(cfg, client, g_admin_ctx, create_user_result): |
||||
with g_admin_ctx(): |
||||
user = User(uid='someone_else', cn='Some Name', terms=['s2021']) |
||||
user.add_to_ldap() |
||||
|
||||
uid = create_user_result['uid'] |
||||
|
||||
status, data = client.post(f"/api/mysql/{uid}", json={}) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
old_passwd = data['password'] |
||||
|
||||
con = connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=uid, |
||||
password=old_passwd, |
||||
) |
||||
con.close() |
||||
|
||||
# normal user can get a password reset for themselves |
||||
status, data = client.post(f"/api/mysql/{uid}/pwreset", json={}, principal=uid) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
new_passwd = data['password'] |
||||
|
||||
assert old_passwd != new_passwd |
||||
|
||||
# normal user cannot reset password for others |
||||
status, data = client.post(f"/api/mysql/{uid}/pwreset", json={}, principal='someone_else') |
||||
assert status == 403 |
||||
|
||||
# cannot password reset a user that does not have a database |
||||
status, data = client.post(f"/api/mysql/someone_else/pwreset", json={}) |
||||
assert status == 404 |
||||
|
||||
con = connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=uid, |
||||
password=new_passwd, |
||||
) |
||||
con.close() |
||||
|
||||
status, data = client.delete(f"/api/mysql/{uid}", json={}) |
||||
assert status == 200 |
||||
|
||||
with g_admin_ctx(): |
||||
user.remove_from_ldap() |
@ -0,0 +1,120 @@ |
||||
import pytest |
||||
|
||||
from ceod.model import User |
||||
from psycopg2 import connect, OperationalError, ProgrammingError |
||||
|
||||
|
||||
def test_api_create_psql_db(cfg, client, g_admin_ctx, create_user_result): |
||||
uid = create_user_result['uid'] |
||||
with g_admin_ctx(): |
||||
user = User(uid='someone_else', cn='Some Name', terms=['s2021']) |
||||
user.add_to_ldap() |
||||
|
||||
# user should be able to create db for themselves |
||||
status, data = client.post(f"/api/postgresql/{uid}", json={}, principal=uid) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
passwd = data['password'] |
||||
|
||||
# conflict if attempting to create db when already has one |
||||
status, data = client.post(f"/api/postgresql/{uid}", json={}, principal=uid) |
||||
assert status == 409 |
||||
|
||||
# normal user cannot create db for others |
||||
status, data = client.post(f"/api/postgresql/someone_else", json={}, principal=uid) |
||||
assert status == 403 |
||||
|
||||
# cannot create db for user not in ldap |
||||
status, data = client.post("/api/postgresql/user_not_found", json={}) |
||||
assert status == 404 |
||||
|
||||
# cannot create db when username contains symbols |
||||
status, data = client.post("/api/postgresql/#invalid", json={}) |
||||
assert status == 400 |
||||
|
||||
with connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=uid, |
||||
password=passwd, |
||||
) as con: |
||||
with con.cursor() as cur: |
||||
cur.execute("SHOW DATABASES") |
||||
response = cur.fetchall() |
||||
assert len(response) == 2 |
||||
|
||||
with pytest.raises(ProgrammingError): |
||||
cur.execute("CREATE DATABASE new_db") |
||||
|
||||
status, data = client.delete(f"/api/postgresql/{uid}", json={}) |
||||
assert status == 200 |
||||
|
||||
# user should be deleted |
||||
with pytest.raises(OperationalError): |
||||
con = connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=uid, |
||||
password=passwd, |
||||
) |
||||
|
||||
# db should be deleted |
||||
with connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=cfg.get('postgresql_username'), |
||||
password=cfg.get('postgresql_password'), |
||||
) as con: |
||||
with con.cursor() as cur: |
||||
cur.execute(f"SHOW DATABASES LIKE '{uid}'") |
||||
response = cur.fetchall() |
||||
assert len(response) == 0 |
||||
|
||||
with g_admin_ctx(): |
||||
user.remove_from_ldap() |
||||
|
||||
|
||||
def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, create_user_result): |
||||
with g_admin_ctx(): |
||||
user = User(uid='someone_else', cn='Some Name', terms=['s2021']) |
||||
user.add_to_ldap() |
||||
|
||||
uid = create_user_result['uid'] |
||||
|
||||
status, data = client.post(f"/api/postgresql/{uid}", json={}) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
old_passwd = data['password'] |
||||
|
||||
con = connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=uid, |
||||
password=old_passwd, |
||||
) |
||||
con.close() |
||||
|
||||
# normal user can get a password reset for themselves |
||||
status, data = client.post(f"/api/postgresql/{uid}/pwreset", json={}, principal=uid) |
||||
assert status == 200 |
||||
assert 'password' in data |
||||
new_passwd = data['password'] |
||||
|
||||
assert old_passwd != new_passwd |
||||
|
||||
# normal user cannot reset password for others |
||||
status, data = client.post(f"/api/postgresql/{uid}/pwreset", json={}, principal='someone_else') |
||||
assert status == 403 |
||||
|
||||
# cannot password reset a user that does not have a database |
||||
status, data = client.post(f"/api/postgresql/someone_else/pwreset", json={}) |
||||
assert status == 404 |
||||
|
||||
con = connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user=uid, |
||||
password=new_passwd, |
||||
) |
||||
con.close() |
||||
|
||||
status, data = client.delete(f"/api/postgresql/{uid}", json={}) |
||||
assert status == 200 |
||||
|
||||
with g_admin_ctx(): |
||||
user.remove_from_ldap() |
@ -1,60 +0,0 @@ |
||||
import pytest |
||||
|
||||
from ceod.db.MySQLService import MySQLService |
||||
from ceo_common.errors import DatabaseConnectionError, DatabasePermissionError |
||||
from mysql.connector import connect |
||||
from mysql.connector.errors import InterfaceError, ProgrammingError |
||||
|
||||
|
||||
def test_mysql_db_create(cfg): |
||||
mysql_srv = MySQLService() |
||||
password = mysql_srv.create_db('test_jdoe') |
||||
|
||||
with connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user='test_jdoe', |
||||
password=password, |
||||
) as con: |
||||
with con.cursor() as cur: |
||||
cur.execute("SHOW DATABASES") |
||||
response = cur.fetchall() |
||||
assert len(response) == 2 |
||||
|
||||
mysql_srv.delete_db('test_jdoe') |
||||
|
||||
# user should be deleted |
||||
with pytest.raises(InterfaceError): |
||||
con = connect( |
||||
host=cfg.get('ceod_database_host'), |
||||
user='test_jdoe', |
||||
password=password, |
||||
) |
||||
|
||||
|
||||
def test_mysql_passwd_reset(): |
||||
pass |
||||
|
||||
|
||||
# test with curl |
||||
# test with invalid perms for curl |
||||
|
||||
# test perms |
||||
|
||||
# test with dup user |
||||
|
||||
# test with invalid perms for db |
||||
|
||||
# test with invalid host for db |
||||
|
||||
|
||||
# except InterfaceError: |
||||
# raise DatabaseConnectionError() |
||||
# except ProgrammingError: |
||||
# raise DatabasePermissionError() |
||||
|
||||
# ask for mysql and postgres with proper postgres configs and no public schema |
||||
|
||||
# tests are stateless |
||||
# each test should not require anything before or change anything |
||||
# this means you should delete user and databases created after done |
||||
|
@ -1 +0,0 @@ |
||||
|
Loading…
Reference in new issue