import ceod.utils as ceod_utils import contextlib import os import grp import pwd import subprocess from subprocess import DEVNULL import tempfile from unittest.mock import patch import gssapi import pytest # map principals to GSSAPI credentials _cache = {} @contextlib.contextmanager def gssapi_token_ctx(principal: str): """ Temporarily set KRB5CCNAME to a ccache storing credentials for the specified user, and yield the GSSAPI credentials. """ old_krb5ccname = os.environ['KRB5CCNAME'] try: if principal not in _cache: f = tempfile.NamedTemporaryFile() os.environ['KRB5CCNAME'] = 'FILE:' + f.name args = ['kinit', principal] if principal == 'ceod/admin': args = ['kinit', '-k', principal] subprocess.run( args, stdout=DEVNULL, text=True, input='krb5', check=True) creds = gssapi.Credentials(name=gssapi.Name(principal), usage='initiate') # Keep the credential cache files around as long as the creds are # used, otherwise we get a "Invalid credential was supplied" error _cache[principal] = creds, f else: creds, f = _cache[principal] os.environ['KRB5CCNAME'] = 'FILE:' + f.name yield creds.export() finally: os.environ['KRB5CCNAME'] = old_krb5ccname @pytest.fixture(scope='session', autouse=True) def ccache_cleanup(): """Make sure the ccache files get deleted at the end of the tests.""" yield _cache.clear() @contextlib.contextmanager def gen_password_mock_ctx(): with patch.object(ceod_utils, 'gen_password') as mock: mock.return_value = 'krb5' yield @contextlib.contextmanager def mocks_for_create_user_ctx(): with gen_password_mock_ctx(), \ patch.object(pwd, 'getpwuid') as getpwuid_mock, \ patch.object(grp, 'getgrgid') as getgrgid_mock: # Normally, if getpwuid or getgrgid do *not* raise a KeyError, # then LDAPService will skip that UID. Therefore, by raising a # KeyError, we are making sure that the UID will *not* be skipped. getpwuid_mock.side_effect = KeyError() getgrgid_mock.side_effect = KeyError() yield