Remove unused cruft

This commit is contained in:
Michael Spang 2007-12-12 00:39:44 -05:00
parent fddc135056
commit 8782e58118
6 changed files with 4 additions and 2487 deletions

File diff suppressed because it is too large Load Diff

View File

@ -54,13 +54,6 @@ class InvalidTerm(MemberException):
def __str__(self):
return "Term is invalid: %s" % self.term
class InvalidRealName(MemberException):
"""Exception class for invalid real names."""
def __init__(self, name):
self.name = name
def __str__(self):
return "Name is invalid: %s" % self.name
class NoSuchMember(MemberException):
"""Exception class for nonexistent members."""
def __init__(self, memberid):
@ -97,50 +90,7 @@ def connected():
### Member Table ###
def new(uid, realname, program=None):
"""
Registers a new CSC member. The member is added to the members table
and registered for the current term.
Parameters:
uid - the initial user id
realname - the full real name of the member
program - the program of study of the member
Returns: the username of the new member
Exceptions:
InvalidRealName - if the real name is malformed
Example: new("Michael Spang", program="CS") -> "mspang"
"""
# blank attributes should be NULL
if program == '': program = None
if uid == '': uid = None
# check real name format (UNIX account real names must not contain [,:=])
if not re.match(cfg['realname_regex'], realname):
raise InvalidRealName(realname)
# check for duplicate userid
member = ldap_connection.user_lookup(uid)
if member:
raise InvalidArgument("uid", uid, "duplicate uid")
# add the member to the directory
ldap_connection.member_add(uid, realname, program)
# register them for this term in the directory
member = ldap_connection.member_lookup(uid)
member['term'] = [ terms.current() ]
ldap_connection.user_modify(uid, member)
return uid
### Members ###
def get(userid):
"""
@ -283,33 +233,6 @@ def set_position(position, members):
mlist = ldap_connection.make_modlist(entry[0], entry[1])
ceo_ldap.modify_s(dn, mlist)
def delete(userid):
"""
Erase all records of a member.
Note: real members are never removed from the database
Returns: ldap entry of the member
Exceptions:
NoSuchMember - if the user id does not exist
Example: delete('ctdalek') -> { 'cn': [ 'Calum T. Dalek' ], 'term': ['s1993'], ... }
"""
# save member data
member = ldap_connection.user_lookup(userid)
# bail if not found
if not member:
raise NoSuchMember(userid)
# remove data from the directory
uid = member['uid'][0]
ldap_connection.user_delete(uid)
return member
def change_group_member(action, group, userid):
@ -333,7 +256,7 @@ def change_group_member(action, group, userid):
ceo_ldap.modify_s(group_dn, mlist)
### Term Table ###
### Terms ###
def register(userid, term_list):
"""
@ -391,25 +314,6 @@ def registered(userid, term):
return 'term' in member and term in member['term']
def member_terms(userid):
"""
Retrieves a list of terms a member is
registered for.
Parameters:
userid - the member's username
Returns: list of term strings
Example: registered('ctdalek') -> 's1993'
"""
member = ldap_connection.member_lookup(userid)
if not 'term' in member:
return []
else:
return member['term']
def group_members(group):
"""
@ -427,96 +331,3 @@ def group_members(group):
return []
else:
return []
### Tests ###
if __name__ == '__main__':
from csc.common.test import *
# t=test m=member u=updated
tmname = 'Test Member'
tmuid = 'testmember'
tmprogram = 'Metaphysics'
tm2name = 'Test Member 2'
tm2uid = 'testmember2'
tm2uname = 'Test Member II'
tm2uprogram = 'Pseudoscience'
tmdict = {'cn': [tmname], 'uid': [tmuid], 'program': [tmprogram] }
tm2dict = {'cn': [tm2name], 'uid': [tm2uid] }
tm2udict = {'cn': [tm2uname], 'uid': [tm2uid], 'program': [tm2uprogram] }
thisterm = terms.current()
nextterm = terms.next(thisterm)
test(connect)
connect()
success()
test(connected)
assert_equal(True, connected())
success()
test(new)
tmid = new(tmuid, tmname, tmprogram)
tm2id = new(tm2uid, tm2name)
success()
test(registered)
assert_equal(True, registered(tmid, thisterm))
assert_equal(True, registered(tm2id, thisterm))
assert_equal(False, registered(tmid, nextterm))
success()
test(get)
tmp = get(tmid)
del tmp['objectClass']
del tmp['term']
assert_equal(tmdict, tmp)
tmp = get(tm2id)
del tmp['objectClass']
del tmp['term']
assert_equal(tm2dict, tmp)
success()
test(list_name)
assert_equal(True, tmid in list_name(tmname).keys())
assert_equal(True, tm2id in list_name(tm2name).keys())
success()
test(register)
register(tmid, nextterm)
assert_equal(True, registered(tmid, nextterm))
success()
test(member_terms)
assert_equal([thisterm, nextterm], member_terms(tmid))
assert_equal([thisterm], member_terms(tm2id))
success()
test(list_term)
assert_equal(True, tmid in list_term(thisterm).keys())
assert_equal(True, tmid in list_term(nextterm).keys())
assert_equal(True, tm2id in list_term(thisterm).keys())
assert_equal(False, tm2id in list_term(nextterm).keys())
success()
test(get)
tmp = get(tm2id)
del tmp['objectClass']
del tmp['term']
assert_equal(tm2dict, tmp)
success()
test(delete)
delete(tmid)
delete(tm2id)
success()
test(disconnect)
disconnect()
assert_equal(False, connected())
disconnect()
success()

View File

@ -5,5 +5,4 @@ This module contains backend interfaces and related modules.
db - CEO database interface for member registrations
ldapi - LDAP interface for UNIX account attribute administration
krb - Kerberos interface for UNIX account password management
"""

View File

@ -1,227 +0,0 @@
"""
IPC Library Functions
This module contains very UNIX-specific code to allow interactive
communication with another program. For CEO they are required to
talk to kadmin because there is no Kerberos administration Python
module. Real bindings to libkadm5 are doable and thus a TODO.
"""
import os, pty, select
class _pty_file(object):
"""
A 'file'-like wrapper class for pseudoterminal file descriptors.
This wrapper is necessary because Python has a nasty habit of throwing
OSError at pty EOF.
This class also implements timeouts for read operations which are handy
for avoiding deadlock when both processes are blocked in a read().
See the Python documentation of the file class for explanation
of the methods.
"""
def __init__(self, fd):
self.fd = fd
self.buffer = ''
self.closed = False
def __repr__(self):
status = 'open'
if self.closed:
status = 'closed'
return "<" + status + " pty '" + os.ttyname(self.fd) + "'>"
def read(self, size=-1, block=True, timeout=0.1):
if self.closed: raise ValueError
if size < 0:
data = None
# read data, catching OSError as EOF
try:
while data != '':
# wait timeout for the pty to become ready, otherwise stop reading
if not block and len(select.select([self.fd], [], [], timeout)[0]) == 0:
break
data = os.read(self.fd, 65536)
self.buffer += data
except OSError:
pass
data = self.buffer
self.buffer = ''
return data
else:
if len(self.buffer) < size:
# read data, catching OSError as EOF
try:
# wait timeout for the pty to become ready, then read
if block or len(select.select([self.fd], [], [], timeout)[0]) != 0:
self.buffer += os.read(self.fd, size - len(self.buffer) )
except OSError:
pass
data = self.buffer[:size]
self.buffer = self.buffer[size:]
return data
def readline(self, size=-1, block=True, timeout=0.1):
data = None
# read data, catching OSError as EOF
try:
while data != '' and self.buffer.find("\n") == -1 and (size < 0 or len(self.buffer) < size):
# wait timeout for the pty to become ready, otherwise stop reading
if not block and len(select.select([self.fd], [], [], timeout)[0]) == 0:
break
data = os.read(self.fd, 128)
self.buffer += data
except OSError:
pass
split_index = self.buffer.find("\n") + 1
if split_index < 0:
split_index = len(self.buffer)
if size >= 0 and split_index > size:
split_index = size
line = self.buffer[:split_index]
self.buffer = self.buffer[split_index:]
return line
def readlines(self, sizehint=None, timeout=0.1):
lines = []
line = None
while True:
line = self.readline(-1, False, timeout)
if line == '': break
lines.append(line)
return lines
def write(self, data):
if self.closed: raise ValueError
os.write(self.fd, data)
def writelines(self, lines):
for line in lines:
self.write(line)
def __iter__(self):
return self
def next(self):
line = self.readline()
if line == '':
raise StopIteration
return line
def isatty(self):
if self.closed: raise ValueError
return os.isatty(self.fd)
def fileno(self):
if self.closed: raise ValueError
return self.fd
def flush(self):
if self.closed: raise ValueError
os.fsync(self.fd)
def close(self):
if not self.closed: os.close(self.fd)
self.closed = True
def popeni(command, args, env=None):
"""
Open an interactive session with another command.
Parameters:
command - the command to run (full path)
args - a list of arguments to pass to command
env - optional environment for command
Returns: (pid, stdout, stdin)
"""
# use a pipe to send data to the child
child_stdin, parent_stdin = os.pipe()
# a pipe for receiving data would cause buffering and
# is therefore not suitable for interactive communication
# i.e. parent_stdout, child_stdout = os.pipe()
# therefore a pty must be used instead
master, slave = pty.openpty()
# collect both stdout and stderr on the pty
parent_stdout, child_stdout = master, slave
parent_stderr, child_stderr = master, slave
# fork the child to communicate with
pid = os.fork()
# child process
if pid == 0:
# close all of the parent's fds
os.close(parent_stdin)
if parent_stdout != parent_stdin:
os.close(parent_stdout)
if parent_stderr != parent_stdin and parent_stderr != parent_stdout:
os.close(parent_stderr)
# if stdout is a terminal, set it to the controlling terminal
if os.isatty(child_stdout):
# determine the filename of the tty
tty = os.ttyname(child_stdout)
# create a new session to disconnect
# from the parent's controlling terminal
os.setsid()
# set the controlling terminal to the pty
# by opening it (and closing it again since
# it's already open as child_stdout)
fd = os.open(tty, os.O_RDWR)
os.close(fd)
# init stdin/out/err
os.dup2(child_stdin, 0)
os.dup2(child_stdout, 1)
if child_stderr >= 0:
os.dup2(child_stderr, 2)
# finally, execute the child
if env:
os.execv(command, args, env)
else:
os.execv(command, args)
# parent process
else:
# close all of the child's fds
os.close(child_stdin)
if child_stdout != child_stdin:
os.close(child_stdout)
if child_stderr >= 0 and child_stderr != child_stdin and child_stderr != child_stdout:
os.close(child_stderr)
return pid, _pty_file(parent_stdout), os.fdopen(parent_stdin, 'w')
### Tests ###
if __name__ == '__main__':
from csc.common.test import *
prog = '/bin/cat'
argv = [ prog ]
message = "test\n"
test(popeni)
proc, recv, send = popeni(prog, argv)
send.write(message)
send.flush()
line = recv.readline()
assert_equal(message.strip(), line.strip())
success()

View File

@ -1,536 +0,0 @@
"""
Kerberos Backend Interface
This module is intended to be a thin wrapper around Kerberos operations.
Methods on the connection object correspond in a straightforward way to
calls to the Kerberos Master server.
A Kerberos principal is the second half of a CSC UNIX account. The principal
stores the user's password and and is used for all authentication on CSC
systems. Accounts that do not authenticate (e.g. club accounts) do not need
a Kerberos principal.
Unfortunately, there are no Python bindings to libkadm at this time. As a
temporary workaround, this module communicates with the kadmin CLI interface
via a pseudo-terminal and a pipe.
"""
import os
import ipc
class KrbException(Exception):
"""Exception class for all Kerberos-related errors."""
pass
class KrbConnection(object):
"""
Connection to the Kerberos master server (kadmind). All Kerberos
principal updates are made via this class.
Exceptions: (all methods)
KrbException - on query/update failure
Example:
connection = KrbConnection()
connection.connect(...)
# make queries and updates, e.g.
connection.delete_principal("mspang")
connection.disconnect()
"""
def __init__(self):
self.pid = None
def connect(self, principal, keytab):
"""
Establishes the connection to the Kerberos master server.
Parameters:
principal - the Kerberos princiapl to authenticate as
keytab - keytab filename for authentication
Example: connection.connect('ceo/admin@CSCLUB.UWATERLOO.CA', '/etc/ceo.keytab')
"""
# check keytab
if not os.access(keytab, os.R_OK):
raise KrbException("cannot access Kerberos keytab: %s" % keytab)
# command to run
kadmin = '/usr/sbin/kadmin'
kadmin_args = ['kadmin', '-p', principal, '-kt', keytab]
# fork the kadmin command
self.pid, self.kadm_out, self.kadm_in = ipc.popeni(kadmin, kadmin_args)
# read welcome messages
welcome = self.read_result()
# sanity checks on welcome messages
for line in welcome:
# ignore auth message
if line.find("Authenticating") == 0:
continue
# ignore log file message
elif line.find("kadmin.log") != -1:
continue
# error message?
else:
raise KrbException("unexpected kadmin output: " + welcome[0])
def disconnect(self):
"""Close the connection to the master server."""
if self.pid:
# close the pipe connected to kadmin's standard input
self.kadm_in.close()
# close the master pty connected to kadmin's stdout
try:
self.kadm_out.close()
except OSError:
pass
# wait for kadmin to terminate
os.waitpid(self.pid, 0)
self.pid = None
def connected(self):
"""Determine whether the connection has been established."""
return self.pid is not None
### Helper Methods ###
def read_result(self):
"""
Helper function to read output of kadmin until it
prompts for input.
Returns: a list of lines returned by kadmin
"""
# list of lines output by kadmin
result = []
lines = []
# the kadmin prompt that signals the end output
# note: KADMIN_ARGS[0] must be "kadmin" or the actual prompt will differ
prompt = "kadmin:"
# timeout variables. the timeout will start at timeout and
# increase up to max_timeout when read() returns nothing (i.e., times out)
timeout = 0.01
timeout_increment = 0.10
timeout_maximum = 1.00
# input loop: read from kadmin until the kadmin prompt
buf = ''
while True:
# attempt to read any available data
data = self.kadm_out.read(block=False, timeout=timeout)
buf += data
# nothing was read
if data == '':
# so wait longer for data next time
if timeout < timeout_maximum:
timeout += timeout_increment
continue
# give up after too much waiting
else:
# check kadmin status
status = os.waitpid(self.pid, os.WNOHANG)
if status[0] == 0:
# kadmin still alive
raise KrbException("timeout while reading response from kadmin")
else:
# kadmin died!
raise KrbException("kadmin died while reading response:\n%s\n%s" % ("\n".join(lines), buf))
# break into lines and save all but the final
# line (which is incomplete) into result
lines = buf.split("\n")
buf = lines[-1]
lines = lines[:-1]
for line in lines:
line = line.strip()
result.append(line)
# if the incomplete line in the buffer is the kadmin prompt,
# then the result is complete and may be returned
if buf.strip() == prompt:
break
return result
def execute(self, command):
"""
Helper function to execute a kadmin command.
Parameters:
command - command string to pass on to kadmin
Returns: a list of lines output by the command
"""
# there should be no remaining output from the previous
# command. if there is then something is broken.
stale_output = self.kadm_out.read(block=False, timeout=0)
if stale_output != '':
raise KrbException("unexpected kadmin output: " + stale_output)
# send the command to kadmin
self.kadm_in.write(command + "\n")
self.kadm_in.flush()
# read the command output and return it
result = self.read_result()
return result
### Commands ###
def list_principals(self):
"""
Retrieve a list of Kerberos principals.
Returns: a list of principals
Example: connection.list_principals() -> [
"ceo/admin@CSCLUB.UWATERLOO.CA",
"sysadmin/admin@CSCLUB.UWATERLOO.CA",
"mspang@CSCLUB.UWATERLOO.CA",
...
]
"""
principals = self.execute("list_principals")
# assuming that there at least some host principals
if len(principals) < 1:
raise KrbException("no kerberos principals")
# detect error message
if principals[0].find("kadmin:") == 0:
raise KrbException("list_principals returned error: " + principals[0])
# verify principals are well-formed
for principal in principals:
if principal.find("@") == -1:
raise KrbException('malformed pricipal: "' + principal + '"')
return principals
def get_principal(self, principal):
"""
Retrieve principal details.
Returns: a dictionary of principal attributes
Example: connection.get_principal("ceo/admin@CSCLUB.UWATERLOO.CA") -> {
"Principal": "ceo/admin@CSCLUB.UWATERLOO.CA",
"Policy": "[none]",
...
}
"""
output = self.execute('get_principal "' + principal + '"')
# detect error message
if output[0].find("kadmin:") == 0:
raise KrbException("get_principal returned error: " + output[0])
# detect more errors
if output[0].find("get_principal: ") == 0:
message = output[0][15:]
# principal does not exist => None
if message.find("Principal does not exist") == 0:
return None
# dictionary to store attributes
principal_attributes = {}
# attributes that will not be returned
ignore_attributes = ['Key']
# split output into a dictionary of attributes
for line in output:
key, value = line.split(":", 1)
value = value.strip()
if not key in ignore_attributes:
principal_attributes[key] = value
return principal_attributes
def get_privs(self):
"""
Retrieve privileges of the current principal.
Returns: a list of privileges
Example: connection.get_privs() ->
[ "GET", "ADD", "MODIFY", "DELETE" ]
"""
output = self.execute("get_privs")
# one line of output is expected
if len(output) > 1:
raise KrbException("unexpected output of get_privs: " + output[1])
# detect error message
if output[0].find("kadmin:") == 0:
raise KrbException("get_privs returned error: " + output[0])
# parse output by removing the prefix and splitting it around spaces
if output[0][:20] != "current privileges: ":
raise KrbException("malformed get_privs output: " + output[0])
privs = output[0][20:].split(" ")
return privs
def add_principal(self, principal, password):
"""
Create a new principal.
Parameters:
principal - the name of the principal
password - the principal's initial password
Example: connection.add_principal("mspang@CSCLUB.UWATERLOO.CA", "opensesame")
"""
# exec the add_principal command
if password.find('"') == -1:
self.kadm_in.write('add_principal -pw "' + password + '" "' + principal + '"\n')
# fools at MIT didn't bother implementing escaping, so passwords
# that contain double quotes must be treated specially
else:
self.kadm_in.write('add_principal "' + principal + '"\n')
self.kadm_in.write(password + "\n" + password + "\n")
# send request and read response
self.kadm_in.flush()
output = self.read_result()
# verify output
created = False
for line in output:
# ignore NOTICE lines
if line.find("NOTICE:") == 0:
continue
# ignore prompts
elif line.find("Enter password") == 0 or line.find("Re-enter password") == 0:
continue
# record whether success message was encountered
elif line.find("Principal") == 0 and line.find("created.") != 0:
created = True
# error messages
elif line.find("add_principal:") == 0 or line.find("kadmin:") == 0:
# principal exists
if line.find("already exists") != -1:
raise KrbException("principal already exists")
# misc errors
else:
raise KrbException(line)
# unknown output
else:
raise KrbException("unexpected add_principal output: " + line)
# ensure success message was received
if not created:
raise KrbException("kadmin did not acknowledge principal creation")
def delete_principal(self, principal):
"""
Delete a principal.
Example: connection.delete_principal("mspang@CSCLUB.UWATERLOO.CA")
"""
# exec the delete_principal command and read response
self.kadm_in.write('delete_principal -force "' + principal + '"\n')
self.kadm_in.flush()
output = self.read_result()
# verify output
deleted = False
for line in output:
# ignore reminder
if line.find("Make sure that") == 0:
continue
# record whether success message was encountered
elif line.find("Principal") == 0 and line.find("deleted.") != -1:
deleted = True
# error messages
elif line.find("delete_principal:") == 0 or line.find("kadmin:") == 0:
# principal exists
if line.find("does not exist") != -1:
raise KrbException("principal does not exist")
# misc errors
else:
raise KrbException(line)
# unknown output
else:
raise KrbException("unexpected delete_principal output: " + line)
# ensure success message was received
if not deleted:
raise KrbException("did not receive principal deleted")
def change_password(self, principal, password):
"""
Changes a principal's password.
Example: connection.change_password("mspang@CSCLUB.UWATERLOO.CA", "opensesame")
"""
# exec the add_principal command
if password.find('"') == -1:
self.kadm_in.write('change_password -pw "' + password + '" "' + principal + '"\n')
else:
self.kadm_in.write('change_password "' + principal + '"\n')
self.kadm_in.write(password + "\n" + password + "\n")
# send request and read response
self.kadm_in.flush()
output = self.read_result()
# verify output
changed = False
for line in output:
# ignore NOTICE lines
if line.find("NOTICE:") == 0:
continue
# ignore prompts
elif line.find("Enter password") == 0 or line.find("Re-enter password") == 0:
continue
# record whether success message was encountered
elif line.find("Password") == 0 and line.find("changed.") != 0:
changed = True
# error messages
elif line.find("change_password:") == 0 or line.find("kadmin:") == 0:
raise KrbException(line)
# unknown output
else:
raise KrbException("unexpected change_password output: " + line)
# ensure success message was received
if not changed:
raise KrbException("kadmin did not acknowledge password change")
### Tests ###
if __name__ == '__main__':
from csc.common.test import *
import random
conffile = '/etc/csc/kerberos.cf'
cfg = dict([map(str.strip, a.split("=", 1)) for a in map(str.strip, open(conffile).read().split("\n")) if "=" in a ])
principal = cfg['admin_principal'][1:-1]
keytab = cfg['admin_keytab'][1:-1]
realm = cfg['realm'][1:-1]
# t=test p=principal e=expected
tpname = 'testpirate' + '@' + realm
tpw = str(random.randint(10**30, 10**31-1)) + 'YAR!'
eprivs = ['GET', 'ADD', 'MODIFY', 'DELETE']
test(KrbConnection)
connection = KrbConnection()
success()
test(connection.connect)
connection.connect(principal, keytab)
success()
try:
connection.delete_principal(tpname)
except KrbException:
pass
test(connection.connected)
assert_equal(True, connection.connected())
success()
test(connection.add_principal)
connection.add_principal(tpname, tpw)
success()
test(connection.list_principals)
pals = connection.list_principals()
assert_equal(True, tpname in pals)
success()
test(connection.get_privs)
privs = connection.get_privs()
assert_equal(eprivs, privs)
success()
test(connection.get_principal)
princ = connection.get_principal(tpname)
assert_equal(tpname, princ['Principal'])
success()
test(connection.delete_principal)
connection.delete_principal(tpname)
assert_equal(None, connection.get_principal(tpname))
success()
test(connection.disconnect)
connection.disconnect()
assert_equal(False, connection.connected())
success()

View File

@ -67,6 +67,7 @@ class LDAPConnection(object):
self.user_base = user_base
self.group_base = group_base
def connect_sasl(self, uri, mech, realm, userid, password, user_base, group_base):
# open the connection
@ -219,130 +220,6 @@ class LDAPConnection(object):
raise LDAPException("unable to modify: %s" % e)
def user_delete(self, uid):
"""
Removes a user from the directory.
Example: connection.user_delete('mspang')
"""
try:
dn = 'uid=' + uid + ',' + self.user_base
self.ldap.delete_s(dn)
except ldap.LDAPError, e:
raise LDAPException("unable to delete: %s" % e)
### Account-related Methods ###
def account_lookup(self, uid):
"""
Retrieve the attributes of an account.
Parameters:
uid - the uid to look up
Returns: attributes of user with uid
"""
return self.user_lookup(uid, 'posixAccount')
def account_search_id(self, uidNumber):
"""
Retrieves a list of accounts with a certain UNIX uid number.
LDAP (or passwd for that matter) does not enforce any restriction on
the number of accounts that can have a certain UID number. Therefore
this method returns a list of matches.
Parameters:
uidNumber - the user id of the accounts desired
Returns: a dictionary mapping uids to attributes
Example: connection.account_search_id(21292) -> {'mspang': { ... }}
"""
search_filter = '(&(objectClass=posixAccount)(uidNumber=%s))'
return self.user_search(search_filter, [ uidNumber ])
def account_search_gid(self, gidNumber):
"""
Retrieves a list of accounts with a certain UNIX gid
number (search by default group).
Returns: a dictionary mapping uids to attributes
"""
search_filter = '(&(objectClass=posixAccount)(gidNumber=%s))'
return self.user_search(search_filter, [ gidNumber ])
def account_add(self, uid, cn, uidNumber, gidNumber, homeDirectory, loginShell=None, gecos=None, description=None, update=False):
"""
Adds a user account to the directory.
Parameters:
uid - the UNIX username for the account
cn - the real name of the member
uidNumber - the UNIX user id number
gidNumber - the UNIX group id number (default group)
homeDirectory - home directory for the user
loginShell - login shell for the user
gecos - comment field (usually stores name etc)
description - description field (optional and unimportant)
update - if True, will update existing entries
Example: connection.user_add('mspang', 'Michael Spang',
21292, 100, '/users/mspang', '/bin/bash',
'Michael Spang,,,')
"""
if not self.connected(): raise LDAPException("Not connected!")
dn = 'uid=' + uid + ',' + self.user_base
attrs = {
'objectClass': [ 'top', 'account', 'posixAccount', 'shadowAccount' ],
'uid': [ uid ],
'cn': [ cn ],
'loginShell': [ loginShell ],
'uidNumber': [ str(uidNumber) ],
'gidNumber': [ str(gidNumber) ],
'homeDirectory': [ homeDirectory ],
'gecos': [ gecos ],
}
if loginShell:
attrs['loginShell'] = [ loginShell ]
if description:
attrs['description'] = [ description ]
try:
old_entry = self.user_lookup(uid)
if old_entry and 'posixAccount' not in old_entry['objectClass'] and update:
attrs.update(old_entry)
attrs['objectClass'] = list(attrs['objectClass'])
attrs['objectClass'].append('posixAccount')
if not 'shadowAccount' in attrs['objectClass']:
attrs['objectClass'].append('shadowAccount')
modlist = ldap.modlist.modifyModlist(old_entry, attrs)
self.ldap.modify_s(dn, modlist)
else:
modlist = ldap.modlist.addModlist(attrs)
self.ldap.add_s(dn, modlist)
except ldap.LDAPError, e:
raise LDAPException("unable to add: %s" % e)
### Group-related Methods ###
@ -366,114 +243,6 @@ class LDAPConnection(object):
return self.lookup(dn, 'posixGroup')
def group_search_id(self, gidNumber):
"""
Retrieves a list of groups with the specified UNIX group number.
Returns: a list of groups with gid gidNumber
Example: connection.group_search_id(1001) -> ['office']
"""
if not self.connected(): raise LDAPException("Not connected!")
# search for posixAccount entries with the specified uidNumber
try:
search_filter = '(&(objectClass=posixGroup)(gidNumber=%d))' % gidNumber
matches = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, search_filter)
except ldap.LDAPError, e:
raise LDAPException("group search failed: %s" % e)
# list for groups found
results = {}
for match in matches:
dn, attrs = match
uid = attrs['cn'][0]
results[uid] = attrs
return results
def group_add(self, cn, gidNumber, description=None):
"""
Adds a group to the directory.
Example: connection.group_add('office', 1001, 'Office Staff')
"""
if not self.connected(): raise LDAPException("Not connected!")
dn = 'cn=' + cn + ',' + self.group_base
attrs = {
'objectClass': [ 'top', 'posixGroup', 'group' ],
'cn': [ cn ],
'gidNumber': [ str(gidNumber) ],
}
if description:
attrs['description'] = description
try:
modlist = ldap.modlist.addModlist(attrs)
self.ldap.add_s(dn, modlist)
except ldap.LDAPError, e:
raise LDAPException("unable to add group: %s" % e)
def group_modify(self, cn, attrs):
"""
Update group attributes in the directory.
The only available updates are fairly destructive (rename or renumber)
but this method is provided for completeness.
Parameters:
cn - name of the group to modify
entry - dictionary as returned by group_lookup() with changes to make.
omitted attributes are DELETED.
Example: group = group_lookup('office')
group['gidNumber'] = [ str(connection.first_id(20000, 40000)) ]
del group['memberUid']
connection.group_modify('office', group)
"""
if not self.connected(): raise LDAPException("Not connected!")
# distinguished name of the entry to modify
dn = 'cn=' + cn + ',' + self.group_base
# retrieve current state of group
old_group = self.group_lookup(cn)
try:
# build list of modifications to make
changes = ldap.modlist.modifyModlist(old_group, attrs)
# apply changes
self.ldap.modify_s(dn, changes)
except ldap.LDAPError, e:
raise LDAPException("unable to modify: %s" % e)
def group_delete(self, cn):
"""
Removes a group from the directory."
Example: connection.group_delete('office')
"""
if not self.connected(): raise LDAPException("Not connected!")
try:
dn = 'cn=' + cn + ',' + self.group_base
self.ldap.delete_s(dn)
except ldap.LDAPError, e:
raise LDAPException("unable to delete group: %s" % e)
### Member-related Methods ###
def member_lookup(self, uid):
@ -559,14 +328,6 @@ class LDAPConnection(object):
raise LDAPException("unable to add: %s" % e)
def member_add_account(self, uid, uidNumber, gidNumber, homeDirectory, loginShell=None, gecos=None):
"""
Adds login privileges to a member.
"""
return self.account_add(uid, None, uidNumber, gidNumber, homeDirectory, loginShell, gecos, None, True)
### Miscellaneous Methods ###
@ -583,66 +344,6 @@ class LDAPConnection(object):
return value
def used_uids(self, minimum=None, maximum=None):
"""
Compiles a list of used UIDs in a range.
Parameters:
minimum - smallest uid to return in the list
maximum - largest uid to return in the list
Returns: list of integer uids
Example: connection.used_uids(20000, 40000) -> [20000, 20001, ...]
"""
if not self.connected(): raise LDAPException("Not connected!")
try:
users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['uidNumber'])
except ldap.LDAPError, e:
raise LDAPException("search for uids failed: %s" % e)
uids = []
for user in users:
dn, attrs = user
uid = int(attrs['uidNumber'][0])
if (not minimum or uid >= minimum) and (not maximum or uid <= maximum):
uids.append(uid)
return uids
def used_gids(self, minimum=None, maximum=None):
"""
Compiles a list of used GIDs in a range.
Parameters:
minimum - smallest gid to return in the list
maximum - largest gid to return in the list
Returns: list of integer gids
Example: connection.used_gids(20000, 40000) -> [20000, 20001, ...]
"""
if not self.connected(): raise LDAPException("Not connected!")
try:
users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['gidNumber'])
except ldap.LDAPError, e:
raise LDAPException("search for gids failed: %s" % e)
gids = []
for user in users:
dn, attrs = user
gid = int(attrs['gidNumber'][0])
if (not minimum or gid >= minimum) and (not maximum or gid <= maximum):
gids.append(gid)
return gids
def make_modlist(self, old, new):
keys = set(old.keys()).union(set(new))
mlist = []
@ -676,213 +377,3 @@ class Sasl:
def callback(self, id, challenge, prompt, defresult):
return ''
### Tests ###
if __name__ == '__main__':
from csc.common.test import *
conffile = '/etc/csc/ldap.cf'
cfg = dict([map(str.strip, a.split("=", 1)) for a in map(str.strip, open(conffile).read().split("\n")) if "=" in a ])
srvurl = cfg['server_url'][1:-1]
binddn = cfg['admin_bind_dn'][1:-1]
bindpw = cfg['admin_bind_pw'][1:-1]
ubase = cfg['users_base'][1:-1]
gbase = cfg['groups_base'][1:-1]
minid = 99999000
maxid = 100000000
# t=test u=user g=group c=changed r=real e=expected
tuname = 'testuser'
turname = 'Test User'
tuhome = '/home/testuser'
tushell = '/bin/false'
tugecos = 'Test User,,,'
tgname = 'testgroup'
tmname = 'testmember'
tmrname = 'Test Member'
tmprogram = 'UBW'
tmdesc = 'Test Description'
cushell = '/bin/true'
cuhome = '/home/changed'
curname = 'Test Modified User'
cmhome = '/home/testmember'
cmshell = '/bin/false'
cmgecos = 'Test Member,,,'
test(LDAPConnection)
connection = LDAPConnection()
success()
test(LDAPConnection.disconnect)
connection.disconnect()
success()
test(LDAPConnection.connect)
connection.connect(srvurl, binddn, bindpw, ubase, gbase)
if not connection.connected():
fail("not connected")
success()
try:
connection.user_delete(tuname)
connection.user_delete(tmname)
connection.group_delete(tgname)
except LDAPException:
pass
test(LDAPConnection.used_uids)
uids = connection.used_uids(minid, maxid)
if type(uids) is not list:
fail("list not returned")
success()
test(LDAPConnection.used_gids)
gids = connection.used_gids(minid, maxid)
if type(gids) is not list:
fail("list not returned")
success()
unusedids = []
for idnum in xrange(minid, maxid):
if not idnum in uids and not idnum in gids:
unusedids.append(idnum)
tuuid = unusedids.pop()
tugid = unusedids.pop()
eudata = {
'uid': [ tuname ],
'loginShell': [ tushell ],
'uidNumber': [ str(tuuid) ],
'gidNumber': [ str(tugid) ],
'gecos': [ tugecos ],
'homeDirectory': [ tuhome ],
'cn': [ turname ]
}
test(LDAPConnection.account_add)
connection.account_add(tuname, turname, tuuid, tugid, tuhome, tushell, tugecos)
success()
emdata = {
'uid': [ tmname ],
'cn': [ tmrname ],
'program': [ tmprogram ],
'description': [ tmdesc ],
}
test(LDAPConnection.member_add)
connection.member_add(tmname, tmrname, tmprogram, tmdesc)
success()
tggid = unusedids.pop()
egdata = {
'cn': [ tgname ],
'gidNumber': [ str(tggid) ]
}
test(LDAPConnection.group_add)
connection.group_add(tgname, tggid)
success()
test(LDAPConnection.account_lookup)
udata = connection.account_lookup(tuname)
if udata: del udata['objectClass']
assert_equal(eudata, udata)
success()
test(LDAPConnection.member_lookup)
mdata = connection.member_lookup(tmname)
if mdata: del mdata['objectClass']
assert_equal(emdata, mdata)
success()
test(LDAPConnection.user_lookup)
udata = connection.user_lookup(tuname)
mdata = connection.user_lookup(tmname)
if udata: del udata['objectClass']
if mdata: del mdata['objectClass']
assert_equal(eudata, udata)
assert_equal(emdata, mdata)
success()
test(LDAPConnection.group_lookup)
gdata = connection.group_lookup(tgname)
if gdata: del gdata['objectClass']
assert_equal(egdata, gdata)
success()
test(LDAPConnection.account_search_id)
eulist = [ tuname ]
ulist = connection.account_search_id(tuuid).keys()
assert_equal(eulist, ulist)
success()
test(LDAPConnection.account_search_gid)
ulist = connection.account_search_gid(tugid)
if tuname not in ulist:
fail("%s not in %s" % (tuname, ulist))
success()
test(LDAPConnection.member_search_name)
mlist = connection.member_search_name(tmrname)
if tmname not in mlist:
fail("%s not in %s" % (tmname, mlist))
success()
test(LDAPConnection.member_search_program)
mlist = connection.member_search_program(tmprogram)
if tmname not in mlist:
fail("%s not in %s" % (tmname, mlist))
success()
test(LDAPConnection.group_search_id)
glist = connection.group_search_id(tggid).keys()
eglist = [ tgname ]
assert_equal(eglist, glist)
success()
ecudata = connection.account_lookup(tuname)
ecudata['loginShell'] = [ cushell ]
ecudata['homeDirectory'] = [ cuhome ]
ecudata['cn'] = [ curname ]
test(LDAPConnection.user_modify)
connection.user_modify(tuname, ecudata)
cudata = connection.account_lookup(tuname)
assert_equal(ecudata, cudata)
success()
tmuid = unusedids.pop()
tmgid = unusedids.pop()
emadata = emdata.copy()
emadata.update({
'loginShell': [ cmshell ],
'uidNumber': [ str(tmuid) ],
'gidNumber': [ str(tmgid) ],
'gecos': [ cmgecos ],
'homeDirectory': [ cmhome ],
})
test(LDAPConnection.member_add_account)
connection.member_add_account(tmname, tmuid, tmuid, cmhome, cmshell, cmgecos)
success()
ecgdata = connection.group_lookup(tgname)
ecgdata['memberUid'] = [ tuname ]
test(LDAPConnection.group_modify)
connection.group_modify(tgname, ecgdata)
cgdata = connection.group_lookup(tgname)
assert_equal(ecgdata, cgdata)
success()
test(LDAPConnection.group_delete)
connection.group_delete(tgname)
success()
test(LDAPConnection.disconnect)
connection.disconnect()
success()