pyceo/ceod/model/KerberosService.py

90 lines
3.0 KiB
Python

import os
import subprocess
from typing import List
import gssapi
from zope import component
from zope.interface import implementer
from ceo_common.interfaces import IKerberosService, IConfig
@implementer(IKerberosService)
class KerberosService:
def __init__(self):
cfg = component.getUtility(IConfig)
# For creating new members and renewing memberships, we use the
# admin credentials
self.admin_principal = cfg.get('ldap_admin_principal')
self.admin_principal_name = gssapi.Name(self.admin_principal)
ccache_file = cfg.get('ldap_admin_principal_ccache')
self.admin_principal_ccache = 'FILE:' + ccache_file
self.admin_principal_store = {'ccache': self.admin_principal_ccache}
# For everything else, the clients forwards (delegates) their
# credentials to us. Set KRB5CCNAME to /dev/null to mitigate the
# risk of the admin creds getting accidentally used instead.
os.environ['KRB5CCNAME'] = 'FILE:/dev/null'
def _run(self, args: List[str], **kwargs):
subprocess.run(args, check=True, **kwargs)
def _kinit_admin_creds(self):
env = {'KRB5CCNAME': self.admin_principal_ccache}
self._run([
'kinit', '-k', '-p', self.admin_principal
], env=env)
def _get_admin_creds_token_impl(self) -> bytes:
creds = gssapi.Credentials(
usage='initiate', name=self.admin_principal_name,
store=self.admin_principal_store)
# this will raise a gssapi.raw.exceptions.ExpiredCredentialsError
# if the ticket has expired
creds.inquire()
return creds.export()
def get_admin_creds_token(self) -> bytes:
"""
Returns a serialized GSSAPI credential which can be used to
authenticate to the CSC LDAP server with administrative privileges.
"""
try:
return self._get_admin_creds_token_impl()
except gssapi.raw.misc.GSSError:
# Either the ccache file does not exist, or the ticket has
# expired.
# Run kinit again.
self._kinit_admin_creds()
# This time should work
return self._get_admin_creds_token_impl()
def addprinc(self, principal: str, password: str):
self._run([
'kadmin', '-k', '-p', self.admin_principal, 'addprinc',
'-pw', password,
'-policy', 'default',
'+needchange',
'+requires_preauth',
principal
])
def delprinc(self, principal: str):
self._run([
'kadmin', '-k', '-p', self.admin_principal, 'delprinc',
'-force',
principal
])
def change_password(self, principal: str, password: str):
self._run([
'kadmin', '-k', '-p', self.admin_principal, 'cpw',
'-pw', password,
principal
])
self._run([
'kadmin', '-k', '-p', self.admin_principal, 'modprinc',
'+needchange',
principal
])