pyceo/tests/conftest_ceod_api.py

85 lines
3.1 KiB
Python
Raw Normal View History

from base64 import b64encode
2021-08-13 20:11:56 -04:00
import json
import socket
2021-08-19 12:14:41 -04:00
from flask import g
2021-08-13 20:11:56 -04:00
from flask.testing import FlaskClient
import gssapi
import pytest
from requests import Request
from requests_gssapi import HTTPSPNEGOAuth
from ceo_common.krb5.utils import get_fwd_tgt
2021-08-23 19:01:24 -04:00
from .utils import krb5ccname_ctx
__all__ = ['client']
2021-08-13 20:11:56 -04:00
@pytest.fixture(scope='session')
def client(app):
app_client = app.test_client()
2021-08-23 19:01:24 -04:00
yield CeodTestClient(app_client)
2021-08-13 20:11:56 -04:00
class CeodTestClient:
2021-08-23 19:01:24 -04:00
def __init__(self, app_client: FlaskClient):
2021-08-13 20:11:56 -04:00
self.client = app_client
self.syscom_principal = 'ctdalek'
2021-08-13 20:11:56 -04:00
# this is only used for the HTTPSNEGOAuth
self.base_url = f'http://{socket.getfqdn()}'
2021-08-21 02:27:33 -04:00
# for SPNEGO
self.target_name = gssapi.Name('ceod/' + socket.getfqdn())
2021-08-19 16:33:44 -04:00
2021-08-13 20:11:56 -04:00
def get_auth(self, principal):
2021-08-19 16:33:44 -04:00
"""Acquire a HTTPSPNEGOAuth instance for the principal."""
2021-08-13 20:11:56 -04:00
name = gssapi.Name(principal)
2021-08-19 16:33:44 -04:00
# the 'store' arg doesn't seem to work for DIR ccaches
2021-08-23 19:01:24 -04:00
creds = gssapi.Credentials(name=name, usage='initiate')
2021-08-13 20:11:56 -04:00
auth = HTTPSPNEGOAuth(
opportunistic_auth=True,
2021-08-21 02:27:33 -04:00
target_name=self.target_name,
2021-08-13 20:11:56 -04:00
creds=creds,
)
return auth
2021-08-23 09:59:01 -04:00
def get_headers(self, principal: str, need_cred: bool):
2021-08-23 19:01:24 -04:00
with krb5ccname_ctx(principal):
# Get the Authorization header (SPNEGO).
# The method doesn't matter here because we just need to extract
# the header using req.prepare().
req = Request('GET', self.base_url, auth=self.get_auth(principal))
headers = list(req.prepare().headers.items())
if need_cred:
# Get the X-KRB5-CRED header (forwarded TGT).
cred = b64encode(get_fwd_tgt('ceod/' + socket.getfqdn())).decode()
headers.append(('X-KRB5-CRED', cred))
return headers
2021-08-13 20:11:56 -04:00
2021-08-23 09:59:01 -04:00
def request(self, method: str, path: str, principal: str, need_cred: bool, **kwargs):
2021-08-19 12:14:41 -04:00
# Make sure that we're not already in a request context, otherwise
# g will get overridden
with pytest.raises(RuntimeError):
'' in g
2021-08-13 20:11:56 -04:00
if principal is None:
principal = self.syscom_principal
2021-08-23 09:59:01 -04:00
headers = self.get_headers(principal, need_cred)
resp = self.client.open(path, method=method, headers=headers, **kwargs)
2021-08-13 20:11:56 -04:00
status = int(resp.status.split(' ', 1)[0])
if resp.headers['content-type'] == 'application/json':
2021-08-13 20:11:56 -04:00
data = json.loads(resp.data)
else:
data = [json.loads(line) for line in resp.data.splitlines()]
2021-08-13 20:11:56 -04:00
return status, data
2021-08-23 09:59:01 -04:00
def get(self, path, principal=None, need_cred=True, **kwargs):
return self.request('GET', path, principal, need_cred, **kwargs)
2021-08-13 20:11:56 -04:00
2021-08-23 09:59:01 -04:00
def post(self, path, principal=None, need_cred=True, **kwargs):
return self.request('POST', path, principal, need_cred, **kwargs)
2021-08-13 20:11:56 -04:00
2021-08-23 09:59:01 -04:00
def patch(self, path, principal=None, need_cred=True, **kwargs):
return self.request('PATCH', path, principal, need_cred, **kwargs)
2021-08-23 09:59:01 -04:00
def delete(self, path, principal=None, need_cred=True, **kwargs):
return self.request('DELETE', path, principal, need_cred, **kwargs)