add test for API request without KRB-CRED
parent
46fd926acc
commit
583fcded9b
|
@ -204,3 +204,9 @@ def test_authz_check(client, create_user_result):
|
||||||
del old_data['forwarding_addresses']
|
del old_data['forwarding_addresses']
|
||||||
_, data = client.get(f'/api/members/{uid}', principal='regular1')
|
_, data = client.get(f'/api/members/{uid}', principal='regular1')
|
||||||
assert data == old_data
|
assert data == old_data
|
||||||
|
|
||||||
|
# If we're syscom but we don't pass credentials, the request should fail
|
||||||
|
_, data = client.post('/api/members', json={
|
||||||
|
'uid': 'test_1', 'cn': 'Test One', 'terms': ['s2021'],
|
||||||
|
}, principal='ctdalek', no_creds=True)
|
||||||
|
assert data[-1]['status'] == 'aborted'
|
||||||
|
|
|
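Review bot: The new assertion at the end of this section checks only that the last streamed item has `status == 'aborted'`, so the test passes on any abort and shows neither that the missing forwarded credential caused it nor that nothing was created. The HTTP status is discarded with `_`; binding and asserting it would tighten the check, as would a follow-up `client.get('/api/members/test_1')` asserting the member does not exist (with whatever status the API returns for an unknown uid). If the aborted item names the failed step or carries an error message, asserting on that too would stop the test passing for an unrelated reason such as a uid collision; whether `create_user_result` already creates `test_1` is not visible here. The enclosing `test_authz_check` starts outside the hunk.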
@ -70,7 +70,7 @@ class CeodTestClient:
|
||||||
text=True, input='krb5', check=True, stdout=subprocess.DEVNULL,
|
text=True, input='krb5', check=True, stdout=subprocess.DEVNULL,
|
||||||
env={'KRB5CCNAME': self.principal_ccaches[principal]})
|
env={'KRB5CCNAME': self.principal_ccaches[principal]})
|
||||||
|
|
||||||
def get_headers(self, principal):
|
def get_headers(self, principal: str, no_creds: bool):
|
||||||
if principal not in self.principal_ccaches:
|
if principal not in self.principal_ccaches:
|
||||||
_, filename = tempfile.mkstemp(dir=self.cache_dir)
|
_, filename = tempfile.mkstemp(dir=self.cache_dir)
|
||||||
self.principal_ccaches[principal] = filename
|
self.principal_ccaches[principal] = filename
|
||||||
|
@ -80,22 +80,23 @@ class CeodTestClient:
|
||||||
# the header using req.prepare().
|
# the header using req.prepare().
|
||||||
req = Request('GET', self.base_url, auth=self.get_auth(principal))
|
req = Request('GET', self.base_url, auth=self.get_auth(principal))
|
||||||
headers = list(req.prepare().headers.items())
|
headers = list(req.prepare().headers.items())
|
||||||
# Get the X-KRB5-CRED header (forwarded TGT).
|
if not no_creds:
|
||||||
cred = b64encode(get_fwd_tgt(
|
# Get the X-KRB5-CRED header (forwarded TGT).
|
||||||
'ceod/' + socket.getfqdn(), self.principal_ccaches[principal]
|
cred = b64encode(get_fwd_tgt(
|
||||||
)).decode()
|
'ceod/' + socket.getfqdn(), self.principal_ccaches[principal]
|
||||||
headers.append(('X-KRB5-CRED', cred))
|
)).decode()
|
||||||
|
headers.append(('X-KRB5-CRED', cred))
|
||||||
return headers
|
return headers
|
||||||
|
|
||||||
def request(self, method, path, principal, **kwargs):
|
def request(self, method: str, path: str, principal: str, no_creds: bool, **kwargs):
|
||||||
# Make sure that we're not already in a request context, otherwise
|
# Make sure that we're not already in a request context, otherwise
|
||||||
# g will get overridden
|
# g will get overridden
|
||||||
with pytest.raises(RuntimeError):
|
with pytest.raises(RuntimeError):
|
||||||
'' in g
|
'' in g
|
||||||
if principal is None:
|
if principal is None:
|
||||||
principal = self.syscom_principal
|
principal = self.syscom_principal
|
||||||
resp = self.client.open(
|
headers = self.get_headers(principal, no_creds)
|
||||||
path, method=method, headers=self.get_headers(principal), **kwargs)
|
resp = self.client.open(path, method=method, headers=headers, **kwargs)
|
||||||
status = int(resp.status.split(' ', 1)[0])
|
status = int(resp.status.split(' ', 1)[0])
|
||||||
if resp.headers['content-type'] == 'application/json':
|
if resp.headers['content-type'] == 'application/json':
|
||||||
data = json.loads(resp.data)
|
data = json.loads(resp.data)
|
||||||
|
@ -103,14 +104,14 @@ class CeodTestClient:
|
||||||
data = [json.loads(line) for line in resp.data.splitlines()]
|
data = [json.loads(line) for line in resp.data.splitlines()]
|
||||||
return status, data
|
return status, data
|
||||||
|
|
||||||
def get(self, path, principal=None, **kwargs):
|
def get(self, path, principal=None, no_creds=False, **kwargs):
|
||||||
return self.request('GET', path, principal, **kwargs)
|
return self.request('GET', path, principal, no_creds, **kwargs)
|
||||||
|
|
||||||
def post(self, path, principal=None, **kwargs):
|
def post(self, path, principal=None, no_creds=False, **kwargs):
|
||||||
return self.request('POST', path, principal, **kwargs)
|
return self.request('POST', path, principal, no_creds, **kwargs)
|
||||||
|
|
||||||
def patch(self, path, principal=None, **kwargs):
|
def patch(self, path, principal=None, no_creds=False, **kwargs):
|
||||||
return self.request('PATCH', path, principal, **kwargs)
|
return self.request('PATCH', path, principal, no_creds, **kwargs)
|
||||||
|
|
||||||
def delete(self, path, principal=None, **kwargs):
|
def delete(self, path, principal=None, no_creds=False, **kwargs):
|
||||||
return self.request('DELETE', path, principal, **kwargs)
|
return self.request('DELETE', path, principal, no_creds, **kwargs)
|
||||||
|
|
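Review bot: `no_creds` in `get_headers` only skips the `X-KRB5-CRED` header, while the headers prepared from `self.get_auth(principal)` are still sent, so the request stays authenticated and the name promises more than the flag does. A comment or docstring should say so, e.g. `# no_creds: omit the forwarded TGT (X-KRB5-CRED); headers from get_auth() are still sent`. Separately, `request()` and `get_headers()` take `no_creds` as a required positional argument, while `get`/`post`/`patch`/`delete` default it to `False`. Declaring it `no_creds: bool = False`, ideally keyword-only with the wrappers passing `no_creds=no_creds`, would keep any direct callers working and prevent a stray positional value from silently dropping the credential header. Callers outside this page are not visible, so the compatibility concern is inferred.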