New release (version 0.2).

Updates in this version:

  * Tests added to most Python modules.
  * Split configuration files.
  * Added maintainer scripts to manage permissions during install and purge.
  * Added functions for use by tools planned for next release (chfn, etc).

ceo:

  * Added support for account "repair", which will recreate LDAP entries
    and Kerberos principals if necessary.
  * The recreate account menu option is now active.

Miscellaneous:

  * Replaced instances of "== None" and "!= None" with "is None" and
    "is not None", respectively (thanks to: Nick Guenther).
  * Renamed terms.valid() to terms.validate() (thanks to: Nick Guenther).
This commit is contained in:
Michael Spang 2007-01-27 19:23:18 -05:00
parent cb59e85c2e
commit 58bf72726a
36 changed files with 2365 additions and 709 deletions

16
bin/ceo
View File

@ -1,22 +1,20 @@
#!/usr/bin/python2.4 -- #!/usr/bin/python2.4 --
"""CEO SUID Python Wrapper Script"""
import os, sys import os, sys
safe_environment = ['LOGNAME', 'USERNAME', 'USER', 'HOME', safe_environment = ['LOGNAME', 'USERNAME', 'USER', 'HOME', 'TERM', 'LANG'
'TERM', 'LANG', 'LC_ALL', 'LC_COLLATE', 'LC_ALL', 'LC_COLLATE', 'LC_CTYPE', 'LC_MESSAGE', 'LC_MONETARY',
'LC_CTYPE', 'LC_MESSAGE', 'LC_MONETARY', 'LC_NUMERIC', 'LC_TIME', 'UID', 'GID', 'SSH_CONNECTION', 'SSH_AUTH_SOCK',
'LC_NUMERIC', 'LC_TIME', 'UID', 'GID', 'SSH_CLIENT']
'SSH_CONNECTION', 'SSH_AUTH_SOCK',
'SSH_CLIENT']
for key in os.environ.keys(): for key in os.environ.keys():
if not key in safe_environment: if key not in safe_environment:
del os.environ[key] del os.environ[key]
os.environ['PATH'] = '/bin:/usr/bin' os.environ['PATH'] = '/bin:/usr/bin'
for dir in sys.path[:]: for dir in sys.path[:]:
if not dir.find('/usr') == 0 or dir.find('/usr/local') == 0: if not dir.find('/usr') == 0:
while dir in sys.path: while dir in sys.path:
sys.path.remove(dir) sys.path.remove(dir)

15
debian/changelog vendored
View File

@ -1,3 +1,18 @@
csc (0.2) unstable; urgency=low
* Tests added to most Python modules.
* Split configuration files.
* Added maintainer scripts to manage permissions during install and purge.
* Added functions for use by tools planned for next release (chfn, etc).
* Added support for account "repair", which will recreate LDAP entries
and principals if necessary.
* The recreate account menu option in CEO is now active.
* Replaced instances of "== None" and "!= None" with "is None" and
"is not None", respectively (thanks to: Nick Guenther).
* Renamed terms.valid() to terms.validate() (thanks to: Nick Guenther).
-- Michael Spang <mspang@uwaterloo.ca> Fri, 26 Jan 2007 20:10:14 -0500
csc (0.1) unstable; urgency=low csc (0.1) unstable; urgency=low
* Initial Release. * Initial Release.

4
debian/control vendored
View File

@ -3,11 +3,11 @@ Section: admin
Priority: optional Priority: optional
Maintainer: Michael Spang <mspang@uwaterloo.ca> Maintainer: Michael Spang <mspang@uwaterloo.ca>
Build-Depends: debhelper (>= 4.0.0) Build-Depends: debhelper (>= 4.0.0)
Standards-Version: 3.6.1 Standards-Version: 3.7.2
Package: csc Package: csc
Architecture: any Architecture: any
Depends: python, python2.4, python2.4-ldap, python2.4-pygresql, krb5-user, less Depends: python, python2.4, python2.4-ldap, python2.4-pygresql, krb5-user, less, ${shlibs:Depends}
Description: Computer Science Club Administrative Utilities Description: Computer Science Club Administrative Utilities
This package contains the CSC Electronic Office This package contains the CSC Electronic Office
and other Computer Science Club administrative and other Computer Science Club administrative

4
debian/copyright vendored
View File

@ -1,7 +1,7 @@
This package was debianized by mspang <mspang@uwaterloo.ca> on This package was debianized by Michael Spang <mspang@uwaterloo.ca> on
Thu, 28 Dec 2006 04:07:03 -0500. Thu, 28 Dec 2006 04:07:03 -0500.
Copyright (c) 2006, 2007 Michael Spang Copyright (c) 2006-2007, Michael Spang
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without

58
debian/postinst vendored Normal file
View File

@ -0,0 +1,58 @@
#!/bin/bash -e
case "$1" in
configure|upgrade)
if getent passwd ceo > /dev/null; then
CEO=ceo
SUID=4750
else
CEO=root
SUID=755
fi
if getent group office > /dev/null; then
OFFICE=office
else
OFFICE=root
fi
if ! dpkg-statoverride --list /usr/bin/ceo > /dev/null; then
dpkg-statoverride --add --update $CEO $OFFICE $SUID /usr/bin/ceo
fi
if [ -f /etc/csc/ldap.cf ] && ! dpkg-statoverride --list /etc/csc/ldap.cf > /dev/null; then
dpkg-statoverride --add --update $CEO staff 640 /etc/csc/ldap.cf
fi
if [ ! -e /etc/csc/ceo.keytab ] && [ -x /usr/sbin/kadmin.local ]; then
if dpkg-statoverride --list /etc/csc/ceo.keytab > /dev/null; then
dpkg-statoverride --remove /etc/csc/ceo.keytab || true
fi
echo 'warning: re-creating ceo.keytab'
echo 'ktadd -k /etc/csc/ceo.keytab ceo/admin' | /usr/sbin/kadmin.local || true
if [ -e /etc/csc/ceo.keytab ]; then
echo -e "\nSuccess!"
else
echo -e "\nFailed!"
fi
fi
if [ -f /etc/csc/ceo.keytab ] && ! dpkg-statoverride --list /etc/csc/ceo.keytab > /dev/null; then
dpkg-statoverride --add --update $CEO staff 640 /etc/csc/ceo.keytab
fi
;;
abort-upgrade|abort-remove|abort-deconfigure)
;;
*)
echo "postinst called with unknown argument \"$1\"" >&2
exit 1
;;
esac
#DEBHELPER#
exit 0

33
debian/postrm vendored Normal file
View File

@ -0,0 +1,33 @@
#!/bin/bash -e
case "$1" in
purge)
if dpkg-statoverride --list /usr/bin/ceo > /dev/null; then
dpkg-statoverride --remove /usr/bin/ceo || true
fi
if dpkg-statoverride --list /etc/csc/ldap.cf > /dev/null; then
dpkg-statoverride --remove /etc/csc/ldap.cf || true
fi
if dpkg-statoverride --list /etc/csc/ceo.keytab > /dev/null; then
dpkg-statoverride --remove /etc/csc/ceo.keytab || true
fi
rmdir --ignore-fail-on-non-empty /etc/csc
;;
remove|failed-upgrade|upgrade)
;;
*)
echo "postrm called with invalid argument \"$1\"" >&2
exit 1
;;
esac
#DEBHELPER#
exit 0

30
debian/rules vendored
View File

@ -2,13 +2,11 @@
PYTHON := python2.4 PYTHON := python2.4
configure:
build: build-stamp build: build-stamp
build-stamp: build-stamp:
mkdir build mkdir build
$(CC) -DFULL_PATH=\"/usr/lib/csc/ceo\" -o build/ceo misc/setuid-prog.c $(CC) -DFULL_PATH='"/usr/lib/csc/ceo"' -o build/ceo misc/setuid-prog.c
touch build-stamp touch build-stamp
clean: clean:
@ -17,27 +15,22 @@ clean:
dh_clean dh_clean
rm -f build-stamp rm -f build-stamp
rm -rf build/ rm -rf build/
find pylib/ -name '*.pyc' -print0 | xargs -0 rm -f find pylib/ -name "*.pyc" -print0 | xargs -0 rm -f
install: build install: build
dh_testdir dh_testdir
dh_testroot dh_testroot
dh_clean -k
# configuration files will contain sensitive information
chmod 600 etc/*
dh_installdirs etc/csc usr/lib/$(PYTHON)/site-packages usr/share/csc \ dh_installdirs etc/csc usr/lib/$(PYTHON)/site-packages usr/share/csc \
usr/lib/csc usr/bin usr/lib/csc usr/bin
dh_install -X.svn -X.pyc pylib/csc usr/lib/$(PYTHON)/site-packages/ dh_install pylib/* usr/lib/$(PYTHON)/site-packages/
dh_install -X.svn -X.pyc etc/* etc/csc/ dh_install etc/* etc/csc/
dh_install -X.svn -X.pyc sql/* usr/share/csc/ dh_install sql/* usr/share/csc/
dh_install -X.svn -X.pyc bin/ceo usr/lib/csc/ dh_install bin/ceo usr/lib/csc/
dh_install -X.svn -X.pyc build/ceo usr/bin/ dh_install build/ceo usr/bin/
binary-arch: build install
binary-indep: build install
dh_testdir dh_testdir
dh_testroot dh_testroot
dh_installchangelogs dh_installchangelogs
@ -60,7 +53,8 @@ binary-indep: build install
dh_md5sums dh_md5sums
dh_builddeb dh_builddeb
binary: binary-indep binary-arch binary-indep:
.PHONY: build clean binary-indep binary-arch binary install configure
binary-arch: build install binary: binary-indep binary-arch
.PHONY: build clean binary-indep binary-arch binary install

View File

@ -3,6 +3,6 @@ Bugs and Caveats
================ ================
CEO: CEO:
- curses does not draw borders/lines correctly in a screen session - curses does not draw borders/lines correctly in a screen session. screen apparently ignores
- windows don't always clear properly some font-changing characters. workaround should be possible (other progs work).
- the menu is not redrawn between windows and therefore a gap may grow there - the menu is not redrawn between windows and therefore a gap tends to grow there

8
docs/TODO Normal file
View File

@ -0,0 +1,8 @@
TODO:
* Python bindings for libkadm5
* Python bindings for quota?
* New UI: urwid-based?
* Logging via syslog
* Try to recover and roll-back on error during account creation
* Write manpages

View File

@ -1,35 +1,44 @@
# $Id: accounts.cf 45 2007-01-02 01:39:10Z mspang $ # /etc/csc/accounts.cf: CSC Accounts Configuration
# CSC Accounts Configuration
### Account Options ### include /etc/csc/ldap.cf
include /etc/csc/kerberos.cf
minimum_id = 20000 ### Member Account Options ###
maximum_id = 40000
shell = "/bin/bash" member_min_id = 20000
home = "/users" member_max_id = 39999
gid = 100 member_shell = "/bin/bash"
member_home = "/users"
member_desc = "CSC Member Account"
member_group = "users"
### Club Account Options ###
### LDAP Configuration ### club_min_id = 15000
club_max_id = 19999
club_shell = "/bin/bash"
club_home = "/users"
club_desc = "CSC Club Account"
club_group = "users"
server_url = "ldap:///" ### Administrative Account Options
users_base = "ou=People,dc=csclub,dc=uwaterloo,dc=ca" admin_min_id = 10000
groups_base = "ou=Group,dc=csclub,dc=uwaterloo,dc=ca" admin_max_id = 14999
admin_shell = "/bin/bash"
admin_home = "/users"
admin_desc = "CSC Administrative Account"
admin_group = "users"
bind_dn = "cn=ceo,dc=csclub,dc=uwaterloo,dc=ca" ### Account Group Options ###
bind_password = "secret"
### Kerberos Configuration ###
realm = "CSCLUB.UWATERLOO.CA"
principal = "ceo/admin@CSCLUB.UWATERLOO.CA"
keytab = "/etc/csc/ceo.keytab"
group_min_id = 10000
group_max_id = 14999
group_desc = "CSC Group"
### Validation Tuning ### ### Validation Tuning ###
username_regex = "^[a-z][-a-z0-9]*$" username_regex = "^[a-z][-a-z0-9]*$"
realname_regex = "^[^,:=]*$" groupname_regex = "^[a-z][-a-z0-9]*$"
min_password_length = 4
shells_file = "/etc/shells"

5
etc/kerberos.cf Normal file
View File

@ -0,0 +1,5 @@
# /etc/csc/kerberos.cf: CSC Kerberos Administration Configuration
realm = "CSCLUB.UWATERLOO.CA"
admin_principal = "ceo/admin@CSCLUB.UWATERLOO.CA"
admin_keytab = "/etc/csc/ceo.keytab"

9
etc/ldap.cf Normal file
View File

@ -0,0 +1,9 @@
# /etc/csc/ldap.cf: CSC LDAP Configuration
server_url = "ldaps:///"
users_base = "ou=People,dc=csclub,dc=uwaterloo,dc=ca"
groups_base = "ou=Group,dc=csclub,dc=uwaterloo,dc=ca"
admin_bind_dn = "cn=ceo,dc=csclub,dc=uwaterloo,dc=ca"
admin_bind_pw = "secret"

View File

@ -1,13 +1,6 @@
# $Id: members.cf 45 2007-01-02 01:39:10Z mspang $ # /etc/csc/members.cf: CSC Members Configuration
# CSC Members Configuration
### Database Configuration ### include /etc/csc/pgsql.cf
server = "localhost"
database = "ceo"
user = "ceo"
password = "secret"
### Validation Tuning ### ### Validation Tuning ###

11
etc/pgsql.cf Normal file
View File

@ -0,0 +1,11 @@
# /etc/csc/pgsql.cf: PostgreSQL database configuration
### Database Configuration ###
# server = "localhost"
server = ""
database = "ceo"
# not used
user = "ceo"
password = "secret"

View File

@ -1,19 +1,5 @@
# $Id: __init__.py 24 2006-12-18 20:23:12Z mspang $
""" """
PyCSC - CSC Administrative Utilities Computer Science Club Python Modules
Member Management:
ceo - legacy ceo interface
Account Management:
ceo - legacy ceo interface
Modules:
admin - administrative code (member and account management)
backend - backend interface code
ui - user interface code
The csc module is a container for all CSC-specific Python modules.
""" """

View File

@ -0,0 +1,9 @@
"""
CSC Administrative Modules
This module provides member and account management modules.
members - member registration management functions
accounts - account administration functions
terms - helper routines for manipulating terms
"""

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,3 @@
# $Id: members.py 44 2006-12-31 07:09:27Z mspang $
""" """
CSC Member Management CSC Member Management
@ -10,44 +9,29 @@ Transactions are used in each method that modifies the database.
Future changes to the members database that need to be atomic Future changes to the members database that need to be atomic
must also be moved into this module. must also be moved into this module.
""" """
import re import re
from csc.adm import terms from csc.adm import terms
from csc.backends import db from csc.backends import db
from csc.common.conf import read_config from csc.common import conf
### Configuration ###
### Configuration
CONFIG_FILE = '/etc/csc/members.cf' CONFIG_FILE = '/etc/csc/members.cf'
cfg = {} cfg = {}
def load_configuration(): def load_configuration():
"""Load Members Configuration""" """Load Members Configuration"""
# configuration already loaded? string_fields = [ 'studentid_regex', 'realname_regex', 'server',
if len(cfg) > 0: 'database', 'user', 'password' ]
return
# read in the file # read configuration file
cfg_tmp = read_config(CONFIG_FILE) cfg_tmp = conf.read(CONFIG_FILE)
if not cfg_tmp: # verify configuration
raise MemberException("unable to read configuration file: %s" conf.check_string_fields(CONFIG_FILE, string_fields, cfg_tmp)
% CONFIG_FILE)
# check that essential fields are completed
mandatory_fields = [ 'server', 'database', 'user', 'password' ]
for field in mandatory_fields:
if not field in cfg_tmp:
raise MemberException("missing configuratino option: %s" % field)
if not cfg_tmp[field]:
raise MemberException("null configuration option: %s" %field)
# update the current configuration with the loaded values # update the current configuration with the loaded values
cfg.update(cfg_tmp) cfg.update(cfg_tmp)
@ -56,24 +40,46 @@ def load_configuration():
### Exceptions ### ### Exceptions ###
DBException = db.DBException
ConfigurationException = conf.ConfigurationException
class MemberException(Exception): class MemberException(Exception):
"""Exception class for member-related errors.""" """Base exception class for member-related errors."""
class DuplicateStudentID(MemberException): class DuplicateStudentID(MemberException):
"""Exception class for student ID conflicts.""" """Exception class for student ID conflicts."""
pass def __init__(self, studentid):
self.studentid = studentid
def __str__(self):
return "Student ID already exists in the database: %s" % self.studentid
class InvalidStudentID(MemberException): class InvalidStudentID(MemberException):
"""Exception class for malformed student IDs.""" """Exception class for malformed student IDs."""
pass def __init__(self, studentid):
self.studentid = studentid
def __str__(self):
return "Student ID is invalid: %s" % self.studentid
class InvalidTerm(MemberException): class InvalidTerm(MemberException):
"""Exception class for malformed terms.""" """Exception class for malformed terms."""
pass def __init__(self, term):
self.term = term
def __str__(self):
return "Term is invalid: %s" % self.term
class InvalidRealName(MemberException):
"""Exception class for invalid real names."""
def __init__(self, name):
self.name = name
def __str__(self):
return "Name is invalid: %s" % self.name
class NoSuchMember(MemberException): class NoSuchMember(MemberException):
"""Exception class for nonexistent members.""" """Exception class for nonexistent members."""
pass def __init__(self, memberid):
self.memberid = memberid
def __str__(self):
return "Member not found: %d" % self.memberid
@ -82,12 +88,10 @@ class NoSuchMember(MemberException):
# global database connection # global database connection
connection = db.DBConnection() connection = db.DBConnection()
def connect(): def connect():
"""Connect to PostgreSQL.""" """Connect to PostgreSQL."""
load_configuration() load_configuration()
connection.connect(cfg['server'], cfg['database']) connection.connect(cfg['server'], cfg['database'])
@ -103,24 +107,27 @@ def connected():
return connection.connected() return connection.connected()
### Member Table ### ### Member Table ###
def new(realname, studentid=None, program=None): def new(realname, studentid=None, program=None, mtype='user', userid=None):
""" """
Registers a new CSC member. The member is added Registers a new CSC member. The member is added to the members table
to the members table and registered for the current and registered for the current term.
term.
Parameters: Parameters:
realname - the full real name of the member realname - the full real name of the member
studentid - the student id number of the member studentid - the student id number of the member
program - the program of study of the member program - the program of study of the member
mtype - a string describing the type of member ('user', 'club')
userid - the initial user id
Returns: the memberid of the new member Returns: the memberid of the new member
Exceptions: Exceptions:
DuplicateStudentID - if the student id already exists in the database DuplicateStudentID - if the student id already exists in the database
InvalidStudentID - if the student id is malformed InvalidStudentID - if the student id is malformed
InvalidRealName - if the real name is malformed
Example: new("Michael Spang", program="CS") -> 3349 Example: new("Michael Spang", program="CS") -> 3349
""" """
@ -128,16 +135,21 @@ def new(realname, studentid=None, program=None):
# blank attributes should be NULL # blank attributes should be NULL
if studentid == '': studentid = None if studentid == '': studentid = None
if program == '': program = None if program == '': program = None
if userid == '': userid = None
if mtype == '': mtype = None
# check the student id format # check the student id format
regex = '^[0-9]{8}$' if studentid is not None and not re.match(cfg['studentid_regex'], str(studentid)):
if studentid != None and not re.match(regex, str(studentid)): raise InvalidStudentID(studentid)
raise InvalidStudentID("student id is invalid: %s" % studentid)
# check real name format (UNIX account real names must not contain [,:=])
if not re.match(cfg['realname_regex'], realname):
raise InvalidRealName(realname)
# check for duplicate student id # check for duplicate student id
member = connection.select_member_by_studentid(studentid) member = connection.select_member_by_studentid(studentid)
if member: if member:
raise DuplicateStudentID("student id exists in database: %s" % studentid) raise DuplicateStudentID(studentid)
# add the member # add the member
memberid = connection.insert_member(realname, studentid, program) memberid = connection.insert_member(realname, studentid, program)
@ -155,9 +167,6 @@ def get(memberid):
""" """
Look up attributes of a member by memberid. Look up attributes of a member by memberid.
Parameters:
memberid - the member id number
Returns: a dictionary of attributes Returns: a dictionary of attributes
Example: get(3349) -> { Example: get(3349) -> {
@ -188,7 +197,7 @@ def get_userid(userid):
} }
""" """
return connection.select_member_by_account(userid) return connection.select_member_by_userid(userid)
def get_studentid(studentid): def get_studentid(studentid):
@ -265,20 +274,23 @@ def delete(memberid):
""" """
Erase all records of a member. Erase all records of a member.
Note: real members are never removed Note: real members are never removed from the database
from the database
Parameters: Returns: attributes and terms of the member in a tuple
memberid - the member id number
Returns: attributes and terms of the Exceptions:
member in a tuple NoSuchMember - if the member id does not exist
Example: delete(0) -> ({ 'memberid': 0, name: 'Calum T. Dalek' ...}, ['s1993']) Example: delete(0) -> ({ 'memberid': 0, name: 'Calum T. Dalek' ...}, ['s1993'])
""" """
# save member data # save member data
member = connection.select_member_by_id(memberid) member = connection.select_member_by_id(memberid)
# bail if not found
if not member:
raise NoSuchMember(memberid)
term_list = connection.select_terms(memberid) term_list = connection.select_terms(memberid)
# remove data from the db # remove data from the db
@ -291,13 +303,12 @@ def delete(memberid):
def update(member): def update(member):
""" """
Update CSC member attributes. None is NULL. Update CSC member attributes.
Parameters: Parameters:
member - a dictionary with member attributes as member - a dictionary with member attributes as returned by get,
returned by get, possibly omitting some possibly omitting some attributes. member['memberid']
attributes. member['memberid'] must exist must exist and be valid. None is NULL.
and be valid.
Exceptions: Exceptions:
NoSuchMember - if the member id does not exist NoSuchMember - if the member id does not exist
@ -307,20 +318,18 @@ def update(member):
Example: update( {'memberid': 3349, userid: 'mspang'} ) Example: update( {'memberid': 3349, userid: 'mspang'} )
""" """
if member.has_key('studentid') and member['studentid'] != None: if member.has_key('studentid') and member['studentid'] is not None:
studentid = member['studentid'] studentid = member['studentid']
# check the student id format # check the student id format
regex = '^[0-9]{8}$' if studentid is not None and not re.match(cfg['studentid_regex'], str(studentid)):
if studentid != None and not re.match(regex, str(studentid)): raise InvalidStudentID(studentid)
raise InvalidStudentID("student id is invalid: %s" % studentid)
# check for duplicate student id # check for duplicate student id
member = connection.select_member_by_studentid(studentid) dupmember = connection.select_member_by_studentid(studentid)
if member: if dupmember:
raise DuplicateStudentID("student id exists in database: %s" % raise DuplicateStudentID(studentid)
studentid)
# not specifying memberid is a bug # not specifying memberid is a bug
if not member.has_key('memberid'): if not member.has_key('memberid'):
@ -328,10 +337,8 @@ def update(member):
memberid = member['memberid'] memberid = member['memberid']
# see if member exists # see if member exists
old_member = connection.select_member_by_id(memberid) if not get(memberid):
if not old_member: raise NoSuchMember(memberid)
raise NoSuchMember("memberid does not exist in database: %d" %
memberid)
# do the update # do the update
connection.update_member(member) connection.update_member(member)
@ -359,14 +366,14 @@ def register(memberid, term_list):
Example: register(3349, ["w2007", "s2007"]) Example: register(3349, ["w2007", "s2007"])
""" """
if not type(term_list) in (list, tuple): if type(term_list) in (str, unicode):
term_list = [ term_list ] term_list = [ term_list ]
for term in term_list: for term in term_list:
# check term syntax # check term syntax
if not re.match('^[wsf][0-9]{4}$', term): if not re.match('^[wsf][0-9]{4}$', term):
raise InvalidTerm("term is invalid: %s" % term) raise InvalidTerm(term)
# add term to database # add term to database
connection.insert_term(memberid, term) connection.insert_term(memberid, term)
@ -388,10 +395,10 @@ def registered(memberid, term):
Example: registered(3349, "f2006") -> True Example: registered(3349, "f2006") -> True
""" """
return connection.select_term(memberid, term) != None return connection.select_term(memberid, term) is not None
def terms_list(memberid): def member_terms(memberid):
""" """
Retrieves a list of terms a member is Retrieves a list of terms a member is
registered for. registered for.
@ -404,7 +411,9 @@ def terms_list(memberid):
Example: registered(0) -> 's1993' Example: registered(0) -> 's1993'
""" """
return connection.select_terms(memberid) terms_list = connection.select_terms(memberid)
terms_list.sort(terms.compare)
return terms_list
@ -412,15 +421,104 @@ def terms_list(memberid):
if __name__ == '__main__': if __name__ == '__main__':
connect() from csc.common.test import *
sid = new("Test User", "99999999", "CS")
assert registered(id, terms.current()) # t=test m=member s=student u=updated
print get(sid) tmname = 'Test Member'
register(sid, terms.next(terms.current())) tmprogram = 'Metaphysics'
assert registered(sid, terms.next(terms.current())) tmsid = '00000000'
print terms_list(sid) tm2name = 'Test Member 2'
print get(sid) tm2sid = '00000001'
print delete(sid) tm2uname = 'Test Member II'
tm2usid = '00000002'
tm2uprogram = 'Pseudoscience'
tm2uuserid = 'testmember'
tmdict = {'name': tmname, 'userid': None, 'program': tmprogram, 'type': 'user', 'studentid': tmsid }
tm2dict = {'name': tm2name, 'userid': None, 'program': None, 'type': 'user', 'studentid': tm2sid }
tm2udict = {'name': tm2uname, 'userid': tm2uuserid, 'program': tm2uprogram, 'type': 'user', 'studentid': tm2usid }
thisterm = terms.current()
nextterm = terms.next(thisterm)
test(connect)
connect()
success()
test(connected)
assert_equal(True, connected())
success()
dmid = get_studentid(tmsid)
if dmid: delete(dmid['memberid'])
dmid = get_studentid(tm2sid)
if dmid: delete(dmid['memberid'])
dmid = get_studentid(tm2usid)
if dmid: delete(dmid['memberid'])
test(new)
tmid = new(tmname, tmsid, tmprogram)
tm2id = new(tm2name, tm2sid)
success()
tmdict['memberid'] = tmid
tm2dict['memberid'] = tm2id
tm2udict['memberid'] = tm2id
test(registered)
assert_equal(True, registered(tmid, thisterm))
assert_equal(True, registered(tm2id, thisterm))
assert_equal(False, registered(tmid, nextterm))
success()
test(get)
assert_equal(tmdict, get(tmid))
assert_equal(tm2dict, get(tm2id))
success()
test(list_name)
assert_equal(True, tmid in [ x['memberid'] for x in list_name(tmname) ])
assert_equal(True, tm2id in [ x['memberid'] for x in list_name(tm2name) ])
success()
test(register)
register(tmid, terms.next(terms.current()))
assert_equal(True, registered(tmid, nextterm))
success()
test(member_terms)
assert_equal([thisterm, nextterm], member_terms(tmid))
assert_equal([thisterm], member_terms(tm2id))
success()
test(list_term)
assert_equal(True, tmid in [ x['memberid'] for x in list_term(thisterm) ])
assert_equal(True, tmid in [ x['memberid'] for x in list_term(nextterm) ])
assert_equal(True, tm2id in [ x['memberid'] for x in list_term(thisterm) ])
assert_equal(False, tm2id in [ x['memberid'] for x in list_term(nextterm) ])
success()
test(update)
update(tm2udict)
assert_equal(tm2udict, get(tm2id))
success()
test(get_userid)
assert_equal(tm2udict, get_userid(tm2uuserid))
success()
test(get_studentid)
assert_equal(tm2udict, get_studentid(tm2usid))
assert_equal(tmdict, get_studentid(tmsid))
success()
test(delete)
delete(tmid)
delete(tm2id)
success()
test(disconnect)
disconnect()
assert_equal(False, connected())
disconnect()
success()

View File

@ -1,11 +1,9 @@
# $Id: terms.py 44 2006-12-31 07:09:27Z mspang $
""" """
Terms Routines Terms Routines
This module contains functions for manipulating This module contains functions for manipulating terms, such as determining
terms, such as determining the current term, the current term, finding the next or previous term, converting dates to
finding the next or previous term, converting terms, and more.
dates to terms, and more.
""" """
import time, datetime, re import time, datetime, re
@ -16,27 +14,27 @@ EPOCH = 1970
SEASONS = [ 'w', 's', 'f' ] SEASONS = [ 'w', 's', 'f' ]
def valid(term): def validate(term):
""" """
Determines whether a term is well-formed: Determines whether a term is well-formed.
Parameters: Parameters:
term - the term string term - the term string
Returns: whether the term is valid (boolean) Returns: whether the term is valid (boolean)
Example: valid("f2006") -> True Example: validate("f2006") -> True
""" """
regex = '^[wsf][0-9]{4}$' regex = '^[wsf][0-9]{4}$'
return re.match(regex, term) != None return re.match(regex, term) is not None
def parse(term): def parse(term):
"""Helper function to convert a term string to the number of terms """Helper function to convert a term string to the number of terms
since the epoch. Such numbers are intended for internal use only.""" since the epoch. Such numbers are intended for internal use only."""
if not valid(term): if not validate(term):
raise Exception("malformed term: %s" % term) raise Exception("malformed term: %s" % term)
year = int( term[1:] ) year = int( term[1:] )
@ -176,8 +174,8 @@ def from_timestamp(timestamp):
This function notes that: This function notes that:
WINTER = JANUARY to APRIL WINTER = JANUARY to APRIL
SPRING = MAY TO AUGUST SPRING = MAY to AUGUST
FALL = SEPTEMBER TO DECEMBER FALL = SEPTEMBER to DECEMBER
Parameters: Parameters:
timestamp - number of seconds since the epoch timestamp - number of seconds since the epoch
@ -235,18 +233,22 @@ def next_unregistered(registered):
if __name__ == '__main__': if __name__ == '__main__':
assert parse('f2006') == 110 from csc.common.test import *
assert generate(110) == 'f2006'
assert next('f2006') == 'w2007'
assert previous('f2006') == 's2006'
assert delta('f2006', 'w2007') == 1
assert add('f2006', delta('f2006', 'w2010')) == 'w2010'
assert interval('f2006', 3) == ['f2006', 'w2007', 's2007']
assert from_timestamp(1166135779) == 'f2006'
assert parse( current() ) >= 110
assert next_unregistered( [current()] ) == next( current() )
assert next_unregistered( [] ) == current()
assert next_unregistered( [previous(current())] ) == current()
assert next_unregistered( [add(current(), -2)] ) == current()
print "All tests passed." "\n" test(parse); assert_equal(110, parse('f2006')); success()
test(generate); assert_equal('f2006', generate(110)); success()
test(next); assert_equal('w2007', next('f2006')); success()
test(previous); assert_equal('s2006', previous('f2006')); success()
test(delta); assert_equal(1, delta('f2006', 'w2007')); success()
test(compare); assert_equal(-1, compare('f2006', 'w2007')); success()
test(add); assert_equal('w2010', add('f2006', delta('f2006', 'w2010'))); success()
test(interval); assert_equal(['f2006', 'w2007', 's2007'], interval('f2006', 3)); success()
test(from_timestamp); assert_equal('f2006', from_timestamp(1166135779)); success()
test(current); assert_equal(True, parse( current() ) >= 110 ); success()
test(next_unregistered)
assert_equal( next(current()), next_unregistered([ current() ]))
assert_equal( current(), next_unregistered([]))
assert_equal( current(), next_unregistered([ previous(current()) ]))
assert_equal( current(), next_unregistered([ add(current(), -2) ]))
success()

View File

@ -1,9 +1,8 @@
# $Id: __init__.py 23 2006-12-18 20:14:51Z mspang $
""" """
User Interfaces Application-style User Interfaces
This module contains frontends and related modules. This module contains large frontends with many functions
CEO's primary frontends are: and fancy graphical user interfaces.
legacy - aims to reproduce the curses UI of the previous CEO legacy - aims to reproduce the curses UI of the previous CEO
""" """

View File

@ -1,10 +1,8 @@
# $Id: __init__.py 23 2006-12-18 20:14:51Z mspang $
""" """
Legacy User Interface Legacy User Interface
This module contains the legacy CEO user interface and related modules. This module contains the legacy CEO user interface and related modules.
Important modules are:
main.py - all of the main UI logic main - all of the main UI logic
helpers.py - user interface library routines helpers - user interface library routines
""" """

View File

@ -1,4 +1,3 @@
# $Id: helpers.py 35 2006-12-28 05:14:05Z mspang $
""" """
Helpers for legacy User Interface Helpers for legacy User Interface
@ -7,7 +6,7 @@ the look and behavior of the previous CEO. Included is code for various
curses-based UI widgets that were provided by Perl 5's Curses and curses-based UI widgets that were provided by Perl 5's Curses and
Curses::Widgets libraries. Curses::Widgets libraries.
Though attempts have been made to keep the UI bug-compatible with Though attempts have been made to keep the UI [bug-]compatible with
the previous system, some compromises have been made. For example, the previous system, some compromises have been made. For example,
the input and textboxes draw 'OK' and 'Cancel' buttons where the old the input and textboxes draw 'OK' and 'Cancel' buttons where the old
CEO had them, but they are fake. That is, the buttons in the old CEO had them, but they are fake. That is, the buttons in the old
@ -52,14 +51,14 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True):
# turn on cursor # turn on cursor
try: try:
curses.curs_set(1) curses.curs_set(1)
except: except curses.error:
pass pass
# set keypad mode to allow UP, DOWN, etc # set keypad mode to allow UP, DOWN, etc
wnd.keypad(1) wnd.keypad(1)
# the input string # the input string
input = "" inputbuf = ""
# offset of cursor in input # offset of cursor in input
# i.e. the next operation is applied at input[inputoff] # i.e. the next operation is applied at input[inputoff]
@ -78,7 +77,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True):
if echo: if echo:
# discard characters before displayoff, # discard characters before displayoff,
# as the window may be scrolled to the right # as the window may be scrolled to the right
substring = input[displayoff:] substring = inputbuf[displayoff:]
# pad the string with zeroes to overwrite stale characters # pad the string with zeroes to overwrite stale characters
substring = substring + " " * (width - len(substring)) substring = substring + " " * (width - len(substring))
@ -96,7 +95,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True):
# enter returns input # enter returns input
if key == KEY_RETURN: if key == KEY_RETURN:
return input return inputbuf
# escape aborts input # escape aborts input
elif key == KEY_ESCAPE: elif key == KEY_ESCAPE:
@ -104,7 +103,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True):
# EOT (C-d) aborts if there is no input # EOT (C-d) aborts if there is no input
elif key == KEY_EOT: elif key == KEY_EOT:
if len(input) == 0: if len(inputbuf) == 0:
return None return None
# backspace removes the previous character # backspace removes the previous character
@ -112,7 +111,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True):
if inputoff > 0: if inputoff > 0:
# remove the character immediately before the input offset # remove the character immediately before the input offset
input = input[0:inputoff-1] + input[inputoff:] inputbuf = inputbuf[0:inputoff-1] + inputbuf[inputoff:]
inputoff -= 1 inputoff -= 1
# move either the cursor or entire line of text left # move either the cursor or entire line of text left
@ -124,7 +123,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True):
if inputoff < len(input): if inputoff < len(input):
# remove the character at the input offset # remove the character at the input offset
input = input[0:inputoff] + input[inputoff+1:] inputbuf = inputbuf[0:inputoff] + inputbuf[inputoff+1:]
# left moves the cursor one character left # left moves the cursor one character left
elif key == curses.KEY_LEFT: elif key == curses.KEY_LEFT:
@ -139,7 +138,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True):
# right moves the cursor one character right # right moves the cursor one character right
elif key == curses.KEY_RIGHT: elif key == curses.KEY_RIGHT:
if inputoff < len(input): if inputoff < len(inputbuf):
# move the cursor to the right # move the cursor to the right
inputoff += 1 inputoff += 1
@ -155,8 +154,8 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True):
# end moves the cursor past the last character # end moves the cursor past the last character
elif key == curses.KEY_END: elif key == curses.KEY_END:
inputoff = len(input) inputoff = len(inputbuf)
displayoff = len(input) - width + 1 displayoff = len(inputbuf) - width + 1
# insert toggles insert/overwrite mode # insert toggles insert/overwrite mode
elif key == curses.KEY_IC: elif key == curses.KEY_IC:
@ -164,15 +163,15 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True):
# other (printable) characters are added to the input string # other (printable) characters are added to the input string
elif curses.ascii.isprint(key): elif curses.ascii.isprint(key):
if len(input) < maxlen or maxlen == 0: if len(inputbuf) < maxlen or maxlen == 0:
# insert mode: insert before current offset # insert mode: insert before current offset
if insert: if insert:
input = input[0:inputoff] + chr(key) + input[inputoff:] inputbuf = inputbuf[0:inputoff] + chr(key) + inputbuf[inputoff:]
# overwrite mode: replace current offset # overwrite mode: replace current offset
else: else:
input = input[0:inputoff] + chr(key) + input[inputoff+1:] inputbuf = inputbuf[0:inputoff] + chr(key) + inputbuf[inputoff+1:]
# increment the input offset # increment the input offset
inputoff += 1 inputoff += 1
@ -218,13 +217,13 @@ def inputbox(wnd, prompt, field_width, echo=True):
# read an input string within the field region of text_wnd # read an input string within the field region of text_wnd
inputy, inputx, inputwidth = 1, 1, textwidth - 2 inputy, inputx, inputwidth = 1, 1, textwidth - 2
input = read_input(text_wnd, inputy, inputx, inputwidth, 0, echo) inputbuf = read_input(text_wnd, inputy, inputx, inputwidth, 0, echo)
# erase the window # erase the window
child_wnd.erase() child_wnd.erase()
child_wnd.refresh() child_wnd.refresh()
return input return inputbuf
def line_wrap(line, width): def line_wrap(line, width):
@ -323,7 +322,7 @@ def msgbox(wnd, msg, title="Message"):
curses.curs_set(0) curses.curs_set(0)
outer_wnd.keypad(1) outer_wnd.keypad(1)
while True: while True:
key = outer_wnd.getch(0,0) key = outer_wnd.getch(0, 0)
if key == KEY_RETURN or key == KEY_ESCAPE: if key == KEY_RETURN or key == KEY_ESCAPE:
break break
@ -379,18 +378,18 @@ def menu(wnd, offy, offx, width, options, _acquire_wnd=None):
wnd.refresh() wnd.refresh()
# read one keypress # read one keypress
input = wnd.getch() keypress = wnd.getch()
# UP moves to the previous option # UP moves to the previous option
if input == curses.KEY_UP and selected > 0: if keypress == curses.KEY_UP and selected > 0:
selected = (selected - 1) selected = (selected - 1)
# DOWN moves to the next option # DOWN moves to the next option
elif input == curses.KEY_DOWN and selected < len(options) - 1: elif keypress == curses.KEY_DOWN and selected < len(options) - 1:
selected = (selected + 1) selected = (selected + 1)
# RETURN runs the callback for the selected option # RETURN runs the callback for the selected option
elif input == KEY_RETURN: elif keypress == KEY_RETURN:
text, callback = options[selected] text, callback = options[selected]
# highlight the selected option # highlight the selected option

View File

@ -1,4 +1,3 @@
# $Id: main.py 44 2006-12-31 07:09:27Z mspang $
""" """
CEO-like Frontend CEO-like Frontend
@ -21,7 +20,7 @@ BORDER_COLOR = curses.COLOR_RED
def action_new_member(wnd): def action_new_member(wnd):
"""Interactively add a new member.""" """Interactively add a new member."""
username, studentid, program = '', None, '' studentid, program = None, ''
# read the name # read the name
prompt = " Name: " prompt = " Name: "
@ -33,7 +32,7 @@ def action_new_member(wnd):
# read the student id # read the student id
prompt = "Student id:" prompt = "Student id:"
while studentid == None or (re.search("[^0-9]", studentid) and not studentid.lower() == 'exit'): while studentid is None or (re.search("[^0-9]", studentid) and not studentid.lower() == 'exit'):
studentid = inputbox(wnd, prompt, 18) studentid = inputbox(wnd, prompt, 18)
# abort if exit is entered # abort if exit is entered
@ -48,7 +47,7 @@ def action_new_member(wnd):
program = inputbox(wnd, prompt, 18) program = inputbox(wnd, prompt, 18)
# abort if exit is entered # abort if exit is entered
if program == None or program.lower() == 'exit': if program is None or program.lower() == 'exit':
return False return False
# connect the members module to its backend if necessary # connect the members module to its backend if necessary
@ -59,14 +58,17 @@ def action_new_member(wnd):
memberid = members.new(realname, studentid, program) memberid = members.new(realname, studentid, program)
msgbox(wnd, "Success! Your memberid is %s. You are now registered\n" msgbox(wnd, "Success! Your memberid is %s. You are now registered\n"
% memberid + "for the " + terms.current() + " term."); % memberid + "for the " + terms.current() + " term.")
except members.InvalidStudentID: except members.InvalidStudentID:
msgbox(wnd, "Invalid student ID.") msgbox(wnd, "Invalid student ID: %s" % studentid)
return False return False
except members.DuplicateStudentID: except members.DuplicateStudentID:
msgbox(wnd, "A member with this student ID exists.") msgbox(wnd, "A member with this student ID exists.")
return False return False
except members.InvalidRealName:
msgbox(wnd, 'Invalid real name: "%s"' % realname)
return False
def action_term_register(wnd): def action_term_register(wnd):
@ -85,7 +87,7 @@ def action_term_register(wnd):
if not member: return False if not member: return False
memberid = member['memberid'] memberid = member['memberid']
term_list = members.terms_list(memberid) term_list = members.member_terms(memberid)
# display user # display user
display_member_details(wnd, member, term_list) display_member_details(wnd, member, term_list)
@ -134,7 +136,7 @@ def action_term_register_multiple(wnd):
if not member: return False if not member: return False
memberid = member['memberid'] memberid = member['memberid']
term_list = members.terms_list(memberid) term_list = members.member_terms(memberid)
# display user # display user
display_member_details(wnd, member, term_list) display_member_details(wnd, member, term_list)
@ -177,11 +179,57 @@ def action_term_register_multiple(wnd):
msgbox(wnd, "Your are now registered for terms: " + ", ".join(term_list)) msgbox(wnd, "Your are now registered for terms: " + ", ".join(term_list))
except members.InvalidTerm: except members.InvalidTerm:
msgbox(wnd, "Term is not valid: %s" % term) msgbox(wnd, "Invalid term entered.")
return False return False
def repair_account(wnd, memberid, userid):
"""Attemps to repair an account."""
if not accounts.connected(): accounts.connect()
member = members.get(memberid)
exists, haspw = accounts.status(userid)
if not exists:
password = input_password(wnd)
accounts.create_member(userid, password, member['name'], memberid)
msgbox(wnd, "Account created (where the hell did it go, anyway?)\n"
"If you're homedir still exists, it will not be inaccessible to you,\n"
"please contact systems-committee@csclub.uwaterloo.ca to get this resolved.\n")
elif not haspw:
password = input_password(wnd)
accounts.add_password(userid, password)
msgbox(wnd, "Password added to account.")
else:
msgbox(wnd, "No problems to repair.")
def input_password(wnd):
# password input loop
password = "password"
check = "check"
while password != check:
# read password
prompt = "User password:"
password = None
while not password:
password = inputbox(wnd, prompt, 18, False)
# read another password
prompt = "Enter the password again:"
check = None
while not check:
check = inputbox(wnd, prompt, 27, False)
return password
def action_create_account(wnd): def action_create_account(wnd):
"""Interactively create an account for a member.""" """Interactively create an account for a member."""
@ -198,7 +246,7 @@ def action_create_account(wnd):
if not member: return False if not member: return False
memberid = member['memberid'] memberid = member['memberid']
term_list = members.terms_list(memberid) term_list = members.member_terms(memberid)
# display the member # display the member
display_member_details(wnd, member, term_list) display_member_details(wnd, member, term_list)
@ -218,67 +266,59 @@ def action_create_account(wnd):
msgbox(wnd, "I suggest searching for the member by userid or name from the main menu.") msgbox(wnd, "I suggest searching for the member by userid or name from the main menu.")
return False return False
# member already has an account?
if member['userid']:
userid = member['userid']
msgbox(wnd, "Member " + str(memberid) + " already has an account: " + member['userid'] + "\n"
"Attempting to repair it. Contact the sysadmin if there are still problems." )
repair_account(wnd, memberid, userid)
return False
# read user id # read user id
prompt = "Userid:" prompt = "Userid:"
while userid == '': while userid == '':
userid = inputbox(wnd, prompt, 18) userid = inputbox(wnd, prompt, 18)
# user abort # user abort
if userid == None or userid.lower() == 'exit': if userid is None or userid.lower() == 'exit':
return False return False
# member already has an account? # read password
#if member['userid'] != None: password = input_password(wnd)
# msgbox(wnd, "Member " + str(memberid) + " already has an account: " + member['userid'] + "\n"
# "Contact the sysadmin if there are still problems." )
# return False
# password input loop
password = "password"
check = "check"
while password != check:
# read password
prompt = "User password:"
password = None
while not password:
password = inputbox(wnd, prompt, 18, False)
# read another password
prompt = "Enter the password again:"
check = None
while not check:
check = inputbox(wnd, prompt, 27, False)
# create the UNIX account # create the UNIX account
result = accounts.create_account(userid, password, member['name'], memberid) try:
if not accounts.connected(): accounts.connect()
if result == accounts.LDAP_EXISTS: accounts.create_member(userid, password, member['name'], memberid)
msgbox(wnd, "Error: Could not do stuff , Already exists.") except accounts.AccountExists, e:
msgbox(wnd, str(e))
return False return False
elif result == accounts.KRB_EXISTS: except accounts.NoAvailableIDs, e:
msgbox(wnd, "This account already exists in Kerberos, but not in LDAP. Please contact the Systems Administrator.") msgbox(wnd, str(e))
return False return False
elif result == accounts.LDAP_NO_IDS: except accounts.InvalidArgument, e:
msgbox(wnd, "There are no available UNIX user ids. This is a fatal error. Contact the Systems Administrator.") msgbox(wnd, str(e))
return False return False
elif result == accounts.BAD_REALNAME: except accounts.LDAPException, e:
msgbox(wnd, "Invalid real name: %s. Contact the Systems Administrator." % member['name']) msgbox(wnd, "Error creating LDAP entry - Contact the Systems Administrator: %s" % e)
return False return False
elif result == accounts.BAD_USERNAME: except accounts.KrbException, e:
msgbox(wnd, "Invalid username: %s. Enter a valid username." % userid) msgbox(wnd, "Error creating Kerberos principal - Contact the Systems Administrator: %s" % e)
return False return False
elif result != accounts.SUCCESS:
raise Exception("Unexpected return status of accounts.create_account(): %s" % result)
# now update the CEO database with the username # now update the CEO database with the username
members.update( {'memberid':memberid, 'userid': userid} ) members.update( {'memberid': memberid, 'userid': userid} )
# success # success
msgbox(wnd, "Please run 'addhomedir " + userid + "'.") msgbox(wnd, "Please run 'addhomedir " + userid + "'.")
msgbox(wnd, "Success! Your account has been added") msgbox(wnd, "Success! Your account has been added")
return False
def display_member_details(wnd, member, term_list): def display_member_details(wnd, member, term_list):
"""Display member attributes in a message box.""" """Display member attributes in a message box."""
@ -343,17 +383,21 @@ def action_display_member(wnd):
return False return False
member = get_member_memberid_userid(wnd, memberid) member = get_member_memberid_userid(wnd, memberid)
if not member: return if not member: return False
term_list = members.terms_list( member['memberid'] ) term_list = members.member_terms( member['memberid'] )
# display the details in a window # display the details in a window
display_member_details(wnd, member, term_list) display_member_details(wnd, member, term_list)
return False
def page(text): def page(text):
"""Send a text buffer to an external pager for display."""
try: try:
pipe = os.popen('/usr/bin/less', 'w') pager = '/usr/bin/less'
pipe = os.popen(pager, 'w')
pipe.write(text) pipe.write(text)
pipe.close() pipe.close()
except IOError: except IOError:
@ -385,7 +429,7 @@ def action_list_term(wnd):
# read the term # read the term
prompt = "Which term to list members for ([fws]20nn): " prompt = "Which term to list members for ([fws]20nn): "
while term == None or (not term == '' and not re.match('^[wsf][0-9]{4}$', term) and not term == 'exit'): while term is None or (not term == '' and not re.match('^[wsf][0-9]{4}$', term) and not term == 'exit'):
term = inputbox(wnd, prompt, 41) term = inputbox(wnd, prompt, 41)
# abort when exit is entered # abort when exit is entered
@ -404,8 +448,11 @@ def action_list_term(wnd):
# display the mass of text with a pager # display the mass of text with a pager
page( buf ) page( buf )
return False
def action_list_name(wnd): def action_list_name(wnd):
"""Interactively search for members by name."""
name = None name = None
@ -420,7 +467,7 @@ def action_list_name(wnd):
# connect the members module to its backends if necessary # connect the members module to its backends if necessary
if not members.connected(): members.connect() if not members.connected(): members.connect()
# retrieve a list of members for term # retrieve a list of members with similar names
member_list = members.list_name(name) member_list = members.list_name(name)
# format the data into a mess of text # format the data into a mess of text
@ -429,8 +476,11 @@ def action_list_name(wnd):
# display the mass of text with a pager # display the mass of text with a pager
page( buf ) page( buf )
return False
def action_list_studentid(wnd): def action_list_studentid(wnd):
"""Interactively search for members by student id."""
studentid = None studentid = None
@ -458,6 +508,8 @@ def action_list_studentid(wnd):
# display the mass of text with a pager # display the mass of text with a pager
page( buf ) page( buf )
return False
def null_callback(wnd): def null_callback(wnd):
"""Callback for unimplemented menu options.""" """Callback for unimplemented menu options."""
@ -479,7 +531,7 @@ top_menu = [
( "Search for a member by name", action_list_name ), ( "Search for a member by name", action_list_name ),
( "Search for a member by student id", action_list_studentid ), ( "Search for a member by student id", action_list_studentid ),
( "Create an account", action_create_account ), ( "Create an account", action_create_account ),
( "Re Create an account", null_callback ), ( "Re Create an account", action_create_account ),
( "Library functions", null_callback ), ( "Library functions", null_callback ),
( "Exit", exit_callback ), ( "Exit", exit_callback ),
] ]
@ -490,11 +542,10 @@ def acquire_ceo_wnd(screen=None):
# hack to get a reference to the entire screen # hack to get a reference to the entire screen
# even when the caller doesn't (shouldn't) have one # even when the caller doesn't (shouldn't) have one
global _screen if screen is None:
if screen == None: screen = globals()['screen']
screen = _screen
else: else:
_screen = screen globals()['screen'] = screen
# if the screen changes size, a mess may be left # if the screen changes size, a mess may be left
screen.erase() screen.erase()
@ -526,13 +577,21 @@ def ceo_main_curses(screen):
# create ceo window # create ceo window
ceo_wnd, menu_y, menu_x, menu_height, menu_width = acquire_ceo_wnd(screen) ceo_wnd, menu_y, menu_x, menu_height, menu_width = acquire_ceo_wnd(screen)
# display the top level menu try:
menu(ceo_wnd, menu_y, menu_x, menu_width, top_menu, acquire_ceo_wnd) # display the top level menu
menu(ceo_wnd, menu_y, menu_x, menu_width, top_menu, acquire_ceo_wnd)
finally:
members.disconnect()
accounts.disconnect()
def run(): def run():
"""Main function for legacy UI.""" """Main function for legacy UI."""
# workaround for xterm-color (bad terminfo? - curs_set(0) fails)
if "TERM" in os.environ and os.environ['TERM'] == "xterm-color":
os.environ['TERM'] = "xterm"
# wrap the entire program using curses.wrapper # wrap the entire program using curses.wrapper
# so that the terminal is restored to a sane state # so that the terminal is restored to a sane state
# when the program exits # when the program exits
@ -541,7 +600,7 @@ def run():
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
except curses.error: except curses.error:
print "Your screen is too small!" print "Is your screen too small?"
raise raise
except: except:
reset() reset()

View File

@ -1,12 +1,9 @@
# $Id$
""" """
Backends Backend Modules
This module contains backend interfaces and related modules. This module contains backend interfaces and related modules.
CEO's primary backends are:
db.py - CEO's database for member and term registrations db - CEO database interface for member registrations
ldapi.py - LDAP, for UNIX account metadata administration ldapi - LDAP interface for UNIX account attribute administration
krb.py - Kerberos, for UNIX account password administration krb - Kerberos interface for UNIX account password management
""" """

View File

@ -1,4 +1,3 @@
# $Id: db.py 37 2006-12-28 10:00:50Z mspang $
""" """
Database Backend Interface Database Backend Interface
@ -7,7 +6,7 @@ Methods on the connection class correspond in a straightforward way to SQL
queries. These methods may restructure and clean up query output but may make queries. These methods may restructure and clean up query output but may make
no other assumptions about its content or purpose. no other assumptions about its content or purpose.
This module makes use of the PygreSQL Python bindings to libpq, This module makes use of the PyGreSQL Python bindings to libpq,
PostgreSQL's native C client library. PostgreSQL's native C client library.
""" """
import pgdb import pgdb
@ -20,7 +19,7 @@ class DBException(Exception):
class DBConnection(object): class DBConnection(object):
""" """
Connection to CEO's backend database. All database queries A connection to CEO's backend database. All database queries
and updates are made via this class. and updates are made via this class.
Exceptions: (all methods) Exceptions: (all methods)
@ -84,7 +83,7 @@ class DBConnection(object):
def connected(self): def connected(self):
"""Determine whether the connection has been established.""" """Determine whether the connection has been established."""
return self.cnx != None return self.cnx is not None
def commit(self): def commit(self):
@ -130,8 +129,7 @@ class DBConnection(object):
# build a dictionary of dictionaries from the result (a list of lists) # build a dictionary of dictionaries from the result (a list of lists)
members_dict = {} members_dict = {}
for member in members_list: for member in members_list:
memberid, name, studentid, program, type, userid = member members_dict[member[0]] = {
members_dict[memberid] = {
'memberid': member[0], 'memberid': member[0],
'name': member[1], 'name': member[1],
'studentid': member[2], 'studentid': member[2],
@ -236,13 +234,13 @@ class DBConnection(object):
return self.select_single_member(sql, params) return self.select_single_member(sql, params)
def select_member_by_account(self, username): def select_member_by_userid(self, username):
""" """
Retrieves a single member by UNIX account username. Retrieves a single member by UNIX account username.
See: self.select_single_member() See: self.select_single_member()
Example: connection.select_member_by_account('ctdalek') -> Example: connection.select_member_by_userid('ctdalek') ->
{ 'memberid': 0, 'name': 'Calum T. Dalek' ...} { 'memberid': 0, 'name': 'Calum T. Dalek' ...}
""" """
sql = "SELECT memberid, name, studentid, program, type, userid FROM members WHERE userid=%s" sql = "SELECT memberid, name, studentid, program, type, userid FROM members WHERE userid=%s"
@ -266,7 +264,7 @@ class DBConnection(object):
return self.select_single_member(sql, params) return self.select_single_member(sql, params)
def insert_member(self, name, studentid=None, program=None): def insert_member(self, name, studentid=None, program=None, mtype='user', userid=None):
""" """
Creates a member with the specified attributes. Creates a member with the specified attributes.
@ -274,6 +272,8 @@ class DBConnection(object):
name - full name of member name - full name of member
studentid - student id number studentid - student id number
program - program of study program - program of study
mtype - member type
userid - account id
Example: connection.insert_member('Michael Spang', '99999999', 'Math/CS') -> 3349 Example: connection.insert_member('Michael Spang', '99999999', 'Math/CS') -> 3349
@ -287,8 +287,8 @@ class DBConnection(object):
memberid = result[0] memberid = result[0]
# insert the member # insert the member
sql = "INSERT INTO members (memberid, name, studentid, program, type) VALUES (%d, %s, %s, %s, %s)" sql = "INSERT INTO members (memberid, name, studentid, program, type, userid) VALUES (%d, %s, %s, %s, %s, %s)"
params = [ memberid, name, studentid, program, 'user' ] params = [ memberid, name, studentid, program, mtype, userid ]
self.cursor.execute(sql, params) self.cursor.execute(sql, params)
return memberid return memberid
@ -497,8 +497,8 @@ class DBConnection(object):
def trim_memberid_sequence(self): def trim_memberid_sequence(self):
""" """
Sets the value of the member id sequence to the id of the newest Sets the value of the member id sequence to the id of the newest
member. For use after extensive testing to prevent large member. For use after testing to prevent large intervals of unused
intervals of unused memberids. memberids from developing.
Note: this does nothing unless the most recently added member(s) have been deleted Note: this does nothing unless the most recently added member(s) have been deleted
""" """
@ -509,40 +509,163 @@ class DBConnection(object):
### Tests ### ### Tests ###
if __name__ == '__main__': if __name__ == '__main__':
HOST = "localhost"
DATABASE = "ceo"
from csc.common.test import *
conffile = "/etc/csc/pgsql.cf"
cfg = dict([map(str.strip, a.split("=", 1)) for a in map(str.strip, open(conffile).read().split("\n")) if "=" in a ])
hostnm = cfg['server'][1:-1]
dbase = cfg['database'][1:-1]
# t=test m=member s=student d=default e=expected u=updated
tmname = 'Test Member'
tmuname = 'Member Test'
tmsid = '00000004'
tmusid = '00000008'
tmprogram = 'Undecidable'
tmuprogram = 'Nondetermined'
tmtype = 'Untyped'
tmutype = 'Poly'
tmuserid = 'tmem'
tmuuserid = 'identifier'
tm2name = 'Test Member 2'
tm2sid = '00000005'
tm2program = 'Undeclared'
tm3name = 'T. M. 3'
dtype = 'user'
tmterm = 'w0000'
tm3term = 'f1112'
tm3term2 = 's1010'
emdict = { 'name': tmname, 'program': tmprogram, 'studentid': tmsid, 'type': tmtype, 'userid': tmuserid }
emudict = { 'name': tmuname, 'program': tmuprogram, 'studentid': tmusid, 'type': tmutype, 'userid': tmuuserid }
em2dict = { 'name': tm2name, 'program': tm2program, 'studentid': tm2sid, 'type': dtype, 'userid': None }
em3dict = { 'name': tm3name, 'program': None, 'studentid': None, 'type': dtype, 'userid': None }
test(DBConnection)
connection = DBConnection() connection = DBConnection()
success()
print "Running disconnect()" test(connection.connect)
connection.disconnect() connection.connect(hostnm, dbase)
success()
print "Running connect('%s', '%s')" % (HOST, DATABASE) test(connection.connected)
connection.connect(HOST, DATABASE) assert_equal(True, connection.connected())
success()
print "Running select_all_members()", "->", len(connection.select_all_members()), "members" test(connection.insert_member)
print "Running select_member_by_id(0)", "->", connection.select_member_by_id(0)['userid'] tmid = connection.insert_member(tmname, tmsid, tmprogram, tmtype, tmuserid)
print "Running select_members_by_name('Spang')", "->", connection.select_members_by_name('Spang').keys() tm2id = connection.insert_member(tm2name, tm2sid, tm2program)
print "Running select_members_by_term('f2006')", "->", "[" + ", ".join(map(str, connection.select_members_by_term('f2006').keys()[0:10])) + " ...]" tm3id = connection.insert_member(tm3name)
assert_equal(True, int(tmid) >= 0)
print "Running insert_member('test_member', '99999999', 'program')", assert_equal(True, int(tmid) >= 0)
memberid = connection.insert_member('test_member', '99999999', 'program') success()
print "->", memberid
print "Running select_member_by_id(%d)" % memberid, "->", connection.select_member_by_id(memberid) emdict['memberid'] = tmid
print "Running insert_term(%d, 'f2006')" % memberid emudict['memberid'] = tmid
connection.insert_term(memberid, 'f2006') em2dict['memberid'] = tm2id
em3dict['memberid'] = tm3id
print "Running select_terms(%d)" % memberid, "->", connection.select_terms(memberid) test(connection.select_member_by_id)
print "Running update_member({'memberid':%d,'name':'test_updated','studentid':-1})" % memberid m1 = connection.select_member_by_id(tmid)
connection.update_member({'memberid':memberid,'name':'test_updated','studentid':99999999}) m2 = connection.select_member_by_id(tm2id)
print "Running select_member_by_id(%d)" % memberid, "->", connection.select_member_by_id(memberid) m3 = connection.select_member_by_id(tm3id)
assert_equal(emdict, m1)
print "Running rollback()" assert_equal(em2dict, m2)
assert_equal(em3dict, m3)
success()
test(connection.select_all_members)
members = connection.select_all_members()
assert_equal(True, tmid in members)
assert_equal(True, tm2id in members)
assert_equal(True, tm3id in members)
assert_equal(emdict, members[tmid])
success()
test(connection.select_members_by_name)
members = connection.select_members_by_name(tmname)
assert_equal(True, tmid in members)
assert_equal(False, tm3id in members)
assert_equal(emdict, members[tmid])
success()
test(connection.select_member_by_userid)
assert_equal(emdict, connection.select_member_by_userid(tmuserid))
success()
test(connection.insert_term)
connection.insert_term(tmid, tmterm)
connection.insert_term(tm3id, tm3term)
connection.insert_term(tm3id, tm3term2)
success()
test(connection.select_members_by_term)
members = connection.select_members_by_term(tmterm)
assert_equal(True, tmid in members)
assert_equal(False, tm2id in members)
assert_equal(False, tm3id in members)
success()
test(connection.select_term)
assert_equal(tmterm, connection.select_term(tmid, tmterm))
assert_equal(None, connection.select_term(tm2id, tmterm))
assert_equal(tm3term, connection.select_term(tm3id, tm3term))
assert_equal(tm3term2, connection.select_term(tm3id, tm3term2))
success()
test(connection.select_terms)
trms = connection.select_terms(tmid)
trms2 = connection.select_terms(tm2id)
assert_equal([tmterm], trms)
assert_equal([], trms2)
success()
test(connection.delete_term)
assert_equal(tm3term, connection.select_term(tm3id, tm3term))
connection.delete_term(tm3id, tm3term)
assert_equal(None, connection.select_term(tm3id, tm3term))
success()
test(connection.update_member)
connection.update_member({'memberid': tmid, 'name': tmuname})
connection.update_member({'memberid': tmid, 'program': tmuprogram, 'studentid': tmusid })
connection.update_member({'memberid': tmid, 'userid': tmuuserid, 'type': tmutype })
assert_equal(emudict, connection.select_member_by_id(tmid))
connection.update_member(emdict)
assert_equal(emdict, connection.select_member_by_id(tmid))
success()
test(connection.delete_term_all)
connection.delete_term_all(tm2id)
connection.delete_term_all(tm3id)
assert_equal([], connection.select_terms(tm2id))
assert_equal([], connection.select_terms(tm3id))
success()
test(connection.delete_member)
connection.delete_member(tm3id)
assert_equal(None, connection.select_member_by_id(tm3id))
negative(connection.delete_member, (tmid,), DBException, "delete of term-registered member")
success()
test(connection.rollback)
connection.rollback() connection.rollback()
assert_equal(None, connection.select_member_by_id(tm2id))
success()
print "Resetting memberid sequence" test(connection.commit)
connection.commit()
success()
test(connection.trim_memberid_sequence)
connection.trim_memberid_sequence() connection.trim_memberid_sequence()
success()
print "Running disconnect()"
connection.disconnect() test(connection.disconnect)
connection.disconnect()
assert_equal(False, connection.connected())
connection.disconnect()
success()

View File

@ -1,4 +1,3 @@
# $Id: ipc.py 26 2006-12-20 21:25:08Z mspang $
""" """
IPC Library Functions IPC Library Functions
@ -14,22 +13,21 @@ class _pty_file(object):
""" """
A 'file'-like wrapper class for pseudoterminal file descriptors. A 'file'-like wrapper class for pseudoterminal file descriptors.
This wrapper is necessary because Python has a nasty This wrapper is necessary because Python has a nasty habit of throwing
habit of throwing OSError at pty EOF. OSError at pty EOF.
This class also implements timeouts for read operations This class also implements timeouts for read operations which are handy
which are handy for avoiding deadlock when both for avoiding deadlock when both processes are blocked in a read().
processes are blocked in a read().
See the Python documentation of the file class See the Python documentation of the file class for explanation
for explanation of the methods. of the methods.
""" """
def __init__(self, fd): def __init__(self, fd):
self.fd = fd self.fd = fd
self.buffer = '' self.buffer = ''
self.closed = False self.closed = False
def __repr__(self): def __repr__(self):
status='open' status = 'open'
if self.closed: if self.closed:
status = 'closed' status = 'closed'
return "<" + status + " pty '" + os.ttyname(self.fd) + "'>" return "<" + status + " pty '" + os.ttyname(self.fd) + "'>"
@ -43,8 +41,8 @@ class _pty_file(object):
while data != '': while data != '':
# wait timeout for the pty to become ready, otherwise stop reading # wait timeout for the pty to become ready, otherwise stop reading
if not block and len(select.select([self.fd],[],[], timeout)[0]) == 0: if not block and len(select.select([self.fd], [], [], timeout)[0]) == 0:
break break
data = os.read(self.fd, 65536) data = os.read(self.fd, 65536)
self.buffer += data self.buffer += data
@ -61,7 +59,7 @@ class _pty_file(object):
try: try:
# wait timeout for the pty to become ready, then read # wait timeout for the pty to become ready, then read
if block or len(select.select([self.fd],[],[], timeout)[0]) != 0: if block or len(select.select([self.fd], [], [], timeout)[0]) != 0:
self.buffer += os.read(self.fd, size - len(self.buffer) ) self.buffer += os.read(self.fd, size - len(self.buffer) )
except OSError: except OSError:
@ -78,8 +76,8 @@ class _pty_file(object):
while data != '' and self.buffer.find("\n") == -1 and (size < 0 or len(self.buffer) < size): while data != '' and self.buffer.find("\n") == -1 and (size < 0 or len(self.buffer) < size):
# wait timeout for the pty to become ready, otherwise stop reading # wait timeout for the pty to become ready, otherwise stop reading
if not block and len(select.select([self.fd],[],[], timeout)[0]) == 0: if not block and len(select.select([self.fd], [], [], timeout)[0]) == 0:
break break
data = os.read(self.fd, 128) data = os.read(self.fd, 128)
self.buffer += data self.buffer += data
@ -94,7 +92,7 @@ class _pty_file(object):
line = self.buffer[:split_index] line = self.buffer[:split_index]
self.buffer = self.buffer[split_index:] self.buffer = self.buffer[split_index:]
return line return line
def readlines(self, sizehint=None, block=True, timeout=0.1): def readlines(self, sizehint=None, timeout=0.1):
lines = [] lines = []
line = None line = None
while True: while True:
@ -138,7 +136,7 @@ def popeni(command, args, env=None):
args - a list of arguments to pass to command args - a list of arguments to pass to command
env - optional environment for command env - optional environment for command
Returns: (pid, stdout, stdIn) Returns: (pid, stdout, stdin)
""" """
# use a pipe to send data to the child # use a pipe to send data to the child
@ -181,7 +179,7 @@ def popeni(command, args, env=None):
# set the controlling terminal to the pty # set the controlling terminal to the pty
# by opening it (and closing it again since # by opening it (and closing it again since
# it's already open as child_stdout) # it's already open as child_stdout)
fd = os.open(tty, os.O_RDWR); fd = os.open(tty, os.O_RDWR)
os.close(fd) os.close(fd)
# init stdin/out/err # init stdin/out/err
@ -209,14 +207,21 @@ def popeni(command, args, env=None):
return pid, _pty_file(parent_stdout), os.fdopen(parent_stdin, 'w') return pid, _pty_file(parent_stdout), os.fdopen(parent_stdin, 'w')
### Tests ### ### Tests ###
if __name__ == '__main__': if __name__ == '__main__':
import sys from csc.common.test import *
pid, recv, send = popeni('/usr/sbin/kadmin.local', ['kadmin'])
send.write("listprincs\n") prog = '/bin/cat'
argv = [ prog ]
message = "test\n"
test(popeni)
proc, recv, send = popeni(prog, argv)
send.write(message)
send.flush() send.flush()
line = recv.readline()
print recv.readlines() assert_equal(message.strip(), line.strip())
success()

View File

@ -1,4 +1,3 @@
# $Id: krb.py 40 2006-12-29 00:40:31Z mspang $
""" """
Kerberos Backend Interface Kerberos Backend Interface
@ -12,8 +11,8 @@ systems. Accounts that do not authenticate (e.g. club accounts) do not need
a Kerberos principal. a Kerberos principal.
Unfortunately, there are no Python bindings to libkadm at this time. As a Unfortunately, there are no Python bindings to libkadm at this time. As a
temporary workaround, This module communicates with the kadmin CLI interface temporary workaround, this module communicates with the kadmin CLI interface
via a pseudoterminal and pipe. via a pseudo-terminal and a pipe.
""" """
import os import os
import ipc import ipc
@ -109,7 +108,7 @@ class KrbConnection(object):
def connected(self): def connected(self):
"""Determine whether the connection has been established.""" """Determine whether the connection has been established."""
return self.pid != None return self.pid is not None
@ -125,6 +124,7 @@ class KrbConnection(object):
# list of lines output by kadmin # list of lines output by kadmin
result = [] result = []
lines = []
# the kadmin prompt that signals the end output # the kadmin prompt that signals the end output
# note: KADMIN_ARGS[0] must be "kadmin" or the actual prompt will differ # note: KADMIN_ARGS[0] must be "kadmin" or the actual prompt will differ
@ -137,12 +137,12 @@ class KrbConnection(object):
timeout_maximum = 1.00 timeout_maximum = 1.00
# input loop: read from kadmin until the kadmin prompt # input loop: read from kadmin until the kadmin prompt
buffer = '' buf = ''
while True: while True:
# attempt to read any available data # attempt to read any available data
data = self.kadm_out.read(block=False, timeout=timeout) data = self.kadm_out.read(block=False, timeout=timeout)
buffer += data buf += data
# nothing was read # nothing was read
if data == '': if data == '':
@ -165,20 +165,20 @@ class KrbConnection(object):
else: else:
# kadmin died! # kadmin died!
raise KrbException("kadmin died while reading response") raise KrbException("kadmin died while reading response:\n%s\n%s" % ("\n".join(lines), buf))
# break into lines and save all but the final # break into lines and save all but the final
# line (which is incomplete) into result # line (which is incomplete) into result
lines = buffer.split("\n") lines = buf.split("\n")
buffer = lines[-1] buf = lines[-1]
lines = lines[:-1] lines = lines[:-1]
for line in lines: for line in lines:
line = line.strip() line = line.strip()
result.append(line) result.append(line)
# if the incomplete lines in the buffer is the kadmin prompt, # if the incomplete line in the buffer is the kadmin prompt,
# then the result is complete and may be returned # then the result is complete and may be returned
if buffer.strip() == prompt: if buf.strip() == prompt:
break break
return result return result
@ -189,7 +189,7 @@ class KrbConnection(object):
Helper function to execute a kadmin command. Helper function to execute a kadmin command.
Parameters: Parameters:
command - the command to execute command - command string to pass on to kadmin
Returns: a list of lines output by the command Returns: a list of lines output by the command
""" """
@ -222,8 +222,8 @@ class KrbConnection(object):
"ceo/admin@CSCLUB.UWATERLOO.CA", "ceo/admin@CSCLUB.UWATERLOO.CA",
"sysadmin/admin@CSCLUB.UWATERLOO.CA", "sysadmin/admin@CSCLUB.UWATERLOO.CA",
"mspang@CSCLUB.UWATERLOO.CA", "mspang@CSCLUB.UWATERLOO.CA",
...
] ]
""" """
principals = self.execute("list_principals") principals = self.execute("list_principals")
@ -374,16 +374,13 @@ class KrbConnection(object):
# ensure success message was received # ensure success message was received
if not created: if not created:
raise KrbException("did not receive principal created in response") raise KrbException("kadmin did not acknowledge principal creation")
def delete_principal(self, principal): def delete_principal(self, principal):
""" """
Delete a principal. Delete a principal.
Parameters:
principal - the principal name
Example: connection.delete_principal("mspang@CSCLUB.UWATERLOO.CA") Example: connection.delete_principal("mspang@CSCLUB.UWATERLOO.CA")
""" """
@ -424,25 +421,116 @@ class KrbConnection(object):
raise KrbException("did not receive principal deleted") raise KrbException("did not receive principal deleted")
def change_password(self, principal, password):
"""
Changes a principal's password.
Example: connection.change_password("mspang@CSCLUB.UWATERLOO.CA", "opensesame")
"""
# exec the add_principal command
if password.find('"') == -1:
self.kadm_in.write('change_password -pw "' + password + '" "' + principal + '"\n')
else:
self.kadm_in.write('change_password "' + principal + '"\n')
self.kadm_in.write(password + "\n" + password + "\n")
# send request and read response
self.kadm_in.flush()
output = self.read_result()
# verify output
changed = False
for line in output:
# ignore NOTICE lines
if line.find("NOTICE:") == 0:
continue
# ignore prompts
elif line.find("Enter password") == 0 or line.find("Re-enter password") == 0:
continue
# record whether success message was encountered
elif line.find("Password") == 0 and line.find("changed.") != 0:
changed = True
# error messages
elif line.find("change_password:") == 0 or line.find("kadmin:") == 0:
raise KrbException(line)
# unknown output
else:
raise KrbException("unexpected change_password output: " + line)
# ensure success message was received
if not changed:
raise KrbException("kadmin did not acknowledge password change")
### Tests ### ### Tests ###
if __name__ == '__main__': if __name__ == '__main__':
PRINCIPAL = 'ceo/admin@CSCLUB.UWATERLOO.CA'
KEYTAB = 'ceo.keytab' from csc.common.test import *
import random
connection = KrbConnection()
print "running disconnect()" conffile = '/etc/csc/kerberos.cf'
connection.disconnect()
print "running connect('%s', '%s')" % (PRINCIPAL, KEYTAB) cfg = dict([map(str.strip, a.split("=", 1)) for a in map(str.strip, open(conffile).read().split("\n")) if "=" in a ])
connection.connect(PRINCIPAL, KEYTAB) principal = cfg['admin_principal'][1:-1]
print "running list_principals()", "->", "[" + ", ".join(map(repr,connection.list_principals()[0:3])) + " ...]" keytab = cfg['admin_keytab'][1:-1]
print "running get_privs()", "->", str(connection.get_privs()) realm = cfg['realm'][1:-1]
print "running add_principal('testtest', 'BLAH')"
connection.add_principal("testtest", "FJDSLDLFKJSF") # t=test p=principal e=expected
print "running get_principal('testtest')", "->", '(' + connection.get_principal("testtest")['Principal'] + ')' tpname = 'testpirate' + '@' + realm
print "running delete_principal('testtest')" tpw = str(random.randint(10**30, 10**31-1)) + 'YAR!'
connection.delete_principal("testtest") eprivs = ['GET', 'ADD', 'MODIFY', 'DELETE']
print "running disconnect()"
connection.disconnect() test(KrbConnection)
connection = KrbConnection()
success()
test(connection.connect)
connection.connect(principal, keytab)
success()
try:
connection.delete_principal(tpname)
except KrbException:
pass
test(connection.connected)
assert_equal(True, connection.connected())
success()
test(connection.add_principal)
connection.add_principal(tpname, tpw)
success()
test(connection.list_principals)
pals = connection.list_principals()
assert_equal(True, tpname in pals)
success()
test(connection.get_privs)
privs = connection.get_privs()
assert_equal(eprivs, privs)
success()
test(connection.get_principal)
princ = connection.get_principal(tpname)
assert_equal(tpname, princ['Principal'])
success()
test(connection.delete_principal)
connection.delete_principal(tpname)
assert_equal(None, connection.get_principal(tpname))
success()
test(connection.disconnect)
connection.disconnect()
assert_equal(False, connection.connected())
success()

View File

@ -1,4 +1,3 @@
# $Id: ldapi.py 41 2006-12-29 04:22:31Z mspang $
""" """
LDAP Backend Interface LDAP Backend Interface
@ -60,7 +59,7 @@ class LDAPConnection(object):
""" """
if bind_pw == None: bind_pw = '' if bind_pw is None: bind_pw = ''
try: try:
@ -93,7 +92,7 @@ class LDAPConnection(object):
def connected(self): def connected(self):
"""Determine whether the connection has been established.""" """Determine whether the connection has been established."""
return self.ldap != None return self.ldap is not None
@ -137,31 +136,33 @@ class LDAPConnection(object):
Retrieve the attributes of a user. Retrieve the attributes of a user.
Parameters: Parameters:
uid - the UNIX user accound name of the user uid - the UNIX username to look up
Returns: attributes of user with uid Returns: attributes of user with uid
Example: connection.user_lookup('mspang') -> Example: connection.user_lookup('mspang') ->
{ 'uid': 'mspang', 'uidNumber': 21292 ...} { 'uid': 'mspang', 'uidNumber': 21292 ...}
""" """
if not self.connected(): raise LDAPException("Not connected!")
dn = 'uid=' + uid + ',' + self.user_base dn = 'uid=' + uid + ',' + self.user_base
return self.lookup(dn) return self.lookup(dn)
def user_search(self, filter): def user_search(self, search_filter):
""" """
Helper for user searches. Helper for user searches.
Parameters: Parameters:
filter - LDAP filter string to match users against search_filter - LDAP filter string to match users against
Returns: the list of uids matched Returns: the list of uids matched (usernames)
""" """
# search for entries that match the filter # search for entries that match the filter
try: try:
matches = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, filter) matches = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, search_filter)
except ldap.LDAPError, e: except ldap.LDAPError, e:
raise LDAPException("user search failed: %s" % e) raise LDAPException("user search failed: %s" % e)
@ -196,47 +197,45 @@ class LDAPConnection(object):
Parameters: Parameters:
uidNumber - the user id of the accounts desired uidNumber - the user id of the accounts desired
Returns: the list of uids matched Returns: the list of uids matched (usernames)
Example: connection.user_search_id(21292) -> ['mspang'] Example: connection.user_search_id(21292) -> ['mspang']
""" """
# search for posixAccount entries with the specified uidNumber # search for posixAccount entries with the specified uidNumber
filter = '(&(objectClass=posixAccount)(uidNumber=%d))' % uidNumber search_filter = '(&(objectClass=posixAccount)(uidNumber=%d))' % uidNumber
return self.user_search(filter) return self.user_search(search_filter)
def user_search_gid(self, gidNumber): def user_search_gid(self, gidNumber):
""" """
Retrieves a list of users with a certain UNIX gid number. Retrieves a list of users with a certain UNIX gid
number (search by default group).
Parameters: Returns: the list of uids matched (usernames)
gidNumber - the group id of the accounts desired
Returns: the list of uids matched
""" """
# search for posixAccount entries with the specified gidNumber # search for posixAccount entries with the specified gidNumber
filter = '(&(objectClass=posixAccount)(gidNumber=%d))' % gidNumber search_filter = '(&(objectClass=posixAccount)(gidNumber=%d))' % gidNumber
return self.user_search(filter) return self.user_search(search_filter)
def user_add(self, uid, cn, loginShell, uidNumber, gidNumber, homeDirectory, gecos): def user_add(self, uid, cn, uidNumber, gidNumber, homeDirectory, loginShell=None, gecos=None, description=None):
""" """
Adds a user to the directory. Adds a user to the directory.
Parameters: Parameters:
uid - the UNIX username for the account uid - the UNIX username for the account
cn - the full name of the member cn - the real name of the member
userPassword - password of the account (our setup does not use this)
loginShell - login shell for the user
uidNumber - the UNIX user id number uidNumber - the UNIX user id number
gidNumber - the UNIX group id number gidNumber - the UNIX group id number (default group)
homeDirectory - home directory for the user homeDirectory - home directory for the user
gecos - comment field (usually stores miscellania) loginShell - login shell for the user
gecos - comment field (usually stores name etc)
description - description field (optional and unimportant)
Example: connection.user_add('mspang', 'Michael Spang', Example: connection.user_add('mspang', 'Michael Spang',
'/bin/bash', 21292, 100, '/users/mspang', 21292, 100, '/users/mspang', '/bin/bash',
'Michael Spang,,,') 'Michael Spang,,,')
""" """
@ -251,6 +250,11 @@ class LDAPConnection(object):
'homeDirectory': [ homeDirectory ], 'homeDirectory': [ homeDirectory ],
'gecos': [ gecos ], 'gecos': [ gecos ],
} }
if loginShell:
attrs['loginShell'] = loginShell
if description:
attrs['description'] = [ description ]
try: try:
modlist = ldap.modlist.addModlist(attrs) modlist = ldap.modlist.addModlist(attrs)
@ -265,7 +269,7 @@ class LDAPConnection(object):
Parameters: Parameters:
uid - username of the user to modify uid - username of the user to modify
entry - dictionary as returned by user_lookup() with changes to make. attrs - dictionary as returned by user_lookup() with changes to make.
omitted attributes are DELETED. omitted attributes are DELETED.
Example: user = user_lookup('mspang') Example: user = user_lookup('mspang')
@ -295,9 +299,6 @@ class LDAPConnection(object):
""" """
Removes a user from the directory. Removes a user from the directory.
Parameters:
uid - the UNIX username of the account
Example: connection.user_delete('mspang') Example: connection.user_delete('mspang')
""" """
@ -318,7 +319,7 @@ class LDAPConnection(object):
Parameters: Parameters:
cn - the UNIX group name to lookup cn - the UNIX group name to lookup
Returns: attributes of group with cn Returns: attributes of the group's LDAP entry
Example: connection.group_lookup('office') -> { Example: connection.group_lookup('office') -> {
'cn': 'office', 'cn': 'office',
@ -335,9 +336,6 @@ class LDAPConnection(object):
""" """
Retrieves a list of groups with the specified UNIX group number. Retrieves a list of groups with the specified UNIX group number.
Parameters:
gidNumber - the group id of the groups desired
Returns: a list of groups with gid gidNumber Returns: a list of groups with gid gidNumber
Example: connection.group_search_id(1001) -> ['office'] Example: connection.group_search_id(1001) -> ['office']
@ -345,8 +343,8 @@ class LDAPConnection(object):
# search for posixAccount entries with the specified uidNumber # search for posixAccount entries with the specified uidNumber
try: try:
filter = '(&(objectClass=posixGroup)(gidNumber=%d))' % gidNumber search_filter = '(&(objectClass=posixGroup)(gidNumber=%d))' % gidNumber
matches = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, filter) matches = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, search_filter)
except ldap.LDAPError,e : except ldap.LDAPError,e :
raise LDAPException("group search failed: %s" % e) raise LDAPException("group search failed: %s" % e)
@ -370,15 +368,11 @@ class LDAPConnection(object):
return group_cns return group_cns
def group_add(self, cn, gidNumber): def group_add(self, cn, gidNumber, description=None):
""" """
Adds a group to the directory. Adds a group to the directory.
Parameters: Example: connection.group_add('office', 1001, 'Office Staff')
cn - the name of the group
gidNumber - the number of the group
Example: connection.group_add('office', 1001)
""" """
dn = 'cn=' + cn + ',' + self.group_base dn = 'cn=' + cn + ',' + self.group_base
@ -387,6 +381,8 @@ class LDAPConnection(object):
'cn': [ cn ], 'cn': [ cn ],
'gidNumber': [ str(gidNumber) ], 'gidNumber': [ str(gidNumber) ],
} }
if description:
attrs['description'] = description
try: try:
modlist = ldap.modlist.addModlist(attrs) modlist = ldap.modlist.addModlist(attrs)
@ -399,9 +395,8 @@ class LDAPConnection(object):
""" """
Update group attributes in the directory. Update group attributes in the directory.
The only available updates are fairly destructive The only available updates are fairly destructive (rename or renumber)
(rename or renumber) but this method is provided but this method is provided for completeness.
for completeness.
Parameters: Parameters:
cn - name of the group to modify cn - name of the group to modify
@ -436,9 +431,6 @@ class LDAPConnection(object):
""" """
Removes a group from the directory." Removes a group from the directory."
Parameters:
cn - the name of the group
Example: connection.group_delete('office') Example: connection.group_delete('office')
""" """
@ -449,129 +441,203 @@ class LDAPConnection(object):
raise LDAPException("unable to delete group: %s" % e) raise LDAPException("unable to delete group: %s" % e)
def group_members(self, cn):
"""
Retrieves a group's members.
Parameters:
cn - the name of the group
Example: connection.group_members('office') ->
['sfflaw', 'jeperry', 'cschopf' ...]
"""
group = self.group_lookup(cn)
return group.get('memberUid', None)
### Miscellaneous Methods ### ### Miscellaneous Methods ###
def first_id(self, minimum, maximum):
"""
Determines the first available id within a range.
To be "available", there must be neither a user def used_uids(self, minimum=None, maximum=None):
with the id nor a group with the id. """
Compiles a list of used UIDs in a range.
Parameters: Parameters:
minimum - smallest uid that may be returned minimum - smallest uid to return in the list
maximum - largest uid that may be returned maximum - largest uid to return in the list
Returns: the id, or None if there are none available Returns: list of integer uids
Example: connection.first_id(20000, 40000) -> 20018 Example: connection.used_uids(20000, 40000) -> [20000, 20001, ...]
""" """
# compile a list of used uids
try: try:
users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['uidNumber']) users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['uidNumber'])
except ldap.LDAPError, e: except ldap.LDAPError, e:
raise LDAPException("search for uids failed: %s" % e) raise LDAPException("search for uids failed: %s" % e)
uids = [] uids = []
for user in users: for user in users:
dn, attrs = user dn, attrs = user
uid = int(attrs['uidNumber'][0]) uid = int(attrs['uidNumber'][0])
if minimum <= uid <= maximum: if (not minimum or uid >= minimum) and (not maximum or uid <= maximum):
uids.append(uid) uids.append(uid)
# compile a list of used gids return uids
def used_gids(self, minimum=None, maximum=None):
"""
Compiles a list of used GIDs in a range.
Parameters:
minimum - smallest gid to return in the list
maximum - largest gid to return in the list
Returns: list of integer gids
Example: connection.used_gids(20000, 40000) -> [20000, 20001, ...]
"""
try: try:
groups = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, '(objectClass=posixGroup)', ['gidNumber']) users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['gidNumber'])
except ldap.LDAPError, e: except ldap.LDAPError, e:
raise LDAPException("search for gids failed: %s" % e) raise LDAPException("search for gids failed: %s" % e)
gids = [] gids = []
for group in groups: for user in users:
dn, attrs = group dn, attrs = user
gid = int(attrs['gidNumber'][0]) gid = int(attrs['gidNumber'][0])
if minimum <= gid <= maximum: if (not minimum or gid >= minimum) and (not maximum or gid <= maximum):
gids.append(gid) gids.append(gid)
# iterate through ids and return the first available return gids
for id in xrange(minimum, maximum+1):
if not id in uids and not id in gids:
return id
# no suitable id was found
return None
### Tests ### ### Tests ###
if __name__ == '__main__': if __name__ == '__main__':
password_file = 'ldap.ceo' from csc.common.test import *
server = 'ldaps:///'
base_dn = 'dc=csclub,dc=uwaterloo,dc=ca'
bind_dn = 'cn=ceo,' + base_dn
user_dn = 'ou=People,' + base_dn
group_dn = 'ou=Group,' + base_dn
bind_pw = open(password_file).readline().strip()
conffile = '/etc/csc/ldap.cf'
cfg = dict([map(str.strip, a.split("=", 1)) for a in map(str.strip, open(conffile).read().split("\n")) if "=" in a ])
srvurl = cfg['server_url'][1:-1]
binddn = cfg['admin_bind_dn'][1:-1]
bindpw = cfg['admin_bind_pw'][1:-1]
ubase = cfg['users_base'][1:-1]
gbase = cfg['groups_base'][1:-1]
minid = 99999000
maxid = 100000000
# t=test u=user g=group c=changed r=real e=expected
tuname = 'testuser'
turname = 'Test User'
tuhome = '/home/testuser'
tushell = '/bin/false'
tugecos = 'Test User,,,'
tgname = 'testgroup'
cushell = '/bin/true'
cuhome = '/home/changed'
curname = 'Test Modified User'
test("LDAPConnection()")
connection = LDAPConnection() connection = LDAPConnection()
print "running disconnect()" success()
test("disconnect()")
connection.disconnect() connection.disconnect()
print "running connect('%s', '%s', '%s', '%s', '%s')" % (server, bind_dn, '***', user_dn, group_dn) success()
connection.connect(server, bind_dn, bind_pw, user_dn, group_dn)
print "running user_lookup('mspang')", "->", "(%s)" % connection.user_lookup('mspang')['uidNumber'][0] test("connect()")
print "running user_search_id(21292)", "->", connection.user_search_id(21292) connection.connect(srvurl, binddn, bindpw, ubase, gbase)
print "running first_id(20000, 40000)", "->", if not connection.connected():
first_id = connection.first_id(20000, 40000) fail("not connected")
print first_id success()
print "running group_add('testgroup', %d)" % first_id
try: try:
connection.group_add('testgroup', first_id) connection.user_delete(tuname)
except Exception, e: connection.group_delete(tgname)
print "FAILED: %s (continuing)" % e except LDAPException:
print "running user_add('testuser', 'Test User', '/bin/false', %d, %d, '/home/null', 'Test User,,,')" % (first_id, first_id) pass
try:
connection.user_add('testuser', 'Test User', '/bin/false', first_id, first_id, '/home/null', 'Test User,,,') test("used_uids()")
except Exception, e: uids = connection.used_uids(minid, maxid)
print "FAILED: %s (continuing)" % e if type(uids) is not list:
print "running user_lookup('testuser')", "->", fail("list not returned")
user = connection.user_lookup('testuser') success()
print repr(connection.user_lookup('testuser')['cn'][0])
user['homeDirectory'] = ['/home/changed'] test("used_gids()")
user['loginShell'] = ['/bin/true'] gids = connection.used_gids(minid, maxid)
print "running user_modify(...)" if type(gids) is not list:
connection.user_modify('testuser', user) fail("list not returned")
print "running user_lookup('testuser')", "->", success()
user = connection.user_lookup('testuser')
print '(%s, %s)' % (user['homeDirectory'], user['loginShell']) unusedids = []
print "running group_lookup('testgroup')", "->", for idnum in xrange(minid, maxid):
group = connection.group_lookup('testgroup') if not idnum in uids and not idnum in gids:
print group unusedids.append(idnum)
print "running group_modify(...)"
group['gidNumber'] = [str(connection.first_id(20000, 40000))] tuuid = unusedids.pop()
group['memberUid'] = [ str(first_id) ] tugid = unusedids.pop()
connection.group_modify('testgroup', group) eudata = {
print "running group_lookup('testgroup')", "->", 'uid': [ tuname ],
group = connection.group_lookup('testgroup') 'loginShell': [ tushell ],
print group 'uidNumber': [ str(tuuid) ],
print "running user_delete('testuser')" 'gidNumber': [ str(tugid) ],
connection.user_delete('testuser') 'gecos': [ tugecos ],
print "running group_delete('testgroup')" 'homeDirectory': [ tuhome ],
connection.group_delete('testgroup') 'cn': [ turname ]
print "running user_search_gid(100)", "->", "[" + ", ".join(map(repr,connection.user_search_gid(100)[:10])) + " ...]" }
print "running group_members('office')", "->", "[" + ", ".join(map(repr,connection.group_members('office')[:10])) + " ...]"
print "running disconnect()" test("user_add()")
connection.user_add(tuname, turname, tuuid, tugid, tuhome, tushell, tugecos)
success()
tggid = unusedids.pop()
egdata = {
'cn': [ tgname ],
'gidNumber': [ str(tggid) ]
}
test("group_add()")
connection.group_add(tgname, tggid)
success()
test("user_lookup()")
udata = connection.user_lookup(tuname)
del udata['objectClass']
assert_equal(eudata, udata)
success()
test("group_lookup()")
gdata = connection.group_lookup(tgname)
del gdata['objectClass']
assert_equal(egdata, gdata)
success()
test("user_search_id()")
eulist = [ tuname ]
ulist = connection.user_search_id(tuuid)
assert_equal(eulist, ulist)
success()
test("user_search_gid()")
ulist = connection.user_search_gid(tugid)
if tuname not in ulist:
fail("(%s) not in (%s)" % (tuname, ulist))
success()
ecudata = connection.user_lookup(tuname)
ecudata['loginShell'] = [ cushell ]
ecudata['homeDirectory'] = [ cuhome ]
ecudata['cn'] = [ curname ]
test("user_modify")
connection.user_modify(tuname, ecudata)
cudata = connection.user_lookup(tuname)
assert_equal(ecudata, cudata)
success()
ecgdata = connection.group_lookup(tgname)
ecgdata['memberUid'] = [ tuname ]
test("group_modify()")
connection.group_modify(tgname, ecgdata)
cgdata = connection.group_lookup(tgname)
assert_equal(ecgdata, cgdata)
success()
test("user_delete()")
connection.group_delete(tgname)
success()
test("disconnect()")
connection.disconnect() connection.disconnect()
success()

View File

@ -1,3 +1,7 @@
""" """
Generally Useful Common Modules Generally Useful Common Modules
conf - simple configuration file reader
excep - generally useful exceptions
test - test suite utility routines
""" """

View File

@ -1,11 +1,75 @@
"""Library Routines""" """
Configuration Utility Module
def read_config(config_file): This module contains functions to load and verify very simple configuration
files. Python supports ".ini" files, which suck, so this module is used
instead.
Example Configuration File:
include /path/to/other.cf
# these values are the same:
name_protected = "Michael Spang"
name_unprotected = Michael Spang
# these values are not the same:
yes_no = " yes"
no_yes = yes
# this value is an integer
arbitrary_number=2
# this value is not an integer
arbitrary_string="2"
# this is a key with no value
csclub
# this key contains whitespace
white space = sure, why not
# these two lines are treated as one
long line = first line \
second line
Resultant Dictionary:
{
'name_protected': 'Michael Spang',
'name_unprotected:' 'Michael Spang',
'yes_no': ' yes',
'no_yes': 'yes',
'arbirary_number': 2,
'arbitrary_string': '2',
'csclub': None,
'white space': 'sure, why not'
'long line': 'first line \n second line'
... (data from other.cf) ...
}
"""
from curses.ascii import isspace
class ConfigurationException(Exception):
"""Exception class for incomplete and incorrect configurations."""
def read(filename, included=None):
"""Function to read a configuration file into a dictionary."""
if not included:
included = []
if filename in included:
return {}
included.append(filename)
try: try:
conffile = open(config_file) conffile = open(filename)
except IOError: except IOError:
return None raise ConfigurationException('unable to read configuration file: "%s"' % filename)
options = {} options = {}
@ -15,9 +79,11 @@ def read_config(config_file):
if line == '': if line == '':
break break
# remove comments
if '#' in line: if '#' in line:
line = line[:line.find('#')] line = line[:line.find('#')]
# combine lines when the newline is escaped with \
while len(line) > 1 and line[-2] == '\\': while len(line) > 1 and line[-2] == '\\':
line = line[:-2] + line[-1] line = line[:-2] + line[-1]
next = conffile.readline() next = conffile.readline()
@ -25,22 +91,64 @@ def read_config(config_file):
if next == '': if next == '':
break break
line = line.strip()
# process include statements
if line.find("include") == 0 and isspace(line[7]):
filename = line[8:].strip()
options.update(read(filename, included))
continue
# split 'key = value' into key and value and strip results
pair = map(str.strip, line.split('=', 1)) pair = map(str.strip, line.split('=', 1))
# found key and value
if len(pair) == 2: if len(pair) == 2:
key, val = pair key, val = pair
# found quoted string?
if val[0] == val[-1] == '"': if val[0] == val[-1] == '"':
val = val[1:-1] val = val[1:-1]
# unquoted, found float?
else: else:
try: try:
val = int(val) if "." in val:
except: val = float(val)
else:
val = int(val)
except ValueError:
pass pass
# save key and value
options[key] = val options[key] = val
# found only key, value = None
elif len(pair[0]) > 1: elif len(pair[0]) > 1:
key, = pair key = pair[0]
options[key] = None options[key] = None
return options return options
def check_string_fields(filename, field_list, cfg):
"""Function to verify thatfields are strings."""
for field in field_list:
if field not in cfg or type(cfg[field]) is not str:
raise ConfigurationException('expected string value for option "%s" in "%s"' % (field, filename))
def check_integer_fields(filename, field_list, cfg):
"""Function to verify that fields are integers."""
for field in field_list:
if field not in cfg or type(cfg[field]) not in (int, long):
raise ConfigurationException('expected numeric value for option "%s" in "%s"' % (field, filename))
def check_float_fields(filename, field_list, cfg):
"""Function to verify that fields are integers or floats."""
for field in field_list:
if field not in cfg or type(cfg[field]) not in (float, long, int):
raise ConfigurationException('expected float value for option "%s" in "%s"' % (field, filename))

12
pylib/csc/common/excep.py Normal file
View File

@ -0,0 +1,12 @@
"""
Exceptions Module
This module provides some simple but generally useful exception classes.
"""
class InvalidArgument(Exception):
"""Exception class for bad argument values."""
def __init__(self, argname, argval, explanation):
self.argname, self.argval, self.explanation = argname, argval, explanation
def __str__(self):
return 'Bad argument value "%s" for %s: %s' % (self.argval, self.argname, self.explanation)

42
pylib/csc/common/test.py Normal file
View File

@ -0,0 +1,42 @@
"""
Common Test Routines
This module contains helpful functions called by each module's test suite.
"""
from types import FunctionType, MethodType, ClassType, TypeType
class TestException(Exception):
"""Exception class for test failures."""
def test(subject):
"""Print a test message."""
if type(subject) in (MethodType, FunctionType, ClassType, TypeType):
print "testing %s()..." % subject.__name__,
else:
print "testing %s..." % subject,
def success():
"""Print a success message."""
print "pass."
def assert_equal(expected, actual):
if expected != actual:
message = "Expected (%s)\nWas (%s)" % (repr(expected), repr(actual))
fail(message)
def fail(message):
print "failed!"
raise TestException("Test failed:\n%s" % message)
def negative(call, args, excep, message):
try:
call(*args)
fail(message)
except excep:
pass

View File

@ -1,5 +1,4 @@
#!/bin/sh #!/bin/sh
# $Id: initialize.sh 13 2006-12-15 03:57:00Z mspang $
# Initializes a database for CEO. # Initializes a database for CEO.
# initialize the database # initialize the database

View File

@ -1,4 +1,3 @@
-- $Id: structure.sql 36 2006-12-28 10:00:11Z mspang $
-- Table structure for CEO's SQL database. -- Table structure for CEO's SQL database.
-- Usage: -- Usage:

View File

@ -1,4 +1,3 @@
-- $Id: verify_studentid.sql 7 2006-12-11 06:27:22Z mspang $
-- PL/Python trigger to verify student ids for validity -- PL/Python trigger to verify student ids for validity
-- Dedicated to office staff who can't type student ids. -- Dedicated to office staff who can't type student ids.

View File

@ -1,4 +1,3 @@
-- $Id$
-- PL/Python trigger to verify terms for validity -- PL/Python trigger to verify terms for validity
-- To (re)install: -- To (re)install: