add simple authz tests

This commit is contained in:
Max Erenberg 2021-08-19 20:33:44 +00:00
parent 26fd8f6f68
commit 490abb302c
4 changed files with 58 additions and 21 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
__pycache__/ __pycache__/
*.pyc
/venv/ /venv/
.vscode/ .vscode/
/cred /cred

View File

@ -93,7 +93,8 @@ def get_fwd_tgt(server: str, cache_name: Union[str, None] = None) -> bytes:
e.g. 'ceod/phosphoric-acid.csclub.uwaterloo.ca'. e.g. 'ceod/phosphoric-acid.csclub.uwaterloo.ca'.
If `cache_name` is None, the default cache will be used; otherwise, If `cache_name` is None, the default cache will be used; otherwise,
the cache with that name will be used. the cache with that name will be used. Must have the format
'TYPE:residual'.
""" """
if cache_name is None: if cache_name is None:
cache_function = get_krb5_cc_default cache_function = get_krb5_cc_default

View File

@ -80,7 +80,7 @@ def test_api_next_uid(cfg, client, create_user_result):
def test_api_get_user(cfg, client, create_user_result): def test_api_get_user(cfg, client, create_user_result):
old_data = create_user_result old_data = create_user_result.copy()
uid = old_data['uid'] uid = old_data['uid']
del old_data['password'] del old_data['password']
@ -145,7 +145,7 @@ def test_api_patch_user(client, create_user_result):
def test_api_renew_user(cfg, client, create_user_result, ldap_conn): def test_api_renew_user(cfg, client, create_user_result, ldap_conn):
data = create_user_result data = create_user_result.copy()
uid = data['uid'] uid = data['uid']
old_terms = data['terms'] old_terms = data['terms']
old_non_member_terms = data.get('non_member_terms', []) old_non_member_terms = data.get('non_member_terms', [])
@ -188,3 +188,19 @@ def test_api_reset_password(client, create_user_result):
status, data = client.post(f'/api/members/{uid}/pwreset') status, data = client.post(f'/api/members/{uid}/pwreset')
assert status == 200 assert status == 200
assert data['password'] == 'krb5' assert data['password'] == 'krb5'
def test_authz_check(client, create_user_result):
# non-staff members may not create users
status, data = client.post('/api/members', json={
'uid': 'test_1', 'cn': 'Test One', 'terms': ['s2021'],
}, principal='regular1')
assert status == 403
# non-syscom members may not see forwarding addresses
old_data = create_user_result.copy()
uid = old_data['uid']
del old_data['password']
del old_data['forwarding_addresses']
_, data = client.get(f'/api/members/{uid}', principal='regular1')
assert data == old_data

View File

@ -1,4 +1,6 @@
from base64 import b64encode from base64 import b64encode
import contextlib
import os
import json import json
import socket import socket
import subprocess import subprocess
@ -29,18 +31,28 @@ class CeodTestClient:
self.syscom_principal = 'ctdalek' self.syscom_principal = 'ctdalek'
# this is only used for the HTTPSNEGOAuth # this is only used for the HTTPSNEGOAuth
self.base_url = f'http://{socket.getfqdn()}' self.base_url = f'http://{socket.getfqdn()}'
# keep a list of all of the principals for which we acquired a TGT # for each principal for which we acquired a TGT, map their
self.principals = [] # username to a file (ccache) storing their TGT
self.principal_ccaches = {}
# this is where we'll store the credentials for each principal # this is where we'll store the credentials for each principal
self.ccache = 'DIR:' + cache_dir self.cache_dir = cache_dir
@contextlib.contextmanager
def krb5ccname_env(self, principal):
"""Temporarily change KRB5CCNAME to the ccache of the principal."""
old_krb5ccname = os.environ['KRB5CCNAME']
os.environ['KRB5CCNAME'] = self.principal_ccaches[principal]
try:
yield
finally:
os.environ['KRB5CCNAME'] = old_krb5ccname
def get_auth(self, principal): def get_auth(self, principal):
"""Acquire a HTTPSPNEGOAuth instance for the principal."""
name = gssapi.Name(principal) name = gssapi.Name(principal)
creds = gssapi.Credentials( # the 'store' arg doesn't seem to work for DIR ccaches
name=name, with self.krb5ccname_env(principal):
usage='initiate', creds = gssapi.Credentials(name=name, usage='initiate')
store={'ccache': self.ccache},
)
auth = HTTPSPNEGOAuth( auth = HTTPSPNEGOAuth(
opportunistic_auth=True, opportunistic_auth=True,
target_name='ceod', target_name='ceod',
@ -48,23 +60,30 @@ class CeodTestClient:
) )
return auth return auth
def kinit(self, principal):
"""Acquire an initial TGT for the principal."""
# For some reason, kinit with the '-c' option deletes the other
# credentials in the cache collection, so we need to override the
# env variable
subprocess.run(
['kinit', principal],
text=True, input='krb5', check=True, stdout=subprocess.DEVNULL,
env={'KRB5CCNAME': self.principal_ccaches[principal]})
def get_headers(self, principal): def get_headers(self, principal):
if principal not in self.principals: if principal not in self.principal_ccaches:
# Acquire the initial TGT _, filename = tempfile.mkstemp(dir=self.cache_dir)
subprocess.run( self.principal_ccaches[principal] = filename
['kinit', '-c', self.ccache, principal], self.kinit(principal)
text=True, input='krb5',
check=True, stdout=subprocess.DEVNULL)
self.principals.append(principal)
# Get the Authorization header (SPNEGO). # Get the Authorization header (SPNEGO).
# The method doesn't matter here because we just need to extract # The method doesn't matter here because we just need to extract
# the header using req.prepare(). # the header using req.prepare().
req = Request('GET', self.base_url, auth=self.get_auth(principal)) req = Request('GET', self.base_url, auth=self.get_auth(principal))
headers = list(req.prepare().headers.items()) headers = list(req.prepare().headers.items())
# Get the X-KRB5-CRED header (forwarded TGT). # Get the X-KRB5-CRED header (forwarded TGT).
cred = b64encode( cred = b64encode(get_fwd_tgt(
get_fwd_tgt('ceod/' + socket.getfqdn(), self.ccache) 'ceod/' + socket.getfqdn(), self.principal_ccaches[principal]
).decode() )).decode()
headers.append(('X-KRB5-CRED', cred)) headers.append(('X-KRB5-CRED', cred))
return headers return headers