4 This module is intended to be a thin wrapper around LDAP operations.
5 Methods on the connection object correspond in a straightforward way
6 to LDAP queries and updates.
8 A LDAP entry is the most important component of a CSC UNIX account.
9 The entry contains the username, user id number, real name, shell,
10 and other important information. All non-local UNIX accounts must
11 have an LDAP entry, even if the account does not log in directly.
13 This module makes use of python-ldap, a Python module with bindings
14 to libldap, OpenLDAP's native C client library.
19 class LDAPException(Exception):
20 """Exception class for LDAP-related errors."""
23 class LDAPConnection(object):
25 Connection to the LDAP directory. All directory
26 queries and updates are made via this class.
28 Exceptions: (all methods)
29 LDAPException - on directory query failure
32 connection = LDAPConnection()
33 connection.connect(...)
35 # make queries and updates, e.g.
36 connection.user_delete('mspang')
38 connection.disconnect()
45 def connect(self, server, bind_dn, bind_pw, user_base, group_base):
47 Establish a connection to the LDAP Server.
50 server - connection string (e.g. ldap://foo.com, ldaps://bar.com)
51 bind_dn - distinguished name to bind to
52 bind_pw - password of bind_dn
53 user_base - base of the users subtree
54 group_base - baes of the group subtree
56 Example: connect('ldaps:///', 'cn=ceo,dc=csclub,dc=uwaterloo,dc=ca',
57 'secret', 'ou=People,dc=csclub,dc=uwaterloo,dc=ca',
58 'ou=Group,dc=csclub,dc=uwaterloo,dc=ca')
62 if bind_pw is None: bind_pw = ''
67 self.ldap = ldap.initialize(server)
70 self.ldap.simple_bind_s(bind_dn, bind_pw)
72 except ldap.LDAPError, e:
73 raise LDAPException("unable to connect: %s" % e)
75 self.user_base = user_base
76 self.group_base = group_base
80 """Close the connection to the LDAP server."""
88 except ldap.LDAPError, e:
89 raise LDAPException("unable to disconnect: %s" % e)
93 """Determine whether the connection has been established."""
95 return self.ldap is not None
99 ### Helper Methods ###
101 def lookup(self, dn):
103 Helper method to retrieve the attributes of an entry.
106 dn - the distinguished name of the directory entry
108 Returns: a dictionary of attributes of the matched dn, or
109 None of the dn does not exist in the directory
112 if not self.connected(): raise LDAPException("Not connected!")
114 # search for the specified dn
116 matches = self.ldap.search_s(dn, ldap.SCOPE_BASE)
117 except ldap.NO_SUCH_OBJECT:
119 except ldap.LDAPError, e:
120 raise LDAPException("unable to lookup dn %s: %s" % (dn, e))
122 # this should never happen due to the nature of DNs
124 raise LDAPException("duplicate dn in ldap: " + dn)
126 # return the attributes of the single successful match
129 match_dn, match_attributes = match
130 return match_attributes
134 ### User-related Methods ###
136 def user_lookup(self, uid):
138 Retrieve the attributes of a user.
141 uid - the UNIX username to look up
143 Returns: attributes of user with uid
145 Example: connection.user_lookup('mspang') ->
146 { 'uid': 'mspang', 'uidNumber': 21292 ...}
149 dn = 'uid=' + uid + ',' + self.user_base
150 return self.lookup(dn)
153 def user_search(self, search_filter):
155 Helper for user searches.
158 search_filter - LDAP filter string to match users against
160 Returns: the list of uids matched (usernames)
163 if not self.connected(): raise LDAPException("Not connected!")
165 # search for entries that match the filter
167 matches = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, search_filter)
168 except ldap.LDAPError, e:
169 raise LDAPException("user search failed: %s" % e)
171 # list for uids found
174 for match in matches:
175 dn, attributes = match
177 # uid is a required attribute of posixAccount
178 if not attributes.has_key('uid'):
179 raise LDAPException(dn + ' (posixAccount) has no uid')
181 # do not handle the case of multiple usernames in one entry (yet)
182 elif len(attributes['uid']) > 1:
183 raise LDAPException(dn + ' (posixAccount) has multiple uids')
185 # append the sole uid of this match to the list
186 uids.append( attributes['uid'][0] )
191 def user_search_id(self, uidNumber):
193 Retrieves a list of users with a certain UNIX uid number.
195 LDAP (or passwd for that matter) does not enforce any
196 restriction on the number of accounts that can have
197 a certain UID. Therefore this method returns a list of matches.
200 uidNumber - the user id of the accounts desired
202 Returns: the list of uids matched (usernames)
204 Example: connection.user_search_id(21292) -> ['mspang']
207 # search for posixAccount entries with the specified uidNumber
208 search_filter = '(&(objectClass=posixAccount)(uidNumber=%d))' % uidNumber
209 return self.user_search(search_filter)
212 def user_search_gid(self, gidNumber):
214 Retrieves a list of users with a certain UNIX gid
215 number (search by default group).
217 Returns: the list of uids matched (usernames)
220 # search for posixAccount entries with the specified gidNumber
221 search_filter = '(&(objectClass=posixAccount)(gidNumber=%d))' % gidNumber
222 return self.user_search(search_filter)
225 def user_add(self, uid, cn, uidNumber, gidNumber, homeDirectory, loginShell=None, gecos=None, description=None):
227 Adds a user to the directory.
230 uid - the UNIX username for the account
231 cn - the real name of the member
232 uidNumber - the UNIX user id number
233 gidNumber - the UNIX group id number (default group)
234 homeDirectory - home directory for the user
235 loginShell - login shell for the user
236 gecos - comment field (usually stores name etc)
237 description - description field (optional and unimportant)
239 Example: connection.user_add('mspang', 'Michael Spang',
240 21292, 100, '/users/mspang', '/bin/bash',
244 if not self.connected(): raise LDAPException("Not connected!")
246 dn = 'uid=' + uid + ',' + self.user_base
248 'objectClass': [ 'top', 'account', 'posixAccount', 'shadowAccount' ],
251 'loginShell': [ loginShell ],
252 'uidNumber': [ str(uidNumber) ],
253 'gidNumber': [ str(gidNumber) ],
254 'homeDirectory': [ homeDirectory ],
259 attrs['loginShell'] = loginShell
261 attrs['description'] = [ description ]
264 modlist = ldap.modlist.addModlist(attrs)
265 self.ldap.add_s(dn, modlist)
266 except ldap.LDAPError, e:
267 raise LDAPException("unable to add: %s" % e)
270 def user_modify(self, uid, attrs):
272 Update user attributes in the directory.
275 uid - username of the user to modify
276 attrs - dictionary as returned by user_lookup() with changes to make.
277 omitted attributes are DELETED.
279 Example: user = user_lookup('mspang')
280 user['uidNumber'] = [ '0' ]
281 connection.user_modify('mspang', user)
284 if not self.connected(): raise LDAPException("Not connected!")
286 # distinguished name of the entry to modify
287 dn = 'uid=' + uid + ',' + self.user_base
289 # retrieve current state of user
290 old_user = self.user_lookup(uid)
294 # build list of modifications to make
295 changes = ldap.modlist.modifyModlist(old_user, attrs)
298 self.ldap.modify_s(dn, changes)
300 except ldap.LDAPError, e:
301 raise LDAPException("unable to modify: %s" % e)
304 def user_delete(self, uid):
306 Removes a user from the directory.
308 Example: connection.user_delete('mspang')
311 if not self.connected(): raise LDAPException("Not connected!")
314 dn = 'uid=' + uid + ',' + self.user_base
315 self.ldap.delete_s(dn)
316 except ldap.LDAPError, e:
317 raise LDAPException("unable to delete: %s" % e)
321 ### Group-related Methods ###
323 def group_lookup(self, cn):
325 Retrieves the attributes of a group.
328 cn - the UNIX group name to lookup
330 Returns: attributes of the group's LDAP entry
332 Example: connection.group_lookup('office') -> {
339 dn = 'cn=' + cn + ',' + self.group_base
340 return self.lookup(dn, 'posixGroup')
343 def group_search_id(self, gidNumber):
345 Retrieves a list of groups with the specified UNIX group number.
347 Returns: a list of groups with gid gidNumber
349 Example: connection.group_search_id(1001) -> ['office']
352 if not self.connected(): raise LDAPException("Not connected!")
354 # search for posixAccount entries with the specified uidNumber
356 search_filter = '(&(objectClass=posixGroup)(gidNumber=%d))' % gidNumber
357 matches = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, search_filter)
358 except ldap.LDAPError,e :
359 raise LDAPException("group search failed: %s" % e)
361 # list for groups found
364 for match in matches:
365 dn, attributes = match
367 # cn is a required attribute of posixGroup
368 if not attributes.has_key('cn'):
369 raise LDAPException(dn + ' (posixGroup) has no cn')
371 # do not handle the case of multiple cns for one group (yet)
372 elif len(attributes['cn']) > 1:
373 raise LDAPException(dn + ' (posixGroup) has multiple cns')
375 # append the sole uid of this match to the list
376 group_cns.append( attributes['cn'][0] )
381 def group_add(self, cn, gidNumber, description=None):
383 Adds a group to the directory.
385 Example: connection.group_add('office', 1001, 'Office Staff')
388 if not self.connected(): raise LDAPException("Not connected!")
390 dn = 'cn=' + cn + ',' + self.group_base
392 'objectClass': [ 'top', 'posixGroup' ],
394 'gidNumber': [ str(gidNumber) ],
397 attrs['description'] = description
400 modlist = ldap.modlist.addModlist(attrs)
401 self.ldap.add_s(dn, modlist)
402 except ldap.LDAPError, e:
403 raise LDAPException("unable to add group: %s" % e)
406 def group_modify(self, cn, attrs):
408 Update group attributes in the directory.
410 The only available updates are fairly destructive (rename or renumber)
411 but this method is provided for completeness.
414 cn - name of the group to modify
415 entry - dictionary as returned by group_lookup() with changes to make.
416 omitted attributes are DELETED.
418 Example: group = group_lookup('office')
419 group['gidNumber'] = [ str(connection.first_id(20000, 40000)) ]
420 del group['memberUid']
421 connection.group_modify('office', group)
424 if not self.connected(): raise LDAPException("Not connected!")
426 # distinguished name of the entry to modify
427 dn = 'cn=' + cn + ',' + self.group_base
429 # retrieve current state of group
430 old_group = self.group_lookup(cn)
434 # build list of modifications to make
435 changes = ldap.modlist.modifyModlist(old_group, attrs)
438 self.ldap.modify_s(dn, changes)
440 except ldap.LDAPError, e:
441 raise LDAPException("unable to modify: %s" % e)
444 def group_delete(self, cn):
446 Removes a group from the directory."
448 Example: connection.group_delete('office')
451 if not self.connected(): raise LDAPException("Not connected!")
454 dn = 'cn=' + cn + ',' + self.group_base
455 self.ldap.delete_s(dn)
456 except ldap.LDAPError, e:
457 raise LDAPException("unable to delete group: %s" % e)
460 ### Miscellaneous Methods ###
462 def used_uids(self, minimum=None, maximum=None):
464 Compiles a list of used UIDs in a range.
467 minimum - smallest uid to return in the list
468 maximum - largest uid to return in the list
470 Returns: list of integer uids
472 Example: connection.used_uids(20000, 40000) -> [20000, 20001, ...]
475 if not self.connected(): raise LDAPException("Not connected!")
478 users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['uidNumber'])
479 except ldap.LDAPError, e:
480 raise LDAPException("search for uids failed: %s" % e)
485 uid = int(attrs['uidNumber'][0])
486 if (not minimum or uid >= minimum) and (not maximum or uid <= maximum):
492 def used_gids(self, minimum=None, maximum=None):
494 Compiles a list of used GIDs in a range.
497 minimum - smallest gid to return in the list
498 maximum - largest gid to return in the list
500 Returns: list of integer gids
502 Example: connection.used_gids(20000, 40000) -> [20000, 20001, ...]
505 if not self.connected(): raise LDAPException("Not connected!")
508 users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['gidNumber'])
509 except ldap.LDAPError, e:
510 raise LDAPException("search for gids failed: %s" % e)
515 gid = int(attrs['gidNumber'][0])
516 if (not minimum or gid >= minimum) and (not maximum or gid <= maximum):
525 if __name__ == '__main__':
527 from csc.common.test import *
529 conffile = '/etc/csc/ldap.cf'
530 cfg = dict([map(str.strip, a.split("=", 1)) for a in map(str.strip, open(conffile).read().split("\n")) if "=" in a ])
531 srvurl = cfg['server_url'][1:-1]
532 binddn = cfg['admin_bind_dn'][1:-1]
533 bindpw = cfg['admin_bind_pw'][1:-1]
534 ubase = cfg['users_base'][1:-1]
535 gbase = cfg['groups_base'][1:-1]
539 # t=test u=user g=group c=changed r=real e=expected
541 turname = 'Test User'
542 tuhome = '/home/testuser'
543 tushell = '/bin/false'
544 tugecos = 'Test User,,,'
546 cushell = '/bin/true'
547 cuhome = '/home/changed'
548 curname = 'Test Modified User'
551 connection = LDAPConnection()
554 test(LDAPConnection.disconnect)
555 connection.disconnect()
558 test(LDAPConnection.connect)
559 connection.connect(srvurl, binddn, bindpw, ubase, gbase)
560 if not connection.connected():
561 fail("not connected")
565 connection.user_delete(tuname)
566 connection.group_delete(tgname)
567 except LDAPException:
570 test(LDAPConnection.used_uids)
571 uids = connection.used_uids(minid, maxid)
572 if type(uids) is not list:
573 fail("list not returned")
576 test(LDAPConnection.used_gids)
577 gids = connection.used_gids(minid, maxid)
578 if type(gids) is not list:
579 fail("list not returned")
583 for idnum in xrange(minid, maxid):
584 if not idnum in uids and not idnum in gids:
585 unusedids.append(idnum)
587 tuuid = unusedids.pop()
588 tugid = unusedids.pop()
591 'loginShell': [ tushell ],
592 'uidNumber': [ str(tuuid) ],
593 'gidNumber': [ str(tugid) ],
594 'gecos': [ tugecos ],
595 'homeDirectory': [ tuhome ],
599 test(LDAPConnection.user_add)
600 connection.user_add(tuname, turname, tuuid, tugid, tuhome, tushell, tugecos)
603 tggid = unusedids.pop()
606 'gidNumber': [ str(tggid) ]
609 test(LDAPConnection.group_add)
610 connection.group_add(tgname, tggid)
613 test(LDAPConnection.user_lookup)
614 udata = connection.user_lookup(tuname)
615 del udata['objectClass']
616 assert_equal(eudata, udata)
619 test(LDAPConnection.group_lookup)
620 gdata = connection.group_lookup(tgname)
621 del gdata['objectClass']
622 assert_equal(egdata, gdata)
625 test(LDAPConnection.user_search_id)
627 ulist = connection.user_search_id(tuuid)
628 assert_equal(eulist, ulist)
631 test(LDAPConnection.user_search_gid)
632 ulist = connection.user_search_gid(tugid)
633 if tuname not in ulist:
634 fail("(%s) not in (%s)" % (tuname, ulist))
637 ecudata = connection.user_lookup(tuname)
638 ecudata['loginShell'] = [ cushell ]
639 ecudata['homeDirectory'] = [ cuhome ]
640 ecudata['cn'] = [ curname ]
642 test(LDAPConnection.user_modify)
643 connection.user_modify(tuname, ecudata)
644 cudata = connection.user_lookup(tuname)
645 assert_equal(ecudata, cudata)
648 ecgdata = connection.group_lookup(tgname)
649 ecgdata['memberUid'] = [ tuname ]
651 test(LDAPConnection.group_modify)
652 connection.group_modify(tgname, ecgdata)
653 cgdata = connection.group_lookup(tgname)
654 assert_equal(ecgdata, cgdata)
657 test(LDAPConnection.group_delete)
658 connection.group_delete(tgname)
661 test(LDAPConnection.disconnect)
662 connection.disconnect()