pyceo/ceo_common/krb5/utils.py

202 lines
6.7 KiB
Python

import contextlib
import functools
from typing import Union
from ._krb5 import ffi, lib
from ceo_common.errors import KerberosError
def check_rc(k_ctx, rc: int):
"""
Check the return code of a krb5 function.
An exception is raised if rc != 0.
"""
if rc == 0:
return
c_msg = lib.krb5_get_error_message(k_ctx, rc)
msg = ffi.string(c_msg).decode()
lib.krb5_free_error_message(k_ctx, c_msg)
raise KerberosError(msg)
@contextlib.contextmanager
def get_krb5_context():
"""Yields a krb5_context."""
k_ctx = None
try:
p_k_ctx = ffi.new('krb5_context *')
rc = lib.krb5_init_context(p_k_ctx)
k_ctx = p_k_ctx[0] # dereference the pointer
check_rc(k_ctx, rc)
yield k_ctx
finally:
if k_ctx is not None:
lib.krb5_free_context(k_ctx)
@contextlib.contextmanager
def get_krb5_auth_context(k_ctx):
"""Yields a krb5_auth_context."""
a_ctx = None
try:
p_a_ctx = ffi.new('krb5_auth_context *')
rc = lib.krb5_auth_con_init(k_ctx, p_a_ctx)
a_ctx = p_a_ctx[0]
check_rc(k_ctx, rc)
# clear the flags which enable the replay cache
rc = lib.krb5_auth_con_setflags(k_ctx, a_ctx, 0)
check_rc(k_ctx, rc)
yield a_ctx
finally:
if a_ctx is not None:
lib.krb5_auth_con_free(k_ctx, a_ctx)
@contextlib.contextmanager
def get_krb5_cc_default(k_ctx):
"""Yields the default krb5_ccache."""
cache = None
try:
p_cache = ffi.new('krb5_ccache *')
rc = lib.krb5_cc_default(k_ctx, p_cache)
check_rc(k_ctx, rc)
cache = p_cache[0]
yield cache
finally:
if cache is not None:
lib.krb5_cc_close(k_ctx, cache)
@contextlib.contextmanager
def get_krb5_cc_resolve(k_ctx, name: str):
"""
Resolve a credential cache name.
`name` should have the format 'type:residual'.
"""
cache = None
try:
c_name = ffi.new('char[]', name.encode())
p_cache = ffi.new('krb5_ccache *')
rc = lib.krb5_cc_resolve(k_ctx, c_name, p_cache)
check_rc(k_ctx, rc)
cache = p_cache[0]
yield cache
finally:
if cache is not None:
lib.krb5_cc_close(k_ctx, cache)
def get_fwd_tgt(server: str, cache_name: Union[str, None] = None) -> bytes:
"""
Get a forwarded TGT formatted as a KRB-CRED message.
`server` should have the format 'service/host',
e.g. 'ceod/phosphoric-acid.csclub.uwaterloo.ca'.
If `cache_name` is None, the default cache will be used; otherwise,
the cache with that name will be used.
"""
if cache_name is None:
cache_function = get_krb5_cc_default
else:
cache_function = functools.partial(get_krb5_cc_resolve, name=cache_name)
with get_krb5_context() as k_ctx, \
get_krb5_auth_context(k_ctx) as a_ctx, \
cache_function(k_ctx) as cache:
server_princ = None
client_princ = None
outbuf = None
try:
# create a server principal from the server string
c_server = ffi.new('char[]', server.encode())
p_server_princ = ffi.new('krb5_principal *')
rc = lib.krb5_parse_name(k_ctx, c_server, p_server_princ)
check_rc(k_ctx, rc)
server_princ = p_server_princ[0]
# get a client principal from the default cache
p_client_princ = ffi.new('krb5_principal *')
rc = lib.krb5_cc_get_principal(k_ctx, cache, p_client_princ)
check_rc(k_ctx, rc)
client_princ = p_client_princ[0]
# get the forwarded TGT
p_outbuf = ffi.new('krb5_data *')
rc = lib.krb5_fwd_tgt_creds(
k_ctx, a_ctx, ffi.NULL, client_princ, server_princ,
cache, 0, p_outbuf)
check_rc(k_ctx, rc)
outbuf = p_outbuf[0]
return ffi.unpack(outbuf.data, outbuf.length)
finally:
if outbuf is not None:
lib.krb5_free_data_contents(k_ctx, p_outbuf)
if client_princ is not None:
lib.krb5_free_principal(k_ctx, client_princ)
if server_princ is not None:
lib.krb5_free_principal(k_ctx, server_princ)
@contextlib.contextmanager
def store_fwd_tgt_creds(cred_data_bytes: bytes):
"""
Stores the credentials found in cred_data_bytes
in a new sub-cache in the default cache (which should be of type DIR),
yields the name of the principal in the credential, then finally removes
the sub-cache from the collection.
The principal name will have the form 'username@REALM', e.g.
'ctdalek@CSCLUB.UWATERLOO.CA'.
"""
with get_krb5_context() as k_ctx, get_krb5_auth_context(k_ctx) as a_ctx:
creds_out = None
cache = None
c_name = None
try:
# fill a krb5_data struct
cred_data = ffi.new('krb5_data *')
cred_data_buf = ffi.new('char[]', cred_data_bytes)
cred_data.magic = lib.KV5M_DATA
cred_data.length = len(cred_data_bytes)
cred_data.data = cred_data_buf
# read the KRB5-CRED message into a krb5_creds array
p_creds_out = ffi.new('krb5_creds ***')
rc = lib.krb5_rd_cred(k_ctx, a_ctx, cred_data, p_creds_out, ffi.NULL)
check_rc(k_ctx, rc)
creds_out = p_creds_out[0]
# there should only be one cred in the array
cred = creds_out[0]
# read the name of the client principal
client_princ = cred.client
p_name = ffi.new('char **')
rc = lib.krb5_unparse_name(k_ctx, client_princ, p_name)
check_rc(k_ctx, rc)
c_name = p_name[0]
name = ffi.string(c_name).decode()
# create a new cache inside the collection (default cache)
p_cache = ffi.new('krb5_ccache *')
cctype = ffi.new('char[]', b'DIR')
rc = lib.krb5_cc_new_unique(k_ctx, cctype, ffi.NULL, p_cache)
check_rc(k_ctx, rc)
cache = p_cache[0]
# initialize the new cache
rc = lib.krb5_cc_initialize(k_ctx, cache, client_princ)
check_rc(k_ctx, rc)
# store the cred into the cache
rc = lib.krb5_cc_store_cred(k_ctx, cache, cred)
check_rc(k_ctx, rc)
yield name
finally:
if cache is not None:
# We destroy the cache (instead of closing it) since we want
# to remove it from disk.
lib.krb5_cc_destroy(k_ctx, cache)
if c_name is not None:
lib.krb5_free_unparsed_name(k_ctx, c_name)
if creds_out is not None:
lib.krb5_free_tgt_creds(k_ctx, creds_out)