@ -98,7 +98,7 @@ class LDAPConnection(object):
### Helper Methods ###
def lookup ( self , dn ) :
def lookup ( self , dn , objectClass = None ) :
"""
Helper method to retrieve the attributes of an entry .
@ -113,7 +113,11 @@ class LDAPConnection(object):
# search for the specified dn
try :
matches = self . ldap . search_s ( dn , ldap . SCOPE_BASE )
if objectClass :
search_filter = ' (objectClass= %s ) ' % self . escape ( objectClass )
matches = self . ldap . search_s ( dn , ldap . SCOPE_BASE , search_filter )
else :
matches = self . ldap . search_s ( dn , ldap . SCOPE_BASE )
except ldap . NO_SUCH_OBJECT :
return None
except ldap . LDAPError , e :
@ -122,109 +126,160 @@ class LDAPConnection(object):
# this should never happen due to the nature of DNs
if len ( matches ) > 1 :
raise LDAPException ( " duplicate dn in ldap: " + dn )
# dn was found, but didn't match the objectClass filter
elif len ( matches ) < 1 :
return None
# return the attributes of the single successful match
else :
match = matches [ 0 ]
match_dn , match_attributes = match
return match_attributes
match = matches [ 0 ]
match_dn , match_attributes = match
return match_attributes
### User-related Methods ###
def user_lookup ( self , uid ) :
def user_lookup ( self , uid , objectClass = None ) :
"""
Retrieve the attributes of a user .
Parameters :
uid - the UNIX username to look up
uid - the uid to look up
Returns : attributes of user with uid
Example : connection . user_lookup ( ' mspang ' ) - >
{ ' uid ' : ' mspang ' , ' uidNumber ' : 21292 . . . }
"""
dn = ' uid= ' + uid + ' , ' + self . user_base
return self . lookup ( dn )
return self . lookup ( dn , objectClass )
def user_search ( self , search_filter ) :
def user_search ( self , search_filter , params ) :
"""
Helper for user searches .
Search for users with a filter .
Parameters :
search_filter - LDAP filter string to match users against
Returns : the list of uids matched ( usernames )
Returns : a dictionary mapping uids to attributes
"""
if not self . connected ( ) : raise LDAPException ( " Not connected! " )
search_filter = search_filter % tuple ( self . escape ( x ) for x in params )
# search for entries that match the filter
try :
matches = self . ldap . search_s ( self . user_base , ldap . SCOPE_SUBTREE , search_filter )
except ldap . LDAPError , e :
raise LDAPException ( " user search failed: %s " % e )
# list for uids found
uids = [ ]
results = { }
for match in matches :
dn , attributes = match
# uid is a required attribute of posixAccount
if not attributes . has_key ( ' uid ' ) :
raise LDAPException ( dn + ' (posixAccount) has no uid ' )
# do not handle the case of multiple usernames in one entry (yet)
elif len ( attributes [ ' uid ' ] ) > 1 :
raise LDAPException ( dn + ' (posixAccount) has multiple uids ' )
# append the sole uid of this match to the list
uids . append ( attributes [ ' uid ' ] [ 0 ] )
dn , attrs = match
uid = attrs [ ' uid ' ] [ 0 ]
results [ uid ] = attrs
return uids
return results
def user_modify ( self , uid , attrs ) :
"""
Update user attributes in the directory .
Parameters :
uid - username of the user to modify
attrs - dictionary as returned by user_lookup ( ) with changes to make .
omitted attributes are DELETED .
Example : user = user_lookup ( ' mspang ' )
user [ ' uidNumber ' ] = [ ' 0 ' ]
connection . user_modify ( ' mspang ' , user )
"""
# distinguished name of the entry to modify
dn = ' uid= ' + uid + ' , ' + self . user_base
# retrieve current state of user
old_user = self . user_lookup ( uid )
try :
# build list of modifications to make
changes = ldap . modlist . modifyModlist ( old_user , attrs )
# apply changes
self . ldap . modify_s ( dn , changes )
except ldap . LDAPError , e :
raise LDAPException ( " unable to modify: %s " % e )
def user_delete ( self , uid ) :
"""
Removes a user from the directory .
Example : connection . user_delete ( ' mspang ' )
"""
try :
dn = ' uid= ' + uid + ' , ' + self . user_base
self . ldap . delete_s ( dn )
except ldap . LDAPError , e :
raise LDAPException ( " unable to delete: %s " % e )
### Account-related Methods ###
def account_lookup ( self , uid ) :
"""
Retrieve the attributes of an account .
Parameters :
uid - the uid to look up
Returns : attributes of user with uid
"""
return self . user_lookup ( uid , ' posixAccount ' )
def user_search_id ( self , uidNumber ) :
def account_search_id ( self , uidNumber ) :
"""
Retrieves a list of users with a certain UNIX uid number .
Retrieves a list of account s with a certain UNIX uid number .
LDAP ( or passwd for that matter ) does not enforce any
restriction on the number of accounts that can have
a certain UID . Therefore this method returns a list of matches .
LDAP ( or passwd for that matter ) does not enforce any restriction on
the number of accounts that can have a certain UID number . Therefor e
this method returns a list of matches .
Parameters :
uidNumber - the user id of the accounts desired
Returns : the list of uids matched ( usernames )
Returns : a dictionary mapping uids to attributes
Example : connection . user_search_id ( 21292 ) - > [ ' mspang ' ]
Example : connection . account _search_id( 21292 ) - > { ' mspang ' : { . . . } }
"""
# search for posixAccount entries with the specified uidNumber
search_filter = ' (&(objectClass=posixAccount)(uidNumber= %d )) ' % uidNumber
return self . user_search ( search_filter )
search_filter = ' (&(objectClass=posixAccount)(uidNumber= %s )) '
return self . user_search ( search_filter , [ uidNumber ] )
def user_search_gid ( self , gidNumber ) :
def account _search_gid( self , gidNumber ) :
"""
Retrieves a list of users with a certain UNIX gid
Retrieves a list of account s with a certain UNIX gid
number ( search by default group ) .
Returns : the list of uids matched ( usernames )
Returns : a dictionary mapping uids to attributes
"""
# search for posixAccount entries with the specified gidNumber
search_filter = ' (&(objectClass=posixAccount)(gidNumber= %d )) ' % gidNumber
return self . user_search ( search_filter )
search_filter = ' (&(objectClass=posixAccount)(gidNumber= %s )) '
return self . user_search ( search_filter , [ gidNumber ] )
def user_add ( self , uid , cn , uidNumber , gidNumber , homeDirectory , loginShell = None , gecos = None , description = None ) :
def account _add( self , uid , cn , uidNumber , gidNumber , homeDirectory , loginShell = None , gecos = None , description = None , update = Fals e ) :
"""
Adds a user to the directory .
Adds a user account to the directory .
Parameters :
uid - the UNIX username for the account
@ -235,6 +290,7 @@ class LDAPConnection(object):
loginShell - login shell for the user
gecos - comment field ( usually stores name etc )
description - description field ( optional and unimportant )
update - if True , will update existing entries
Example : connection . user_add ( ' mspang ' , ' Michael Spang ' ,
21292 , 100 , ' /users/mspang ' , ' /bin/bash ' ,
@ -254,67 +310,33 @@ class LDAPConnection(object):
' homeDirectory ' : [ homeDirectory ] ,
' gecos ' : [ gecos ] ,
}
if loginShell :
attrs [ ' loginShell ' ] = loginShell
attrs [ ' loginShell ' ] = [ loginShell ]
if description :
attrs [ ' description ' ] = [ description ]
try :
modlist = ldap . modlist . addModlist ( attrs )
self . ldap . add_s ( dn , modlist )
except ldap . LDAPError , e :
raise LDAPException ( " unable to add: %s " % e )
old_entry = self . user_lookup ( uid )
if old_entry and ' posixAccount ' not in old_entry [ ' objectClass ' ] and update :
def user_modify ( self , uid , attrs ) :
"""
Update user attributes in the directory .
attrs . update ( old_entry )
attrs [ ' objectClass ' ] = list ( attrs [ ' objectClass ' ] )
attrs [ ' objectClass ' ] . append ( ' posixAccount ' )
if not ' shadowAccount ' in attrs [ ' objectClass ' ] :
attrs [ ' objectClass ' ] . append ( ' shadowAccount ' )
Parameters :
uid - username of the user to modify
attrs - dictionary as returned by user_lookup ( ) with changes to make .
omitted attributes are DELETED .
Example : user = user_lookup ( ' mspang ' )
user [ ' uidNumber ' ] = [ ' 0 ' ]
connection . user_modify ( ' mspang ' , user )
"""
modlist = ldap . modlist . modifyModlist ( old_entry , attrs )
self . ldap . modify_s ( dn , modlist )
if not self . connected ( ) : raise LDAPException ( " Not connected! " )
else :
# distinguished name of the entry to modify
dn = ' uid= ' + uid + ' , ' + self . user_base
# retrieve current state of user
old_user = self . user_lookup ( uid )
try :
# build list of modifications to make
changes = ldap . modlist . modifyModlist ( old_user , attrs )
# apply changes
self . ldap . modify_s ( dn , changes )
modlist = ldap . modlist . addModlist ( attrs )
self . ldap . add_s ( dn , modlist )
except ldap . LDAPError , e :
raise LDAPException ( " unable to modify: %s " % e )
def user_delete ( self , uid ) :
"""
Removes a user from the directory .
Example : connection . user_delete ( ' mspang ' )
"""
if not self . connected ( ) : raise LDAPException ( " Not connected! " )
try :
dn = ' uid= ' + uid + ' , ' + self . user_base
self . ldap . delete_s ( dn )
except ldap . LDAPError , e :
raise LDAPException ( " unable to delete: %s " % e )
raise LDAPException ( " unable to add: %s " % e )
@ -355,27 +377,19 @@ class LDAPConnection(object):
try :
search_filter = ' (&(objectClass=posixGroup)(gidNumber= %d )) ' % gidNumber
matches = self . ldap . search_s ( self . group_base , ldap . SCOPE_SUBTREE , search_filter )
except ldap . LDAPError , e :
except ldap . LDAPError , e :
raise LDAPException ( " group search failed: %s " % e )
# list for groups found
group_cns = [ ]
results = { }
for match in matches :
dn , attributes = match
# cn is a required attribute of posixGroup
if not attributes . has_key ( ' cn ' ) :
raise LDAPException ( dn + ' (posixGroup) has no cn ' )
# do not handle the case of multiple cns for one group (yet)
elif len ( attributes [ ' cn ' ] ) > 1 :
raise LDAPException ( dn + ' (posixGroup) has multiple cns ' )
dn , attrs = match
uid = attrs [ ' cn ' ] [ 0 ]
results [ uid ] = attrs
# append the sole uid of this match to the list
group_cns . append ( attributes [ ' cn ' ] [ 0 ] )
return group_cns
return results
def group_add ( self , cn , gidNumber , description = None ) :
@ -457,8 +471,129 @@ class LDAPConnection(object):
raise LDAPException ( " unable to delete group: %s " % e )
### Member-related Methods ###
def member_lookup ( self , uid ) :
"""
Retrieve the attributes of a member . This method will only return
results that have the objectClass ' member ' .
Parameters :
uid - the username to look up
Returns : attributes of member with uid
Example : connection . member_lookup ( ' mspang ' ) - >
{ ' uid ' : ' mspang ' , ' uidNumber ' : 21292 . . . }
"""
if not self . connected ( ) : raise LDAPException ( " Not connected! " )
dn = ' uid= ' + uid + ' , ' + self . user_base
return self . lookup ( dn , ' member ' )
def member_search_studentid ( self , studentid ) :
"""
Retrieves a list of members with a certain studentid .
Returns : a dictionary mapping uids to attributes
"""
search_filter = ' (&(objectClass=member)(studentid= %s )) '
return self . user_search ( search_filter , [ studentid ] )
def member_search_name ( self , name ) :
"""
Retrieves a list of members with the specified name ( fuzzy ) .
Returns : a dictionary mapping uids to attributes
"""
search_filter = ' (&(objectClass=member)(cn~= %s )) '
return self . user_search ( search_filter , [ name ] )
def member_search_term ( self , term ) :
"""
Retrieves a list of members who were registered in a certain term .
Returns : a dictionary mapping uids to attributes
"""
search_filter = ' (&(objectClass=member)(term= %s )) '
return self . user_search ( search_filter , [ term ] )
def member_search_program ( self , program ) :
"""
Retrieves a list of members in a certain program ( fuzzy ) .
Returns : a dictionary mapping uids to attributes
"""
search_filter = ' (&(objectClass=member)(program~= %s )) '
return self . user_search ( search_filter , [ program ] )
def member_add ( self , uid , cn , studentid , program = None , description = None ) :
"""
Adds a member to the directory .
Parameters :
uid - the UNIX username for the member
cn - the real name of the member
studentid - the member ' s student ID number
program - the member ' s program of study
description - a description for the entry
"""
dn = ' uid= ' + uid + ' , ' + self . user_base
attrs = {
' objectClass ' : [ ' top ' , ' account ' , ' member ' ] ,
' uid ' : [ uid ] ,
' cn ' : [ cn ] ,
' studentid ' : [ studentid ] ,
}
if program :
attrs [ ' program ' ] = [ program ]
if description :
attrs [ ' description ' ] = [ description ]
try :
modlist = ldap . modlist . addModlist ( attrs )
self . ldap . add_s ( dn , modlist )
except ldap . LDAPError , e :
raise LDAPException ( " unable to add: %s " % e )
def member_add_account ( self , uid , uidNumber , gidNumber , homeDirectory , loginShell = None , gecos = None ) :
"""
Adds login privileges to a member .
"""
return self . account_add ( uid , None , uidNumber , gidNumber , homeDirectory , loginShell , gecos , None , True )
### Miscellaneous Methods ###
def escape ( self , value ) :
"""
Escapes special characters in a value so that it may be safely inserted
into an LDAP search filter .
"""
value = str ( value )
value = value . replace ( ' \\ ' , ' \\ 5c ' ) . replace ( ' * ' , ' \\ 2a ' )
value = value . replace ( ' ( ' , ' \\ 28 ' ) . replace ( ' ) ' , ' \\ 29 ' )
value = value . replace ( ' \x00 ' , ' \\ 00 ' )
return value
def used_uids ( self , minimum = None , maximum = None ) :
"""
Compiles a list of used UIDs in a range .
@ -543,9 +678,17 @@ if __name__ == '__main__':
tushell = ' /bin/false '
tugecos = ' Test User,,, '
tgname = ' testgroup '
tmname = ' testmember '
tmrname = ' Test Member '
tmstudentid = ' 99999999 '
tmprogram = ' UBW '
tmdesc = ' Test Description '
cushell = ' /bin/true '
cuhome = ' /home/changed '
curname = ' Test Modified User '
cmhome = ' /home/testmember '
cmshell = ' /bin/false '
cmgecos = ' Test Member,,, '
test ( LDAPConnection )
connection = LDAPConnection ( )
@ -563,6 +706,7 @@ if __name__ == '__main__':
try :
connection . user_delete ( tuname )
connection . user_delete ( tmname )
connection . group_delete ( tgname )
except LDAPException :
pass
@ -596,8 +740,20 @@ if __name__ == '__main__':
' cn ' : [ turname ]
}
test ( LDAPConnection . user_add )
connection . user_add ( tuname , turname , tuuid , tugid , tuhome , tushell , tugecos )
test ( LDAPConnection . account_add )
connection . account_add ( tuname , turname , tuuid , tugid , tuhome , tushell , tugecos )
success ( )
emdata = {
' uid ' : [ tmname ] ,
' cn ' : [ tmrname ] ,
' studentid ' : [ tmstudentid ] ,
' program ' : [ tmprogram ] ,
' description ' : [ tmdesc ] ,
}
test ( LDAPConnection . member_add )
connection . member_add ( tmname , tmrname , tmstudentid , tmprogram , tmdesc )
success ( )
tggid = unusedids . pop ( )
@ -610,41 +766,95 @@ if __name__ == '__main__':
connection . group_add ( tgname , tggid )
success ( )
test ( LDAPConnection . account_lookup )
udata = connection . account_lookup ( tuname )
if udata : del udata [ ' objectClass ' ]
assert_equal ( eudata , udata )
success ( )
test ( LDAPConnection . member_lookup )
mdata = connection . member_lookup ( tmname )
if mdata : del mdata [ ' objectClass ' ]
assert_equal ( emdata , mdata )
success ( )
test ( LDAPConnection . user_lookup )
udata = connection . user_lookup ( tuname )
del udata [ ' objectClass ' ]
mdata = connection . user_lookup ( tmname )
if udata : del udata [ ' objectClass ' ]
if mdata : del mdata [ ' objectClass ' ]
assert_equal ( eudata , udata )
assert_equal ( emdata , mdata )
success ( )
test ( LDAPConnection . group_lookup )
gdata = connection . group_lookup ( tgname )
del gdata [ ' objectClass ' ]
if gdata : del gdata [ ' objectClass ' ]
assert_equal ( egdata , gdata )
success ( )
test ( LDAPConnection . user _search_id)
test ( LDAPConnection . account _search_id)
eulist = [ tuname ]
ulist = connection . user _search_id( tuuid )
ulist = connection . account _search_id( tuuid ) . keys ( )
assert_equal ( eulist , ulist )
success ( )
test ( LDAPConnection . user _search_gid)
ulist = connection . user _search_gid( tugid )
test ( LDAPConnection . account _search_gid)
ulist = connection . account _search_gid( tugid )
if tuname not in ulist :
fail ( " ( %s ) not in ( %s ) " % ( tuname , ulist ) )
fail ( " %s not in %s " % ( tuname , ulist ) )
success ( )
test ( LDAPConnection . member_search_studentid )
mlist = connection . member_search_studentid ( tmstudentid ) . keys ( )
emlist = [ tmname ]
assert_equal ( emlist , mlist )
success ( )
test ( LDAPConnection . member_search_name )
mlist = connection . member_search_name ( tmrname )
if tmname not in mlist :
fail ( " %s not in %s " % ( tmname , mlist ) )
success ( )
ecudata = connection . user_lookup ( tuname )
test ( LDAPConnection . member_search_program )
mlist = connection . member_search_program ( tmprogram )
if tmname not in mlist :
fail ( " %s not in %s " % ( tmname , mlist ) )
success ( )
test ( LDAPConnection . group_search_id )
glist = connection . group_search_id ( tggid ) . keys ( )
eglist = [ tgname ]
assert_equal ( eglist , glist )
success ( )
ecudata = connection . account_lookup ( tuname )
ecudata [ ' loginShell ' ] = [ cushell ]
ecudata [ ' homeDirectory ' ] = [ cuhome ]
ecudata [ ' cn ' ] = [ curname ]
test ( LDAPConnection . user_modify )
connection . user_modify ( tuname , ecudata )
cudata = connection . user_lookup ( tuname )
cudata = connection . account _lookup( tuname )
assert_equal ( ecudata , cudata )
success ( )
tmuid = unusedids . pop ( )
tmgid = unusedids . pop ( )
emadata = emdata . copy ( )
emadata . update ( {
' loginShell ' : [ cmshell ] ,
' uidNumber ' : [ str ( tmuid ) ] ,
' gidNumber ' : [ str ( tmgid ) ] ,
' gecos ' : [ cmgecos ] ,
' homeDirectory ' : [ cmhome ] ,
} )
test ( LDAPConnection . member_add_account )
connection . member_add_account ( tmname , tmuid , tmuid , cmhome , cmshell , cmgecos )
success ( )
ecgdata = connection . group_lookup ( tgname )
ecgdata [ ' memberUid ' ] = [ tuname ]