pyceo/tests/conftest_ceod_api.py

117 lines
4.2 KiB
Python
Raw Normal View History

from base64 import b64encode
2021-08-19 16:33:44 -04:00
import contextlib
import os
2021-08-13 20:11:56 -04:00
import json
import socket
import subprocess
import tempfile
2021-08-13 20:11:56 -04:00
2021-08-19 12:14:41 -04:00
from flask import g
2021-08-13 20:11:56 -04:00
from flask.testing import FlaskClient
import gssapi
import pytest
from requests import Request
from requests_gssapi import HTTPSPNEGOAuth
from ceo_common.krb5.utils import get_fwd_tgt
__all__ = ['client']
2021-08-13 20:11:56 -04:00
@pytest.fixture(scope='session')
def client(app):
app_client = app.test_client()
with tempfile.TemporaryDirectory() as cache_dir:
yield CeodTestClient(app_client, cache_dir)
2021-08-13 20:11:56 -04:00
class CeodTestClient:
def __init__(self, app_client: FlaskClient, cache_dir: str):
2021-08-13 20:11:56 -04:00
self.client = app_client
self.syscom_principal = 'ctdalek'
2021-08-13 20:11:56 -04:00
# this is only used for the HTTPSNEGOAuth
self.base_url = f'http://{socket.getfqdn()}'
2021-08-19 16:33:44 -04:00
# for each principal for which we acquired a TGT, map their
# username to a file (ccache) storing their TGT
self.principal_ccaches = {}
# this is where we'll store the credentials for each principal
2021-08-19 16:33:44 -04:00
self.cache_dir = cache_dir
@contextlib.contextmanager
def krb5ccname_env(self, principal):
"""Temporarily change KRB5CCNAME to the ccache of the principal."""
old_krb5ccname = os.environ['KRB5CCNAME']
os.environ['KRB5CCNAME'] = self.principal_ccaches[principal]
try:
yield
finally:
os.environ['KRB5CCNAME'] = old_krb5ccname
2021-08-13 20:11:56 -04:00
def get_auth(self, principal):
2021-08-19 16:33:44 -04:00
"""Acquire a HTTPSPNEGOAuth instance for the principal."""
2021-08-13 20:11:56 -04:00
name = gssapi.Name(principal)
2021-08-19 16:33:44 -04:00
# the 'store' arg doesn't seem to work for DIR ccaches
with self.krb5ccname_env(principal):
creds = gssapi.Credentials(name=name, usage='initiate')
2021-08-13 20:11:56 -04:00
auth = HTTPSPNEGOAuth(
opportunistic_auth=True,
target_name='ceod',
creds=creds,
)
return auth
2021-08-19 16:33:44 -04:00
def kinit(self, principal):
"""Acquire an initial TGT for the principal."""
# For some reason, kinit with the '-c' option deletes the other
# credentials in the cache collection, so we need to override the
# env variable
subprocess.run(
['kinit', principal],
text=True, input='krb5', check=True, stdout=subprocess.DEVNULL,
env={'KRB5CCNAME': self.principal_ccaches[principal]})
2021-08-13 20:11:56 -04:00
def get_headers(self, principal):
2021-08-19 16:33:44 -04:00
if principal not in self.principal_ccaches:
_, filename = tempfile.mkstemp(dir=self.cache_dir)
self.principal_ccaches[principal] = filename
self.kinit(principal)
# Get the Authorization header (SPNEGO).
# The method doesn't matter here because we just need to extract
# the header using req.prepare().
2021-08-13 20:11:56 -04:00
req = Request('GET', self.base_url, auth=self.get_auth(principal))
headers = list(req.prepare().headers.items())
# Get the X-KRB5-CRED header (forwarded TGT).
2021-08-19 16:33:44 -04:00
cred = b64encode(get_fwd_tgt(
'ceod/' + socket.getfqdn(), self.principal_ccaches[principal]
)).decode()
headers.append(('X-KRB5-CRED', cred))
return headers
2021-08-13 20:11:56 -04:00
def request(self, method, path, principal, **kwargs):
2021-08-19 12:14:41 -04:00
# Make sure that we're not already in a request context, otherwise
# g will get overridden
with pytest.raises(RuntimeError):
'' in g
2021-08-13 20:11:56 -04:00
if principal is None:
principal = self.syscom_principal
2021-08-13 20:11:56 -04:00
resp = self.client.open(
path, method=method, headers=self.get_headers(principal), **kwargs)
status = int(resp.status.split(' ', 1)[0])
if resp.headers['content-type'] == 'application/json':
2021-08-13 20:11:56 -04:00
data = json.loads(resp.data)
else:
data = [json.loads(line) for line in resp.data.splitlines()]
2021-08-13 20:11:56 -04:00
return status, data
def get(self, path, principal=None, **kwargs):
return self.request('GET', path, principal, **kwargs)
def post(self, path, principal=None, **kwargs):
return self.request('POST', path, principal, **kwargs)
def patch(self, path, principal=None, **kwargs):
return self.request('PATCH', path, principal, **kwargs)
2021-08-13 20:11:56 -04:00
def delete(self, path, principal=None, **kwargs):
return self.request('DELETE', path, principal, **kwargs)