From 583fcded9b6fa0b777a0a0c7cd5f742daed1dd99 Mon Sep 17 00:00:00 2001
From: Max Erenberg
Date: Thu, 19 Aug 2021 23:53:13 +0000
Subject: [PATCH] add test for API request without KRB-CRED

---
 tests/ceod/api/test_members.py | 6 ++++++
 tests/conftest_ceod_api.py | 35 +++++++++++++++++-----------------
 2 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/tests/ceod/api/test_members.py b/tests/ceod/api/test_members.py
index 3cd5ac5..aeb1205 100644
--- a/tests/ceod/api/test_members.py
+++ b/tests/ceod/api/test_members.py
@@ -204,3 +204,9 @@ def test_authz_check(client, create_user_result):
 del old_data['forwarding_addresses']
 _, data = client.get(f'/api/members/{uid}', principal='regular1')
 assert data == old_data
+
+ # If we're syscom but we don't pass credentials, the request should fail
+ _, data = client.post('/api/members', json={
+ 'uid': 'test_1', 'cn': 'Test One', 'terms': ['s2021'],
+ }, principal='ctdalek', no_creds=True)
+ assert data[-1]['status'] == 'aborted'
diff --git a/tests/conftest_ceod_api.py b/tests/conftest_ceod_api.py
index baaf6a8..b2c94fc 100644
--- a/tests/conftest_ceod_api.py
+++ b/tests/conftest_ceod_api.py
@@ -70,7 +70,7 @@ class CeodTestClient:
 text=True, input='krb5', check=True, stdout=subprocess.DEVNULL,
 env={'KRB5CCNAME': self.principal_ccaches[principal]})
 
- def get_headers(self, principal):
+ def get_headers(self, principal: str, no_creds: bool):
 if principal not in self.principal_ccaches:
 _, filename = tempfile.mkstemp(dir=self.cache_dir)
 self.principal_ccaches[principal] = filename
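Review bot: The new assertion in `test_authz_check` accepts any abort: `data[-1]['status'] == 'aborted'` is also true when the operation fails for a reason unrelated to the missing X-KRB5-CRED header, and the HTTP status is discarded into `_`. For a negative authorization test it would be stronger to also assert that nothing was created, for example `status, _ = client.get('/api/members/test_1')` followed by an assertion on whatever not-found status the API returns (not visible in this hunk). `data[-1]` also assumes the response arrives in the line-delimited list form built in `request`; a plain `application/json` error body would raise instead of failing the assertion cleanly. The test function starts outside the hunk.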
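Review bot: `no_creds` is added to `get_headers` as a required positional parameter, so any caller outside this patch that still calls `get_headers(principal)` would now raise a TypeError. The same applies to `request` in the next hunk, where the flag sits before `**kwargs` without a default. Giving it a default in both signatures keeps the old call shape working and makes sending the credential the default: `no_creds: bool = False`. The body of `get_headers` continues past the end of this hunk.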
@@ -80,22 +80,23 @@ class CeodTestClient:
 # the header using req.prepare().
 req = Request('GET', self.base_url, auth=self.get_auth(principal))
 headers = list(req.prepare().headers.items())
- # Get the X-KRB5-CRED header (forwarded TGT).
- cred = b64encode(get_fwd_tgt(
- 'ceod/' + socket.getfqdn(), self.principal_ccaches[principal]
- )).decode()
- headers.append(('X-KRB5-CRED', cred))
+ if not no_creds:
+ # Get the X-KRB5-CRED header (forwarded TGT).
+ cred = b64encode(get_fwd_tgt(
+ 'ceod/' + socket.getfqdn(), self.principal_ccaches[principal]
+ )).decode()
+ headers.append(('X-KRB5-CRED', cred))
 return headers
 
- def request(self, method, path, principal, **kwargs):
+ def request(self, method: str, path: str, principal: str, no_creds: bool, **kwargs):
 # Make sure that we're not already in a request context, otherwise
 # g will get overridden
 with pytest.raises(RuntimeError):
 '' in g
 if principal is None:
 principal = self.syscom_principal
- resp = self.client.open(
- path, method=method, headers=self.get_headers(principal), **kwargs)
+ headers = self.get_headers(principal, no_creds)
+ resp = self.client.open(path, method=method, headers=headers, **kwargs)
 status = int(resp.status.split(' ', 1)[0])
 if resp.headers['content-type'] == 'application/json':
 data = json.loads(resp.data)
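Review bot: `no_creds=True` skips only the `X-KRB5-CRED` header; the headers copied from `req.prepare()` with `auth=self.get_auth(principal)` are still sent, so the request stays authenticated and lacks only the forwarded TGT. The new test depends on that distinction, so it deserves a comment above the branch, for example `# no_creds omits only the forwarded TGT; the auth headers from req.prepare() are still sent`. Separately, `request` annotates `principal: str` but then handles `principal is None`, so the annotation should be `Optional[str]` (or `str | None` if the supported Python version allows it). Both `get_headers` and `request` begin or end outside this hunk.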
@@ -103,14 +104,14 @@ class CeodTestClient:
 data = [json.loads(line) for line in resp.data.splitlines()]
 return status, data
 
- def get(self, path, principal=None, **kwargs):
- return self.request('GET', path, principal, **kwargs)
+ def get(self, path, principal=None, no_creds=False, **kwargs):
+ return self.request('GET', path, principal, no_creds, **kwargs)
 
- def post(self, path, principal=None, **kwargs):
- return self.request('POST', path, principal, **kwargs)
+ def post(self, path, principal=None, no_creds=False, **kwargs):
+ return self.request('POST', path, principal, no_creds, **kwargs)
 
- def patch(self, path, principal=None, **kwargs):
- return self.request('PATCH', path, principal, **kwargs)
+ def patch(self, path, principal=None, no_creds=False, **kwargs):
+ return self.request('PATCH', path, principal, no_creds, **kwargs)
 
- def delete(self, path, principal=None, **kwargs):
- return self.request('DELETE', path, principal, **kwargs)
+ def delete(self, path, principal=None, no_creds=False, **kwargs):
+ return self.request('DELETE', path, principal, no_creds, **kwargs)
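Review bot: The four wrappers accept `no_creds` as a third positional parameter and forward it positionally, so a call such as `client.get(path, 'regular1', x)` silently drops the credential header whenever `x` is truthy. Making the flag keyword-only keeps call sites explicit: `def get(self, path, principal=None, *, no_creds=False, **kwargs):` with `return self.request('GET', path, principal, no_creds=no_creds, **kwargs)`, and likewise for `post`, `patch` and `delete`. A one-line docstring on each wrapper stating that `no_creds=True` omits the forwarded TGT would also help readers of the negative tests.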