import os
import subprocess
from typing import List

import gssapi
from zope import component
from zope.interface import implementer

from ceo_common.interfaces import IKerberosService, IConfig


@implementer(IKerberosService)
class KerberosService:
    """
    Kerberos helper which shells out to ``kinit``/``kadmin`` and uses GSSAPI
    to obtain the admin principal's credentials.
    """

    def __init__(self):
        """
        Read the admin principal settings from the registered IConfig
        utility and point the process-wide default ccache at /dev/null.
        """
        cfg = component.getUtility(IConfig)

        # For creating new members and renewing memberships, we use the
        # admin credentials.
        self.admin_principal = cfg.get('ldap_admin_principal')
        self.admin_principal_name = gssapi.Name(self.admin_principal)
        ccache_file = cfg.get('ldap_admin_principal_ccache')
        self.admin_principal_ccache = 'FILE:' + ccache_file
        self.admin_principal_store = {'ccache': self.admin_principal_ccache}

        # For everything else, the clients forward (delegate) their
        # credentials to us. Set KRB5CCNAME to /dev/null to mitigate the
        # risk of the admin creds getting accidentally used instead.
        os.environ['KRB5CCNAME'] = 'FILE:/dev/null'

    def _run(self, args: List[str], **kwargs) -> None:
        """
        Run an external command, raising subprocess.CalledProcessError if it
        exits with a non-zero status.

        :param args: the command and its arguments.
        :param kwargs: extra keyword arguments forwarded to subprocess.run().
        """
        subprocess.run(args, check=True, **kwargs)

    def _kinit_admin_creds(self) -> None:
        """
        Run ``kinit -k`` for the admin principal, storing the resulting
        ticket in the admin ccache.
        """
        # Passing `env` to subprocess replaces the child's whole environment,
        # so start from a copy of our own environment (PATH, KRB5_CONFIG,
        # KRB5_KTNAME, ...) and override only the ccache location. This also
        # overrides the /dev/null KRB5CCNAME set in __init__ for this one
        # child process only.
        env = {**os.environ, 'KRB5CCNAME': self.admin_principal_ccache}
        self._run([
            'kinit', '-k', '-p', self.admin_principal
        ], env=env)

    def _get_admin_creds_token_impl(self) -> bytes:
        """
        Load the admin credentials from the admin ccache and return them in
        exported (serialized) form.
        """
        creds = gssapi.Credentials(
            usage='initiate', name=self.admin_principal_name,
            store=self.admin_principal_store)
        # this will raise a gssapi.raw.exceptions.ExpiredCredentialsError
        # if the ticket has expired
        creds.inquire()
        return creds.export()

    def get_admin_creds_token(self) -> bytes:
        """
        Returns a serialized GSSAPI credential which can be used to
        authenticate to the CSC LDAP server with administrative privileges.

        If the credentials cannot be loaded, kinit is run once and the load
        is retried; a failure on the second attempt propagates to the caller.
        """
        try:
            return self._get_admin_creds_token_impl()
        except gssapi.raw.misc.GSSError:
            # Either the ccache file does not exist, or the ticket has
            # expired.
            # Run kinit again.
            self._kinit_admin_creds()
            # This time should work
            return self._get_admin_creds_token_impl()

    def addprinc(self, principal: str, password: str) -> None:
        """
        Create a new principal via ``kadmin addprinc`` with the 'default'
        policy and the +needchange and +requires_preauth attributes.

        :param principal: name of the principal to create.
        :param password: initial password for the principal.
        :raises subprocess.CalledProcessError: if kadmin exits non-zero.
        """
        # NOTE(review): the password is passed to kadmin as a command-line
        # argument, which may be visible to other local users through the
        # process list - confirm this is acceptable on the deployment host.
        self._run([
            'kadmin', '-k', '-p', self.admin_principal, 'addprinc',
            '-pw', password, '-policy', 'default',
            '+needchange', '+requires_preauth', principal
        ])

    def delprinc(self, principal: str) -> None:
        """
        Delete a principal via ``kadmin delprinc -force``.

        :param principal: name of the principal to delete.
        :raises subprocess.CalledProcessError: if kadmin exits non-zero.
        """
        self._run([
            'kadmin', '-k', '-p', self.admin_principal, 'delprinc',
            '-force', principal
        ])

    def change_password(self, principal: str, password: str) -> None:
        """
        Set a principal's password via ``kadmin cpw``, then set the
        +needchange attribute on it via ``kadmin modprinc``.

        :param principal: name of the principal to modify.
        :param password: the new password.
        :raises subprocess.CalledProcessError: if either kadmin call exits
            non-zero. If the second call fails, the password has already
            been changed.
        """
        # NOTE(review): as in addprinc, the password travels on kadmin's
        # command line.
        self._run([
            'kadmin', '-k', '-p', self.admin_principal, 'cpw',
            '-pw', password, principal
        ])
        self._run([
            'kadmin', '-k', '-p', self.admin_principal, 'modprinc',
            '+needchange', principal
        ])