From 58bf72726acb977fc2422e65feb11c1f198de02b Mon Sep 17 00:00:00 2001 From: Michael Spang Date: Sat, 27 Jan 2007 19:23:18 -0500 Subject: [PATCH] New release (version 0.2). Updates in this version: * Tests added to most Python modules. * Split configuration files. * Added maintainer scripts to manage permissions during install and purge. * Added functions for use by tools planned for next release (chfn, etc). ceo: * Added support for account "repair", which will recreate LDAP entries and Kerberos principals if necessary. * The recreate account menu option is now active. Miscellaneous: * Replaced instances of "== None" and "!= None" with "is None" and "is not None", respectively (thanks to: Nick Guenther). * Renamed terms.valid() to terms.validate() (thanks to: Nick Guenther). --- bin/ceo | 16 +- debian/changelog | 15 + debian/control | 4 +- debian/copyright | 4 +- debian/postinst | 58 ++ debian/postrm | 33 + debian/rules | 30 +- docs/BUGS | 6 +- docs/TODO | 8 + etc/accounts.cf | 53 +- etc/kerberos.cf | 5 + etc/ldap.cf | 9 + etc/members.cf | 11 +- etc/pgsql.cf | 11 + pylib/csc/__init__.py | 18 +- pylib/csc/adm/__init__.py | 9 + pylib/csc/adm/accounts.py | 1288 +++++++++++++++++++++++++---- pylib/csc/adm/members.py | 262 ++++-- pylib/csc/adm/terms.py | 54 +- pylib/csc/apps/__init__.py | 7 +- pylib/csc/apps/legacy/__init__.py | 6 +- pylib/csc/apps/legacy/helpers.py | 43 +- pylib/csc/apps/legacy/main.py | 183 ++-- pylib/csc/backends/__init__.py | 11 +- pylib/csc/backends/db.py | 203 ++++- pylib/csc/backends/ipc.py | 49 +- pylib/csc/backends/krb.py | 156 +++- pylib/csc/backends/ldapi.py | 338 +++++--- pylib/csc/common/__init__.py | 4 + pylib/csc/common/conf.py | 122 ++- pylib/csc/common/excep.py | 12 + pylib/csc/common/test.py | 42 + sql/initialize.sh | 1 - sql/structure.sql | 1 - sql/verify_studentid.sql | 1 - sql/verify_term.sql | 1 - 36 files changed, 2365 insertions(+), 709 deletions(-) create mode 100644 debian/postinst create mode 100644 debian/postrm create mode 100644 docs/TODO create mode 100644 etc/kerberos.cf create mode 100644 etc/ldap.cf create mode 100644 etc/pgsql.cf create mode 100644 pylib/csc/common/excep.py create mode 100644 pylib/csc/common/test.py diff --git a/bin/ceo b/bin/ceo index c8847da..3be2dd6 100755 --- a/bin/ceo +++ b/bin/ceo @@ -1,22 +1,20 @@ #!/usr/bin/python2.4 -- - +"""CEO SUID Python Wrapper Script""" import os, sys -safe_environment = ['LOGNAME', 'USERNAME', 'USER', 'HOME', - 'TERM', 'LANG', 'LC_ALL', 'LC_COLLATE', - 'LC_CTYPE', 'LC_MESSAGE', 'LC_MONETARY', - 'LC_NUMERIC', 'LC_TIME', 'UID', 'GID', - 'SSH_CONNECTION', 'SSH_AUTH_SOCK', - 'SSH_CLIENT'] +safe_environment = ['LOGNAME', 'USERNAME', 'USER', 'HOME', 'TERM', 'LANG' + 'LC_ALL', 'LC_COLLATE', 'LC_CTYPE', 'LC_MESSAGE', 'LC_MONETARY', + 'LC_NUMERIC', 'LC_TIME', 'UID', 'GID', 'SSH_CONNECTION', 'SSH_AUTH_SOCK', + 'SSH_CLIENT'] for key in os.environ.keys(): - if not key in safe_environment: + if key not in safe_environment: del os.environ[key] os.environ['PATH'] = '/bin:/usr/bin' for dir in sys.path[:]: - if not dir.find('/usr') == 0 or dir.find('/usr/local') == 0: + if not dir.find('/usr') == 0: while dir in sys.path: sys.path.remove(dir) diff --git a/debian/changelog b/debian/changelog index a5c8c99..dfd1e15 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,18 @@ +csc (0.2) unstable; urgency=low + + * Tests added to most Python modules. + * Split configuration files. + * Added maintainer scripts to manage permissions during install and purge. + * Added functions for use by tools planned for next release (chfn, etc). + * Added support for account "repair", which will recreate LDAP entries + and principals if necessary. + * The recreate account menu option in CEO is now active. + * Replaced instances of "== None" and "!= None" with "is None" and + "is not None", respectively (thanks to: Nick Guenther). + * Renamed terms.valid() to terms.validate() (thanks to: Nick Guenther). + + -- Michael Spang Fri, 26 Jan 2007 20:10:14 -0500 + csc (0.1) unstable; urgency=low * Initial Release. diff --git a/debian/control b/debian/control index 535cace..f5ed12b 100644 --- a/debian/control +++ b/debian/control @@ -3,11 +3,11 @@ Section: admin Priority: optional Maintainer: Michael Spang Build-Depends: debhelper (>= 4.0.0) -Standards-Version: 3.6.1 +Standards-Version: 3.7.2 Package: csc Architecture: any -Depends: python, python2.4, python2.4-ldap, python2.4-pygresql, krb5-user, less +Depends: python, python2.4, python2.4-ldap, python2.4-pygresql, krb5-user, less, ${shlibs:Depends} Description: Computer Science Club Administrative Utilities This package contains the CSC Electronic Office and other Computer Science Club administrative diff --git a/debian/copyright b/debian/copyright index b8c9741..76f0cb7 100644 --- a/debian/copyright +++ b/debian/copyright @@ -1,7 +1,7 @@ -This package was debianized by mspang on +This package was debianized by Michael Spang on Thu, 28 Dec 2006 04:07:03 -0500. -Copyright (c) 2006, 2007 Michael Spang +Copyright (c) 2006-2007, Michael Spang All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/debian/postinst b/debian/postinst new file mode 100644 index 0000000..5611620 --- /dev/null +++ b/debian/postinst @@ -0,0 +1,58 @@ +#!/bin/bash -e + +case "$1" in + configure|upgrade) + + if getent passwd ceo > /dev/null; then + CEO=ceo + SUID=4750 + else + CEO=root + SUID=755 + fi + + if getent group office > /dev/null; then + OFFICE=office + else + OFFICE=root + fi + + if ! dpkg-statoverride --list /usr/bin/ceo > /dev/null; then + dpkg-statoverride --add --update $CEO $OFFICE $SUID /usr/bin/ceo + fi + + if [ -f /etc/csc/ldap.cf ] && ! dpkg-statoverride --list /etc/csc/ldap.cf > /dev/null; then + dpkg-statoverride --add --update $CEO staff 640 /etc/csc/ldap.cf + fi + + if [ ! -e /etc/csc/ceo.keytab ] && [ -x /usr/sbin/kadmin.local ]; then + if dpkg-statoverride --list /etc/csc/ceo.keytab > /dev/null; then + dpkg-statoverride --remove /etc/csc/ceo.keytab || true + fi + echo 'warning: re-creating ceo.keytab' + echo 'ktadd -k /etc/csc/ceo.keytab ceo/admin' | /usr/sbin/kadmin.local || true + if [ -e /etc/csc/ceo.keytab ]; then + echo -e "\nSuccess!" + else + echo -e "\nFailed!" + fi + fi + + if [ -f /etc/csc/ceo.keytab ] && ! dpkg-statoverride --list /etc/csc/ceo.keytab > /dev/null; then + dpkg-statoverride --add --update $CEO staff 640 /etc/csc/ceo.keytab + fi + + ;; + + abort-upgrade|abort-remove|abort-deconfigure) + ;; + + *) + echo "postinst called with unknown argument \"$1\"" >&2 + exit 1 + ;; +esac + +#DEBHELPER# + +exit 0 diff --git a/debian/postrm b/debian/postrm new file mode 100644 index 0000000..3908d29 --- /dev/null +++ b/debian/postrm @@ -0,0 +1,33 @@ +#!/bin/bash -e + +case "$1" in + purge) + + if dpkg-statoverride --list /usr/bin/ceo > /dev/null; then + dpkg-statoverride --remove /usr/bin/ceo || true + fi + + if dpkg-statoverride --list /etc/csc/ldap.cf > /dev/null; then + dpkg-statoverride --remove /etc/csc/ldap.cf || true + fi + + if dpkg-statoverride --list /etc/csc/ceo.keytab > /dev/null; then + dpkg-statoverride --remove /etc/csc/ceo.keytab || true + fi + + rmdir --ignore-fail-on-non-empty /etc/csc + + ;; + + remove|failed-upgrade|upgrade) + ;; + + *) + echo "postrm called with invalid argument \"$1\"" >&2 + exit 1 + ;; +esac + +#DEBHELPER# + +exit 0 diff --git a/debian/rules b/debian/rules index 2009af7..443ce29 100755 --- a/debian/rules +++ b/debian/rules @@ -2,13 +2,11 @@ PYTHON := python2.4 -configure: - build: build-stamp build-stamp: mkdir build - $(CC) -DFULL_PATH=\"/usr/lib/csc/ceo\" -o build/ceo misc/setuid-prog.c + $(CC) -DFULL_PATH='"/usr/lib/csc/ceo"' -o build/ceo misc/setuid-prog.c touch build-stamp clean: @@ -17,27 +15,22 @@ clean: dh_clean rm -f build-stamp rm -rf build/ - find pylib/ -name '*.pyc' -print0 | xargs -0 rm -f + find pylib/ -name "*.pyc" -print0 | xargs -0 rm -f install: build dh_testdir dh_testroot - dh_clean -k - # configuration files will contain sensitive information - chmod 600 etc/* - dh_installdirs etc/csc usr/lib/$(PYTHON)/site-packages usr/share/csc \ usr/lib/csc usr/bin - dh_install -X.svn -X.pyc pylib/csc usr/lib/$(PYTHON)/site-packages/ - dh_install -X.svn -X.pyc etc/* etc/csc/ - dh_install -X.svn -X.pyc sql/* usr/share/csc/ + dh_install pylib/* usr/lib/$(PYTHON)/site-packages/ + dh_install etc/* etc/csc/ + dh_install sql/* usr/share/csc/ - dh_install -X.svn -X.pyc bin/ceo usr/lib/csc/ - dh_install -X.svn -X.pyc build/ceo usr/bin/ + dh_install bin/ceo usr/lib/csc/ + dh_install build/ceo usr/bin/ - -binary-indep: build install +binary-arch: build install dh_testdir dh_testroot dh_installchangelogs @@ -60,7 +53,8 @@ binary-indep: build install dh_md5sums dh_builddeb -binary: binary-indep binary-arch -.PHONY: build clean binary-indep binary-arch binary install configure +binary-indep: -binary-arch: build install +binary: binary-indep binary-arch + +.PHONY: build clean binary-indep binary-arch binary install diff --git a/docs/BUGS b/docs/BUGS index 442acd9..edd2867 100644 --- a/docs/BUGS +++ b/docs/BUGS @@ -3,6 +3,6 @@ Bugs and Caveats ================ CEO: - - curses does not draw borders/lines correctly in a screen session - - windows don't always clear properly - - the menu is not redrawn between windows and therefore a gap may grow there + - curses does not draw borders/lines correctly in a screen session. screen apparently ignores + some font-changing characters. workaround should be possible (other progs work). + - the menu is not redrawn between windows and therefore a gap tends to grow there diff --git a/docs/TODO b/docs/TODO new file mode 100644 index 0000000..f634051 --- /dev/null +++ b/docs/TODO @@ -0,0 +1,8 @@ +TODO: + +* Python bindings for libkadm5 +* Python bindings for quota? +* New UI: urwid-based? +* Logging via syslog +* Try to recover and roll-back on error during account creation +* Write manpages diff --git a/etc/accounts.cf b/etc/accounts.cf index cfe64ed..553f947 100644 --- a/etc/accounts.cf +++ b/etc/accounts.cf @@ -1,35 +1,44 @@ -# $Id: accounts.cf 45 2007-01-02 01:39:10Z mspang $ -# CSC Accounts Configuration +# /etc/csc/accounts.cf: CSC Accounts Configuration -### Account Options ### +include /etc/csc/ldap.cf +include /etc/csc/kerberos.cf -minimum_id = 20000 -maximum_id = 40000 +### Member Account Options ### -shell = "/bin/bash" -home = "/users" -gid = 100 +member_min_id = 20000 +member_max_id = 39999 +member_shell = "/bin/bash" +member_home = "/users" +member_desc = "CSC Member Account" +member_group = "users" +### Club Account Options ### -### LDAP Configuration ### +club_min_id = 15000 +club_max_id = 19999 +club_shell = "/bin/bash" +club_home = "/users" +club_desc = "CSC Club Account" +club_group = "users" -server_url = "ldap:///" +### Administrative Account Options -users_base = "ou=People,dc=csclub,dc=uwaterloo,dc=ca" -groups_base = "ou=Group,dc=csclub,dc=uwaterloo,dc=ca" +admin_min_id = 10000 +admin_max_id = 14999 +admin_shell = "/bin/bash" +admin_home = "/users" +admin_desc = "CSC Administrative Account" +admin_group = "users" -bind_dn = "cn=ceo,dc=csclub,dc=uwaterloo,dc=ca" -bind_password = "secret" - - -### Kerberos Configuration ### - -realm = "CSCLUB.UWATERLOO.CA" -principal = "ceo/admin@CSCLUB.UWATERLOO.CA" -keytab = "/etc/csc/ceo.keytab" +### Account Group Options ### +group_min_id = 10000 +group_max_id = 14999 +group_desc = "CSC Group" ### Validation Tuning ### username_regex = "^[a-z][-a-z0-9]*$" -realname_regex = "^[^,:=]*$" +groupname_regex = "^[a-z][-a-z0-9]*$" +min_password_length = 4 +shells_file = "/etc/shells" diff --git a/etc/kerberos.cf b/etc/kerberos.cf new file mode 100644 index 0000000..19ad876 --- /dev/null +++ b/etc/kerberos.cf @@ -0,0 +1,5 @@ +# /etc/csc/kerberos.cf: CSC Kerberos Administration Configuration + +realm = "CSCLUB.UWATERLOO.CA" +admin_principal = "ceo/admin@CSCLUB.UWATERLOO.CA" +admin_keytab = "/etc/csc/ceo.keytab" diff --git a/etc/ldap.cf b/etc/ldap.cf new file mode 100644 index 0000000..c32724c --- /dev/null +++ b/etc/ldap.cf @@ -0,0 +1,9 @@ +# /etc/csc/ldap.cf: CSC LDAP Configuration + +server_url = "ldaps:///" + +users_base = "ou=People,dc=csclub,dc=uwaterloo,dc=ca" +groups_base = "ou=Group,dc=csclub,dc=uwaterloo,dc=ca" + +admin_bind_dn = "cn=ceo,dc=csclub,dc=uwaterloo,dc=ca" +admin_bind_pw = "secret" diff --git a/etc/members.cf b/etc/members.cf index e984294..0137622 100644 --- a/etc/members.cf +++ b/etc/members.cf @@ -1,13 +1,6 @@ -# $Id: members.cf 45 2007-01-02 01:39:10Z mspang $ -# CSC Members Configuration +# /etc/csc/members.cf: CSC Members Configuration -### Database Configuration ### - -server = "localhost" -database = "ceo" - -user = "ceo" -password = "secret" +include /etc/csc/pgsql.cf ### Validation Tuning ### diff --git a/etc/pgsql.cf b/etc/pgsql.cf new file mode 100644 index 0000000..021b962 --- /dev/null +++ b/etc/pgsql.cf @@ -0,0 +1,11 @@ +# /etc/csc/pgsql.cf: PostgreSQL database configuration + +### Database Configuration ### + +# server = "localhost" +server = "" +database = "ceo" + +# not used +user = "ceo" +password = "secret" diff --git a/pylib/csc/__init__.py b/pylib/csc/__init__.py index 0583735..dfdefbd 100644 --- a/pylib/csc/__init__.py +++ b/pylib/csc/__init__.py @@ -1,19 +1,5 @@ -# $Id: __init__.py 24 2006-12-18 20:23:12Z mspang $ """ -PyCSC - CSC Administrative Utilities - -Member Management: - - ceo - legacy ceo interface - -Account Management: - - ceo - legacy ceo interface - -Modules: - - admin - administrative code (member and account management) - backend - backend interface code - ui - user interface code +Computer Science Club Python Modules +The csc module is a container for all CSC-specific Python modules. """ diff --git a/pylib/csc/adm/__init__.py b/pylib/csc/adm/__init__.py index e69de29..99f59e0 100644 --- a/pylib/csc/adm/__init__.py +++ b/pylib/csc/adm/__init__.py @@ -0,0 +1,9 @@ +""" +CSC Administrative Modules + +This module provides member and account management modules. + + members - member registration management functions + accounts - account administration functions + terms - helper routines for manipulating terms +""" diff --git a/pylib/csc/adm/accounts.py b/pylib/csc/adm/accounts.py index 3586461..d0b2c87 100644 --- a/pylib/csc/adm/accounts.py +++ b/pylib/csc/adm/accounts.py @@ -1,218 +1,952 @@ -# $Id: accounts.py 44 2006-12-31 07:09:27Z mspang $ -# UNIX Accounts Module -import re +""" +UNIX Accounts Administration + +This module contains functions for creating, deleting, and manipulating +UNIX user accounts and account groups in the CSC LDAP directory. +""" +import re, pwd, grp, os +from csc.common import conf +from csc.common.excep import InvalidArgument from csc.backends import ldapi, krb -from csc.common.conf import read_config + + +### Configuration ### CONFIG_FILE = '/etc/csc/accounts.cf' cfg = {} -# error constants -SUCCESS = 0 -LDAP_EXISTS = 1 -LDAP_NO_IDS = 2 -LDAP_NO_USER = 3 -KRB_EXISTS = 5 -KRB_NO_USER = 6 -BAD_USERNAME = 8 -BAD_REALNAME = 9 - -# error messages -errors = [ "Success", "LDAP: entry exists", - "LDAP: no user ids available", "LDAP: no such entry", - "KRB: principal exists", "KRB: no such principal", - "Invalid username", "Invalid real name"] - - -class AccountException(Exception): - """Exception class for account-related errors.""" - - -def load_configuration(): - """Load Accounts Configuration.""" +def configure(): + """Helper to load the accounts configuration. You need not call this.""" - # configuration already loaded? - if len(cfg) > 0: - return - - # read in the file - cfg_tmp = read_config(CONFIG_FILE) + string_fields = [ 'member_shell', 'member_home', 'member_desc', + 'member_group', 'club_shell', 'club_home', 'club_desc', + 'club_group', 'admin_shell', 'admin_home', 'admin_desc', + 'admin_group', 'group_desc', 'username_regex', 'groupname_regex', + 'shells_file', 'server_url', 'users_base', 'groups_base', + 'admin_bind_dn', 'admin_bind_pw', 'realm', 'admin_principal', + 'admin_keytab' ] + numeric_fields = [ 'member_min_id', 'member_max_id', 'club_min_id', + 'club_max_id', 'admin_min_id', 'admin_max_id', 'group_min_id', + 'group_max_id', 'min_password_length' ] - if not cfg_tmp: - raise AccountException("unable to read configuration file: %s" % CONFIG_FILE) + # read configuration file + cfg_tmp = conf.read(CONFIG_FILE) - # check that essential fields are completed - mandatory_fields = [ 'minimum_id', 'maximum_id', 'shell', 'home', - 'gid', 'server_url', 'users_base', 'groups_base', 'bind_dn', - 'bind_password', 'realm', 'principal', 'keytab', 'username_regex', - 'realname_regex' - ] - - for field in mandatory_fields: - if not field in cfg_tmp: - raise AccountException("missing configuration option: %s" % field) - if not cfg_tmp[field]: - raise AccountException("null configuration option: %s" % field) - - # check that numeric fields are ints - numeric_fields = [ 'minimum_id', 'maximum_id', 'gid' ] - - for field in numeric_fields: - if not type(cfg_tmp[field]) in (int, long): - raise AccountException("non-numeric value for configuration option: %s" % field) + # verify configuration (not necessary, but prints a useful error) + conf.check_string_fields(CONFIG_FILE, string_fields, cfg_tmp) + conf.check_integer_fields(CONFIG_FILE, numeric_fields, cfg_tmp) # update the current configuration with the loaded values cfg.update(cfg_tmp) - -def create_account(username, password, realname='', gecos_other=''): + + +### Exceptions ### + +KrbException = krb.KrbException +LDAPException = ldapi.LDAPException +ConfigurationException = conf.ConfigurationException + +class AccountException(Exception): + """Base exception class for account-related errors.""" + +class NoAvailableIDs(AccountException): + """Exception class for exhausted userid ranges.""" + def __init__(self, minid, maxid): + self.minid, self.maxid = minid, maxid + def __str__(self): + return "No free ID pairs found in range [%d, %d]" % (self.minid, self.maxid) + +class NameConflict(AccountException): + """Exception class for name conflicts with existing accounts/groups.""" + def __init__(self, name, nametype, source): + self.name, self.nametype, self.source = name, nametype, source + def __str__(self): + return 'Name Conflict: %s "%s" already exists in %s' % (self.nametype, self.name, self.source) + +class NoSuchAccount(AccountException): + """Exception class for missing LDAP entries for accounts.""" + def __init__(self, account, source): + self.account, self.source = account, source + def __str__(self): + return 'Account "%s" not found in %s' % (self.account, self.source) + +class NoSuchGroup(AccountException): + """Exception class for missing LDAP entries for groups.""" + def __init__(self, account, source): + self.account, self.source = account, source + def __str__(self): + return 'Account "%s" not found in %s' % (self.account, self.source) + + + +### Connection Management ### + +ldap_connection = ldapi.LDAPConnection() +krb_connection = krb.KrbConnection() + +def connect(): + """Connect to LDAP and Kerberos and load configuration. You must call before anything else.""" + + configure() + + # connect to the LDAP server + ldap_connection.connect(cfg['server_url'], cfg['admin_bind_dn'], cfg['admin_bind_pw'], cfg['users_base'], cfg['groups_base']) + + # connect to the Kerberos master server + krb_connection.connect(cfg['admin_principal'], cfg['admin_keytab']) + + +def disconnect(): + """Disconnect from LDAP and Kerberos. Call this before quitting.""" + + ldap_connection.disconnect() + krb_connection.disconnect() + + +def connected(): + """Determine whether a connection has been established.""" + + return ldap_connection.connected() and krb_connection.connected() + + + +### General Account Management ### + +def create(username, name, minimum_id, maximum_id, home, password=None, description='', gecos='', shell=None, group=None): """ - Creates a UNIX account for a member. This involves - first creating a directory entry, then creating - a Kerberos principal. + Creates a UNIX user account. This involves first creating an LDAP + directory entry, then creating a Kerberos principal. + + The UID/GID namespace may be divided into ranges according to account type + or purpose. This function requires such a range to allocate ids from. + + If no password is specified or password is None, no Kerberos principal + will be created and the account will not be capable of direct login. + This is desirable for administrative and club accounts. + + If no group is specified, a new group will be created with the same name + as the user. The uid of the created user and gid of the created group + will be numerically equal. There is generally no reason to specify a + group. Furthermore, only groups present in the directory are allowed. + + If an account is relevant to only one system and will not own files on + NFS, please use adduser(8) on the relevant system instead. + + Generally do not directly use this function. The create_member(), + create_club(), and create_adm() functions will fill in most of + the details for you and may do additional checks. Parameters: - username - UNIX username for the member - realname - real name of the member - password - password for the account + username - UNIX username for the account + name - common name LDAP attribute + minimum_id - the smallest UID/GID to assign + maximum_id - the largest UID/GID to assign + home - home directory LDAP attribute + password - password for the account + description - description LDAP attribute + gecos - gecos LDAP attribute + shell - user shell LDAP attribute + group - primary group for account Exceptions: - LDAPException - on LDAP failure - KrbException - on Kerberos failure - - Returns: - SUCCESS - on success - BAD_REALNAME - on badly formed real name - BAD_USERNAME - on badly formed user name - LDAP_EXISTS - when the user exists in LDAP - LDAP_NO_IDS - when no user ids are free - KRB_EXISTS - when the user exists in Kerberos + NameConflict - when the name conflicts with an existing account + NoSuchGroup - when the group parameter corresponds to no group + NoAvailableIDs - when the ID range is exhausted + AccountException - when not connected + + Returns: the uid number of the new account + + Example: create('mspang', 'Michael Spang', 20000, 39999, + '/users/mspang', 'secret', 'CSC Member Account', + build_gecos('Michael Spang', other='3349'), + '/bin/bash', 'users') """ + + # check connection + if not connected(): + raise AccountException("Not connected to LDAP and Kerberos") - # Load Configuration - load_configuration() + # check for path characters in username (. and /) + if re.search('[\\./]', username): + raise InvalidArgument("username", username, "invalid characters") - ### Connect to the Backends ### + check_name_usage(username) - ldap_connection = ldapi.LDAPConnection() - krb_connection = krb.KrbConnection() + # determine the first available userid + userid = first_available_id(minimum_id, maximum_id) + if not userid: + raise NoAvailableIDs(minimum_id, maximum_id) - try: + # determine the account's default group + if group: + group_data = ldap_connection.group_lookup(group) + if not group_data: + raise NoSuchGroup(group, "LDAP") + gid = int(group_data['gidNumber'][0]) + else: + gid = userid - # connect to the LDAP server - ldap_connection.connect(cfg['server_url'], cfg['bind_dn'], cfg['bind_password'], cfg['users_base'], cfg['groups_base']) + ### User creation ### - # connect to the Kerberos master server - krb_connection.connect(cfg['principal'], cfg['keytab']) + # create the LDAP entry + ldap_connection.user_add(username, name, userid, gid, home, shell, gecos, description) - ### Sanity-checks ### - - # check the username and realame for validity - if not re.match(cfg['username_regex'], username): - return BAD_USERNAME - if not re.match(cfg['realname_regex'], realname): - return BAD_REALNAME + # create a user group if no other group was specified + if not group: + ldap_connection.group_add(username, gid) - # see if user exists in LDAP - if ldap_connection.user_lookup(username): - return LDAP_EXISTS - - # determine the first available userid - userid = ldap_connection.first_id(cfg['minimum_id'], cfg['maximum_id']) - if not userid: return LDAP_NO_IDS - - # build principal name from username + # create the Kerberos principal + if password: principal = username + '@' + cfg['realm'] - - # see if user exists in Kerberos - if krb_connection.get_principal(principal): - return KRB_EXISTS - - ### User creation ### - - # process gecos_other (used to store memberid) - if gecos_other: - gecos_other = ',' + str(gecos_other) - - # account information defaults - shell = cfg['shell'] - home = cfg['home'] + '/' + username - gecos = realname + ',,,' + gecos_other - gid = cfg['gid'] - - # create the LDAP entry - ldap_connection.user_add(username, realname, shell, userid, gid, home, gecos) - - # create the Kerberos principal krb_connection.add_principal(principal, password) - finally: - ldap_connection.disconnect() - krb_connection.disconnect() - - return SUCCESS - + return userid -def delete_account(username): + +def delete(username): """ - Deletes the UNIX account of a member. - - Parameters: - username - UNIX username for the member + Deletes a UNIX account. Both LDAP entries and Kerberos principals that + match username are deleted. A group with the same name is deleted too, + if it exists and has the same id as the account. - Exceptions: - LDAPException - on LDAP failure - KrbException - on Kerberos failure - - Returns: - SUCCESS - on success - LDAP_NO_USER - when the user does not exist in LDAP - KRB_NO_USER - when the user does not exist in Kerberos + Returns: tuple with deleted LDAP and Kerberos information + note: the Kerberos keys are not recoverable """ - # Load Configuration - load_configuration() + # check connection + if not connected(): + raise AccountException("Not connected to LDAP and Kerberos") - ### Connect to the Backends ### + # build principal name from username + principal = username + '@' + cfg['realm'] - ldap_connection = ldapi.LDAPConnection() - krb_connection = krb.KrbConnection() + # get account state + ldap_state = ldap_connection.user_lookup(username) + krb_state = krb_connection.get_principal(principal) + group_state = ldap_connection.group_lookup(username) - try: - - # connect to the LDAP server - ldap_connection.connect(cfg['server_url'], cfg['bind_dn'], cfg['bind_password'], cfg['users_base'], cfg['groups_base']) + # don't delete group unless the gid matches the account's uid + if not ldap_state or group_state and ldap_state['uidNumber'][0] != group_state['gidNumber'][0]: + group_state = None - # connect to the Kerberos master server - krb_connection.connect(cfg['principal'], cfg['keytab']) + # fail if no data is found in either LDAP or Kerberos + if not ldap_state and not krb_state: + raise NoSuchAccount(username, "LDAP/Kerberos") - ### Sanity-checks ### - - # ensure user exists in LDAP - if not ldap_connection.user_lookup(username): - return LDAP_NO_USER - - # build principal name from username - principal = username + '@' + cfg['realm'] + ### User deletion ### - # see if user exists in Kerberos - if not krb_connection.get_principal(principal): - return KRB_NO_USER - - ### User deletion ### - - # delete the LDAP entry + # delete the LDAP entries + if ldap_state: ldap_connection.user_delete(username) - - # delete the Kerberos principal + if group_state: + ldap_connection.group_delete(username) + + # delete the Kerberos principal + if krb_state: krb_connection.delete_principal(principal) - finally: - ldap_connection.disconnect() - krb_connection.disconnect() + return ldap_state, group_state, krb_state + + +def status(username): + """ + Checks if an account exists. + + Returns: a boolean 2-tuple (exists, has_password) + """ + + ldap_state = ldap_connection.user_lookup(username) + krb_state = krb_connection.get_principal(username) + return (ldap_state is not None, krb_state is not None) + + +def add_password(username, password): + """ + Creates a principal for an existing, passwordless account. + + Parameters: + username - a UNIX account username + password - a password for the acccount + """ + check_account_status(username) + ldap_state = ldap_connection.user_lookup(username) + if int(ldap_state['uidNumber'][0]) < 1000: + raise AccountException("Attempted to add password to a system account") + krb_connection.add_principal(username, password) + + +def reset_password(username, newpassword): + """ + Changes a user's password. + + Parameters: + username - a UNIX account username + newpassword - a new password for the account + """ + check_account_status(username, require_krb=True) + krb_connection.change_password(username, newpassword) + + +def get_uid(username): + """ + Determine the numeric uid of an account. + + Returns: a uid as an int + """ + check_account_status(username) + account_data = ldap_connection.user_lookup(username) + return int(account_data['uidNumber'][0]) + + +def get_gid(username): + """ + Determine the numeric gid of an account (default group). + + Returns: a gid as an int + """ + check_account_status(username) + account_data = ldap_connection.user_lookup(username) + return int(account_data['gidNumber'][0]) + + +def get_gecos(username, account_data=None): + """ + Retrieve GECOS information of a user. + + Returns: raw gecos data as a string, or None + """ + check_account_status(username) + if not account_data: + account_data = ldap_connection.user_lookup(username) + if 'gecos' in account_data: + return account_data['gecos'][0] + else: + return None - return SUCCESS + +def update_gecos(username, gecos_data): + """ + Set GECOS information for a user. The LDAP 'cn' attribute + is also updated with the user's full name. + + See build_gecos() and parse_gecos() for help dealing with + the chfn(1) GEOCS format. + + Use update_name() to update the name porition, as it will update + the LDAP 'cn' atribute as well. + + Parameters: + username - a UNIX account username + gecos_data - a raw gecos string + + Example: update_gecos('mspang', build_gecos('Mike Spang')) + """ + check_account_status(username) + entry = ldap_connection.user_lookup(username) + entry['gecos'] = [ gecos_data ] + ldap_connection.user_modify(username, entry) + + +def get_name(username): + """ + Get the real name of a user. Note that this name is usually stored + in both the 'cn' attribute and the 'gecos' attribute, and they + may differ. This function will always return the first in the'cn' + version. If there are multiple, the first in the list is returned. + + Returns: the common name associated with the account + """ + check_account_status(username) + account_data = ldap_connection.user_lookup(username) + return account_data['cn'][0] + + +def update_name(username, name, update_gecos=True): + """ + Set the real name of a user. This name will be updated in both + the GECOS field and the common name field. If there are multiple + common names, they will *all* be overwritten with the provided name. + + Parameters: + username - the UNIX account usernmae + nane - new real name for the account + update_gecos - whether to update gecos field + """ + check_account_status(username) + account_data = ldap_connection.user_lookup(username) + account_data['cn'] = [ name ] + if update_gecos: + gecos_dict = parse_gecos(get_gecos(username, account_data)) + gecos_dict['fullname'] = name + account_data['gecos'] = [ build_gecos(**gecos_dict) ] + ldap_connection.user_modify(username, account_data) + + +def get_shell(username): + """ + Retrieve a user's shell. + + Returns: the path to the shell, or None + """ + check_account_status(username) + account_data = ldap_connection.user_lookup(username) + if 'loginShell' not in account_data or len(account_data['loginShell']) < 1: + return None + return account_data['loginShell'][0] + + +def update_shell(username, shell, check=True): + """ + Set a user's shell. + + Parameters: + username - the UNIX account username + shell - the new shell for the user + check - whether to check if the shell is in the shells file + + Exceptions: + InvalidArgument - on nonexistent shell + """ + + # reject nonexistent or nonexecutable shells + if not os.access(shell, os.X_OK) or not os.path.isfile(shell): + raise InvalidArgument("shell", shell, "is not a regular executable file") + + if check: + + # load shells file + shells = open(cfg['shells_file']).read().split("\n") + shells = [ x for x in shells if x and x[0] == '/' and '#' not in x ] + + # reject shells that aren't in the shells file (usually /etc/shells) + if check and shell not in shells: + raise InvalidArgument("shell", shell, "is not in %s" % cfg['shells_file']) + + check_account_status(username) + account_data = ldap_connection.user_lookup(username) + account_data['loginShell'] = [ shell ] + ldap_connection.user_modify(username, account_data) + + +def get_home(username): + """ + Get the home directory of a user. + + Returns: path to the user's home directory + """ + check_account_status(username) + account_data = ldap_connection.user_lookup(username) + return account_data['homeDirectory'][0] + + +def update_home(username, home): + """ + Set the home directory of a user. + + Parameters: + username - the UNIX account username + home - new home directory for the user + """ + check_account_status(username) + if not home[0] == '/': + raise InvalidArgument('home', home, 'relative path') + account_data = ldap_connection.user_lookup(username) + account_data['homeDirectory'] = [ home ] + ldap_connection.user_modify(username, account_data) + + + +### General Group Management ### + +def create_group(groupname, minimum_id=None, maximum_id=None, description=''): + """ + Creates a UNIX group. This involves adding an entry to LDAP. + + The UID/GID namespace may be divided into ranges according to group + type or purpose. This function accept such a range to allocate ids from. + If none is specified, it will use the default from the configuration file. + + If a group needs directory accounts as members, or if the group will + own files on NFS, you must add it to the directory with this function. + + If a group is relevant to only a single system and does not need any + directory accounts as members, create it with the addgroup(8) utility + for just that system instead. + + If you do not specify description, the default will be used. If no + description at all is wanted, set description to None. + + Parameters: + groupname - UNIX group name + minimum_id - the smallest GID to assign + maximum_id - the largest GID to assign + description - description LDAP attribute + + Exceptions: + GroupExists - when the group name conflicts with an existing group + NoAvailableIDs - when the ID range is exhausted + GroupException - when not connected + LDAPException - on LDAP failure + + Returns: the gid number of the new group + + Example: create_group('ninjas', 10000, 14999) + """ + + # check connection + if not connected(): + raise AccountException("Not connected to LDAP and Kerberos") + + # check groupname format + if not groupname or not re.match(cfg['groupname_regex'], groupname): + raise InvalidArgument("groupname", groupname, "expected format %s" % repr(cfg['groupname_regex'])) + + # load defaults for unspecified parameters + if not minimum_id and maximum_id: + minimum_id = cfg['group_min_id'] + maximum_id = cfg['group_max_id'] + if description == '': + description = cfg['group_desc'] + + check_name_usage(groupname) + + # determine the first available groupid + groupid = first_available_id(cfg['group_min_id'], cfg['group_max_id']) + if not groupid: + raise NoAvailableIDs(minimum_id, maximum_id) + + ### Group creation ### + + # create the LDAP entry + ldap_connection.group_add(groupname, groupid, description) + + return groupid + + +def delete_group(groupname): + """ + Deletes a group. + + Returns: the deleted LDAP information + """ + + # check connection + if not connected(): + raise AccountException("Not connected to LDAP") + + # get account state + ldap_state = ldap_connection.group_lookup(groupname) + + # fail if no data is found in either LDAP or Kerberos + if not ldap_state: + raise NoSuchGroup(groupname, "LDAP") + + ### Group deletion ### + + # delete the LDAP entry + if ldap_state: + ldap_connection.group_delete(groupname) + + return ldap_state + + +def check_membership(username, groupname): + """ + Determines whether an account is a member of a group + by checking the group's member list and the user's + default group. + + Returns: True if username is a member of groupname + """ + + check_account_status(username) + check_group_status(groupname) + + group_data = ldap_connection.group_lookup(groupname) + user_data = ldap_connection.user_lookup(username) + + group_members = get_members(groupname, group_data) + group_id = int(group_data['gidNumber'][0]) + user_group = int(user_data['gidNumber'][0]) + + return username in group_members or group_id == user_group + + +def get_members(groupname, group_data=None): + """ + Retrieve a list of members of a group. This list + will not include accounts that are members because + their gidNumber attribute matches the group's. + + Parameters: + group_data - result of a previous LDAP lookup on groupname (internal) + + Returns: a list of usernames + """ + + check_group_status(groupname) + + if not group_data: + group_data = ldap_connection.group_lookup(groupname) + + if 'memberUid' in group_data: + group_members = group_data['memberUid'] + else: + group_members = [] + + return group_members + + +def add_member(username, groupname): + """ + Add an account to the list of group members. + + Returns: False if the user was already a member, else True + """ + + check_account_status(username) + check_group_status(groupname) + + group_data = ldap_connection.group_lookup(groupname) + group_members = get_members(groupname, group_data) + + if groupname in group_members: + return False + + group_members.append(username) + group_data['memberUid'] = group_members + ldap_connection.group_modify(groupname, group_data) + + return True + + +def remove_member(username, groupname): + """ + Removes an account from the list of group members. + + Returns: True if the user was a member, else False + """ + + check_account_status(username) + check_group_status(groupname) + + group_data = ldap_connection.group_lookup(groupname) + group_members = get_members(groupname, group_data) + + if username not in group_members: + return False + + while username in group_members: + group_members.remove(username) + + group_data['memberUid'] = group_members + ldap_connection.group_modify(groupname, group_data) + + return True + + +### Account Types ### + +def create_member(username, password, name, memberid): + """ + Creates a UNIX user account with options tailored to CSC members. + + Note: The 'other' section of the GECOS field is filled with the CSC + memberid. This section cannot be changed by the user via chfn(1). + + Parameters: + username - the desired UNIX username + password - the desired UNIX password + name - the member's real name + memberid - the CSC member id number + + Exceptions: + InvalidArgument - on bad account attributes provided + + Returns: the uid number of the new account + + See: create() + """ + + # check connection + if not connected(): + raise AccountException("not connected to LDAP and Kerberos") + + # check username format + if not username or not re.match(cfg['username_regex'], username): + raise InvalidArgument("username", username, "expected format %s" % repr(cfg['username_regex'])) + + # check password length + if not password or len(password) < cfg['min_password_length']: + raise InvalidArgument("password", "", "too short (minimum %d characters)" % cfg['min_password_length']) + + minimum_id = cfg['member_min_id'] + maximum_id = cfg['member_max_id'] + home = cfg['member_home'] + '/' + username + description = cfg['member_desc'] + gecos_field = build_gecos(name, other=memberid) + shell = cfg['member_shell'] + group = cfg['member_group'] + + return create(username, name, minimum_id, maximum_id, home, password, description, gecos_field, shell, group) + + +def create_club(username, name, memberid): + """ + Creates a UNIX user account with options tailored to CSC-hosted clubs. + + Note: The 'other' section of the GECOS field is filled with the CSC + memberid. This section cannot be changed by the user via chfn(1). + + Parameters: + username - the desired UNIX username + name - the club name + memberid - the CSC member id number + + Exceptions: + InvalidArgument - on bad account attributes provided + + Returns: the uid number of the new account + + See: create() + """ + + # check connection + if not connected(): + raise AccountException("not connected to LDAP and Kerberos") + + # check username format + if not username or not re.match(cfg['username_regex'], username): + raise InvalidArgument("username", username, "expected format %s" % repr(cfg['username_regex'])) + + password = None + minimum_id = cfg['club_min_id'] + maximum_id = cfg['club_max_id'] + home = cfg['club_home'] + '/' + username + description = cfg['club_desc'] + gecos_field = build_gecos(name, other=memberid) + shell = cfg['club_shell'] + group = cfg['club_group'] + + return create(username, name, minimum_id, maximum_id, home, password, description, gecos_field, shell, group) + + +def create_adm(username, name): + """ + Creates a UNIX user account with options tailored to long-lived + administrative accounts (e.g. vp, www, sysadmin, etc). + + Parameters: + username - the desired UNIX username + name - a descriptive name or purpose + + Exceptions: + InvalidArgument - on bad account attributes provided + + Returns: the uid number of the new account + + See: create() + """ + + # check connection + if not connected(): + raise AccountException("not connected to LDAP and Kerberos") + + # check username format + if not username or not re.match(cfg['username_regex'], username): + raise InvalidArgument("username", username, "expected format %s" % repr(cfg['username_regex'])) + + password = None + minimum_id = cfg['admin_min_id'] + maximum_id = cfg['admin_max_id'] + home = cfg['admin_home'] + '/' + username + description = cfg['admin_desc'] + gecos_field = build_gecos(name) + shell = cfg['admin_shell'] + group = cfg['admin_group'] + + return create(username, name, minimum_id, maximum_id, home, password, description, gecos_field, shell, group) + + + +### Miscellaneous Helpers ### + +def check_name_usage(name): + """ + Helper function: Ensures a user or group name does not exist in either + Kerberos, LDAP, or through calls to libc and NSS. This is used prior to + creating an accout or group to determine if the name is free. + + Parameters: + name - the user or group name to check for + + Exceptions: + NameConflict - if the name was found anywhere + """ + + # see if user exists in LDAP + if ldap_connection.user_lookup(name): + raise NameConflict(name, "account", "LDAP") + + # see if group exists in LDAP + if ldap_connection.group_lookup(name): + raise NameConflict(name, "group", "LDAP") + + # see if user exists in Kerberos + principal = name + '@' + cfg['realm'] + if krb_connection.get_principal(principal): + raise NameConflict(name, "account", "KRB") + + # see if user exists by getpwnam(3) + try: + pwd.getpwnam(name) + raise NameConflict(name, "account", "NSS") + except KeyError: + pass + + # see if group exists by getgrnam(3) + try: + grp.getgrnam(name) + raise NameConflict(name, "group", "NSS") + except KeyError: + pass + + +def check_account_status(username, require_ldap=True, require_krb=False): + """Helper function to verify that an account exists.""" + + if not connected(): + raise AccountException("Not connected to LDAP and Kerberos") + if require_ldap and not ldap_connection.user_lookup(username): + raise NoSuchAccount(username, "LDAP") + if require_krb and not krb_connection.get_principal(username): + raise NoSuchAccount(username, "KRB") + + +def check_group_status(groupname): + """Helper function to verify that a group exists.""" + + if not connected(): + raise AccountException("Not connected to LDAP and Kerberos") + if not ldap_connection.group_lookup(groupname): + raise NoSuchGroup(groupname, "LDAP") + + +def parse_gecos(gecos_data): + """ + Build a dictionary out of a chfn(1) style GECOS string. + + Parameters: + gecos_data - a gecos string formatted by chfn(1) + + Returns: a dictinoary of components + + Example: parse_gecos('Michael Spang,,,') -> { + 'fullname': 'Michael Spang', + 'roomnumber': '', + 'workphone': '', + 'homephone': '', + 'other': None + } + """ + + # silently remove erroneous colons + while ':' in gecos_data: + index = gecos_data.find(':') + gecos_data = gecos_data[:index] + gecos_data[index+1:] + + gecos_vals = gecos_data.split(',', 4) + gecos_vals.extend([ None ] * (5-len(gecos_vals))) + gecos_keys = ['fullname', 'roomnumber', 'workphone', + 'homephone', 'other' ] + return dict((gecos_keys[i], gecos_vals[i]) for i in xrange(5)) + + +def build_gecos(fullname=None, roomnumber=None, workphone=None, homephone=None, other=None): + """ + Build a chfn(1)-style GECOS field from its components. + + See: chfn(1) + + Parameters: + fullname - GECOS full name + roomnumber - GECOS room number + workphone - GECOS work phone + homephone - GECOS home phone + other - GECOS other + + Returns: string appropriate for a GECOS field value + """ + + # check first four params for illegal chars + args = (fullname, roomnumber, workphone, homephone) + names = ('fullname', 'roomnumber', 'workphone', 'homephone') + for index in xrange(4): + for badchar in (',', ':', '='): + if args[index] and badchar in str(args[index]): + raise InvalidArgument(names[index], args[index], "invalid characters") + + # check other for illegal chars + if other and ':' in str(other): + raise InvalidArgument('other', other, "invalid characters") + + # append each field + if fullname is not None: + gecos_data = str(fullname) + for field in (roomnumber, workphone, homephone, other): + if field is not None: + gecos_data += ',' + str(field) + + return gecos_data + + +def check_id_nss(ugid): + """Helper to ensure there is no account or group with an ID.""" + + try: + pwd.getpwuid(ugid) + return False + except KeyError: + pass + + try: + grp.getgrgid(ugid) + return False + except KeyError: + pass + + return True + + +def first_available_id(minimum, maximum): + """ + Determines the first available id within a range. + + To be "available", there must be neither a user + with the id nor a group with the id. + + Parameters: + minimum - smallest id that may be returned + maximum - largest id that may be returned + + Returns: the id, or None if there are none available + + Example: first_available_id(20000, 40000) -> 20018 + """ + + # get lists of used uids and gids in LDAP + uids = ldap_connection.used_uids(minimum, maximum) + gids = ldap_connection.used_gids(minimum, maximum) + + # iterate through the lists and return the first available + for ugid in xrange(minimum, maximum+1): + if ugid not in uids and ugid not in gids and check_id_nss(ugid): + return ugid + + # no id found within the range + return None @@ -220,13 +954,211 @@ def delete_account(username): if __name__ == '__main__': - # A word of notice: this test creates a _working_ account (and then deletes it). - # If deletion fails it must be cleaned up manually. - - # a bit of salt so the test account is reasonably tough to crack import random - pw = str(random.randint(100000000000000000, 999999999999999999)) - - print "running create_account('testuser', ..., 'Test User', ...)", "->", errors[create_account('testuser', pw, 'Test User')] - print "running delete_account('testuser')", "->", errors[delete_account('testuser')] + from csc.common.test import * + def test_exists(name): + return ldap_connection.user_lookup(name) is not None, \ + ldap_connection.group_lookup(name) is not None, \ + krb_connection.get_principal(name) is not None + + # t=test u=user m=member a=adminv c=club + # g=group r=real e=expected n=new + tuname = 'testuser' + turname = 'Test User' + tunrname = 'User Test' + tudesc = 'May be deleted' + tuhome = '/home/testuser' + tunhome = '/users/testuser' + tushell = '/bin/false' + tunshell = '/bin/true' + tugecos = 'Test User,,,' + tungecos = 'User Test,,,' + tmname = 'testmember' + tmrname = 'Test Member' + tmmid = 31415 + tcname = 'testclub' + tcrname = 'Test Club' + tcmid = 98696 + taname = 'testadm' + tarname = 'Test Adm' + tgname = 'testgroup' + tgdesc = 'Test Group' + minid = 99999000 + maxid = 100000000 + tpw = str(random.randint(10**30, 10**31-1)) + tgecos = 'a,b,c,d,e' + tgecos_args = 'a','b','c','d','e' + + test(connect) + connect() + success() + + try: + delete(tuname); delete(tmname) + delete(tcname); delete(taname) + delete_group(tgname) + except (NoSuchAccount, NoSuchGroup): + pass + + test(create) + create(tuname, turname, minid, maxid, tuhome, tpw, tudesc, tugecos, tushell) + exists = test_exists(tuname) + expected = (True, True, True) + assert_equal(expected, exists) + success() + + test(create_member) + create_member(tmname, tpw, tmrname, tmmid) + exists = test_exists(tmname) + expected = (True, False, True) + assert_equal(expected, exists) + success() + + test(create_club) + create_club(tcname, tmrname, tmmid) + exists = test_exists(tcname) + expected = (True, False, False) + assert_equal(expected, exists) + success() + + test(create_adm) + create_adm(taname, tarname) + exists = test_exists(taname) + expected = (True, False, False) + assert_equal(expected, exists) + success() + + test(create_group) + create_group(tgname, minid, maxid, tgdesc) + exists = test_exists(tgname) + expected = (False, True, False) + assert_equal(expected, exists) + success() + + test(status) + assert_equal((True, True), status(tmname)) + assert_equal((True, False), status(tcname)) + success() + + test(reset_password) + reset_password(tuname, str(int(tpw)/2)) + reset_password(tmname, str(int(tpw)/3)) + negative(reset_password, (tcname,str(int(tpw)/4)), NoSuchAccount, "club should not have password") + negative(reset_password, (taname,str(int(tpw)/5)), NoSuchAccount, "club should not have password") + success() + + test(get_uid) + tuuid = get_uid(tuname) + assert_equal(True, int(tuuid) >= 0) + success() + + test(get_gid) + tugid = get_gid(tuname) + assert_equal(True, int(tugid) >= 0) + success() + + test(get_gecos) + ugecos = get_gecos(tuname) + assert_equal(tugecos, ugecos) + success() + + test(update_gecos) + update_gecos(tuname, tungecos) + ugecos = get_gecos(tuname) + assert_equal(tungecos, ugecos) + success() + + test(get_shell) + ushell = get_shell(tuname) + assert_equal(tushell, ushell) + success() + + test(update_shell) + update_shell(tuname, tunshell, False) + ushell = get_shell(tuname) + assert_equal(ushell, tunshell) + success() + + test(get_name) + urname = get_name(tuname) + assert_equal(turname, urname) + success() + + test(update_name) + update_name(tuname, tunrname) + urname = get_name(tuname) + assert_equal(urname, tunrname) + success() + + test(get_home) + uhome = get_home(tuname) + assert_equal(tuhome, uhome) + success() + + test(update_home) + update_home(tuname, tunhome) + urhome = get_home(tuname) + assert_equal(urhome, tunhome) + success() + + test(get_members) + members = get_members(tgname) + expected = [] + assert_equal(expected, members) + success() + + test(check_membership) + member = check_membership(tuname, tgname) + assert_equal(False, member) + member = check_membership(tuname, tuname) + assert_equal(True, member) + success() + + test(add_member) + add_member(tuname, tgname) + assert_equal(True, check_membership(tuname, tgname)) + assert_equal([tuname], get_members(tgname)) + success() + + test(remove_member) + assert_equal(True, remove_member(tuname, tgname)) + assert_equal(False, check_membership(tuname, tgname)) + assert_equal(False, remove_member(tuname, tgname)) + success() + + test(build_gecos) + assert_equal(tgecos, build_gecos(*tgecos_args)) + success() + + test(parse_gecos) + gecos_dict = parse_gecos(tgecos) + assert_equal(tgecos, build_gecos(**gecos_dict)) + success() + + test(delete) + delete(tuname) + exists = test_exists(tuname) + expected = (False, False, False) + assert_equal(expected, exists) + delete(tmname) + exists = test_exists(tmname) + assert_equal(expected, exists) + delete(tcname) + exists = test_exists(tcname) + assert_equal(expected, exists) + delete(taname) + exists = test_exists(taname) + assert_equal(expected, exists) + success() + + test(delete_group) + delete_group(tgname) + exists = test_exists(tgname) + expected = (False, False, False) + assert_equal(expected, exists) + success() + + test(disconnect) + disconnect() + success() diff --git a/pylib/csc/adm/members.py b/pylib/csc/adm/members.py index 405ec29..d458b2e 100644 --- a/pylib/csc/adm/members.py +++ b/pylib/csc/adm/members.py @@ -1,4 +1,3 @@ -# $Id: members.py 44 2006-12-31 07:09:27Z mspang $ """ CSC Member Management @@ -10,44 +9,29 @@ Transactions are used in each method that modifies the database. Future changes to the members database that need to be atomic must also be moved into this module. """ - import re from csc.adm import terms from csc.backends import db -from csc.common.conf import read_config +from csc.common import conf - - -### Configuration +### Configuration ### CONFIG_FILE = '/etc/csc/members.cf' cfg = {} - def load_configuration(): """Load Members Configuration""" - # configuration already loaded? - if len(cfg) > 0: - return + string_fields = [ 'studentid_regex', 'realname_regex', 'server', + 'database', 'user', 'password' ] - # read in the file - cfg_tmp = read_config(CONFIG_FILE) + # read configuration file + cfg_tmp = conf.read(CONFIG_FILE) - if not cfg_tmp: - raise MemberException("unable to read configuration file: %s" - % CONFIG_FILE) - - # check that essential fields are completed - mandatory_fields = [ 'server', 'database', 'user', 'password' ] - - for field in mandatory_fields: - if not field in cfg_tmp: - raise MemberException("missing configuratino option: %s" % field) - if not cfg_tmp[field]: - raise MemberException("null configuration option: %s" %field) + # verify configuration + conf.check_string_fields(CONFIG_FILE, string_fields, cfg_tmp) # update the current configuration with the loaded values cfg.update(cfg_tmp) @@ -56,24 +40,46 @@ def load_configuration(): ### Exceptions ### +DBException = db.DBException +ConfigurationException = conf.ConfigurationException + class MemberException(Exception): - """Exception class for member-related errors.""" + """Base exception class for member-related errors.""" class DuplicateStudentID(MemberException): """Exception class for student ID conflicts.""" - pass + def __init__(self, studentid): + self.studentid = studentid + def __str__(self): + return "Student ID already exists in the database: %s" % self.studentid class InvalidStudentID(MemberException): """Exception class for malformed student IDs.""" - pass + def __init__(self, studentid): + self.studentid = studentid + def __str__(self): + return "Student ID is invalid: %s" % self.studentid class InvalidTerm(MemberException): """Exception class for malformed terms.""" - pass + def __init__(self, term): + self.term = term + def __str__(self): + return "Term is invalid: %s" % self.term + +class InvalidRealName(MemberException): + """Exception class for invalid real names.""" + def __init__(self, name): + self.name = name + def __str__(self): + return "Name is invalid: %s" % self.name class NoSuchMember(MemberException): """Exception class for nonexistent members.""" - pass + def __init__(self, memberid): + self.memberid = memberid + def __str__(self): + return "Member not found: %d" % self.memberid @@ -82,12 +88,10 @@ class NoSuchMember(MemberException): # global database connection connection = db.DBConnection() - def connect(): """Connect to PostgreSQL.""" load_configuration() - connection.connect(cfg['server'], cfg['database']) @@ -103,24 +107,27 @@ def connected(): return connection.connected() + ### Member Table ### -def new(realname, studentid=None, program=None): +def new(realname, studentid=None, program=None, mtype='user', userid=None): """ - Registers a new CSC member. The member is added - to the members table and registered for the current - term. + Registers a new CSC member. The member is added to the members table + and registered for the current term. Parameters: realname - the full real name of the member studentid - the student id number of the member program - the program of study of the member + mtype - a string describing the type of member ('user', 'club') + userid - the initial user id Returns: the memberid of the new member Exceptions: DuplicateStudentID - if the student id already exists in the database InvalidStudentID - if the student id is malformed + InvalidRealName - if the real name is malformed Example: new("Michael Spang", program="CS") -> 3349 """ @@ -128,16 +135,21 @@ def new(realname, studentid=None, program=None): # blank attributes should be NULL if studentid == '': studentid = None if program == '': program = None + if userid == '': userid = None + if mtype == '': mtype = None # check the student id format - regex = '^[0-9]{8}$' - if studentid != None and not re.match(regex, str(studentid)): - raise InvalidStudentID("student id is invalid: %s" % studentid) + if studentid is not None and not re.match(cfg['studentid_regex'], str(studentid)): + raise InvalidStudentID(studentid) + + # check real name format (UNIX account real names must not contain [,:=]) + if not re.match(cfg['realname_regex'], realname): + raise InvalidRealName(realname) # check for duplicate student id member = connection.select_member_by_studentid(studentid) if member: - raise DuplicateStudentID("student id exists in database: %s" % studentid) + raise DuplicateStudentID(studentid) # add the member memberid = connection.insert_member(realname, studentid, program) @@ -155,9 +167,6 @@ def get(memberid): """ Look up attributes of a member by memberid. - Parameters: - memberid - the member id number - Returns: a dictionary of attributes Example: get(3349) -> { @@ -188,7 +197,7 @@ def get_userid(userid): } """ - return connection.select_member_by_account(userid) + return connection.select_member_by_userid(userid) def get_studentid(studentid): @@ -265,20 +274,23 @@ def delete(memberid): """ Erase all records of a member. - Note: real members are never removed - from the database + Note: real members are never removed from the database - Parameters: - memberid - the member id number + Returns: attributes and terms of the member in a tuple - Returns: attributes and terms of the - member in a tuple + Exceptions: + NoSuchMember - if the member id does not exist Example: delete(0) -> ({ 'memberid': 0, name: 'Calum T. Dalek' ...}, ['s1993']) """ # save member data member = connection.select_member_by_id(memberid) + + # bail if not found + if not member: + raise NoSuchMember(memberid) + term_list = connection.select_terms(memberid) # remove data from the db @@ -291,13 +303,12 @@ def delete(memberid): def update(member): """ - Update CSC member attributes. None is NULL. + Update CSC member attributes. Parameters: - member - a dictionary with member attributes as - returned by get, possibly omitting some - attributes. member['memberid'] must exist - and be valid. + member - a dictionary with member attributes as returned by get, + possibly omitting some attributes. member['memberid'] + must exist and be valid. None is NULL. Exceptions: NoSuchMember - if the member id does not exist @@ -307,20 +318,18 @@ def update(member): Example: update( {'memberid': 3349, userid: 'mspang'} ) """ - if member.has_key('studentid') and member['studentid'] != None: + if member.has_key('studentid') and member['studentid'] is not None: studentid = member['studentid'] # check the student id format - regex = '^[0-9]{8}$' - if studentid != None and not re.match(regex, str(studentid)): - raise InvalidStudentID("student id is invalid: %s" % studentid) + if studentid is not None and not re.match(cfg['studentid_regex'], str(studentid)): + raise InvalidStudentID(studentid) # check for duplicate student id - member = connection.select_member_by_studentid(studentid) - if member: - raise DuplicateStudentID("student id exists in database: %s" % - studentid) + dupmember = connection.select_member_by_studentid(studentid) + if dupmember: + raise DuplicateStudentID(studentid) # not specifying memberid is a bug if not member.has_key('memberid'): @@ -328,10 +337,8 @@ def update(member): memberid = member['memberid'] # see if member exists - old_member = connection.select_member_by_id(memberid) - if not old_member: - raise NoSuchMember("memberid does not exist in database: %d" % - memberid) + if not get(memberid): + raise NoSuchMember(memberid) # do the update connection.update_member(member) @@ -359,14 +366,14 @@ def register(memberid, term_list): Example: register(3349, ["w2007", "s2007"]) """ - if not type(term_list) in (list, tuple): + if type(term_list) in (str, unicode): term_list = [ term_list ] for term in term_list: # check term syntax if not re.match('^[wsf][0-9]{4}$', term): - raise InvalidTerm("term is invalid: %s" % term) + raise InvalidTerm(term) # add term to database connection.insert_term(memberid, term) @@ -388,10 +395,10 @@ def registered(memberid, term): Example: registered(3349, "f2006") -> True """ - return connection.select_term(memberid, term) != None + return connection.select_term(memberid, term) is not None -def terms_list(memberid): +def member_terms(memberid): """ Retrieves a list of terms a member is registered for. @@ -404,7 +411,9 @@ def terms_list(memberid): Example: registered(0) -> 's1993' """ - return connection.select_terms(memberid) + terms_list = connection.select_terms(memberid) + terms_list.sort(terms.compare) + return terms_list @@ -412,15 +421,104 @@ def terms_list(memberid): if __name__ == '__main__': - connect() - - - sid = new("Test User", "99999999", "CS") + from csc.common.test import * - assert registered(id, terms.current()) - print get(sid) - register(sid, terms.next(terms.current())) - assert registered(sid, terms.next(terms.current())) - print terms_list(sid) - print get(sid) - print delete(sid) + # t=test m=member s=student u=updated + tmname = 'Test Member' + tmprogram = 'Metaphysics' + tmsid = '00000000' + tm2name = 'Test Member 2' + tm2sid = '00000001' + tm2uname = 'Test Member II' + tm2usid = '00000002' + tm2uprogram = 'Pseudoscience' + tm2uuserid = 'testmember' + + tmdict = {'name': tmname, 'userid': None, 'program': tmprogram, 'type': 'user', 'studentid': tmsid } + tm2dict = {'name': tm2name, 'userid': None, 'program': None, 'type': 'user', 'studentid': tm2sid } + tm2udict = {'name': tm2uname, 'userid': tm2uuserid, 'program': tm2uprogram, 'type': 'user', 'studentid': tm2usid } + + thisterm = terms.current() + nextterm = terms.next(thisterm) + + test(connect) + connect() + success() + + test(connected) + assert_equal(True, connected()) + success() + + dmid = get_studentid(tmsid) + if dmid: delete(dmid['memberid']) + dmid = get_studentid(tm2sid) + if dmid: delete(dmid['memberid']) + dmid = get_studentid(tm2usid) + if dmid: delete(dmid['memberid']) + + test(new) + tmid = new(tmname, tmsid, tmprogram) + tm2id = new(tm2name, tm2sid) + success() + + tmdict['memberid'] = tmid + tm2dict['memberid'] = tm2id + tm2udict['memberid'] = tm2id + + test(registered) + assert_equal(True, registered(tmid, thisterm)) + assert_equal(True, registered(tm2id, thisterm)) + assert_equal(False, registered(tmid, nextterm)) + success() + + test(get) + assert_equal(tmdict, get(tmid)) + assert_equal(tm2dict, get(tm2id)) + success() + + test(list_name) + assert_equal(True, tmid in [ x['memberid'] for x in list_name(tmname) ]) + assert_equal(True, tm2id in [ x['memberid'] for x in list_name(tm2name) ]) + success() + + test(register) + register(tmid, terms.next(terms.current())) + assert_equal(True, registered(tmid, nextterm)) + success() + + test(member_terms) + assert_equal([thisterm, nextterm], member_terms(tmid)) + assert_equal([thisterm], member_terms(tm2id)) + success() + + test(list_term) + assert_equal(True, tmid in [ x['memberid'] for x in list_term(thisterm) ]) + assert_equal(True, tmid in [ x['memberid'] for x in list_term(nextterm) ]) + assert_equal(True, tm2id in [ x['memberid'] for x in list_term(thisterm) ]) + assert_equal(False, tm2id in [ x['memberid'] for x in list_term(nextterm) ]) + success() + + test(update) + update(tm2udict) + assert_equal(tm2udict, get(tm2id)) + success() + + test(get_userid) + assert_equal(tm2udict, get_userid(tm2uuserid)) + success() + + test(get_studentid) + assert_equal(tm2udict, get_studentid(tm2usid)) + assert_equal(tmdict, get_studentid(tmsid)) + success() + + test(delete) + delete(tmid) + delete(tm2id) + success() + + test(disconnect) + disconnect() + assert_equal(False, connected()) + disconnect() + success() diff --git a/pylib/csc/adm/terms.py b/pylib/csc/adm/terms.py index 3f9cae2..e70d190 100644 --- a/pylib/csc/adm/terms.py +++ b/pylib/csc/adm/terms.py @@ -1,11 +1,9 @@ -# $Id: terms.py 44 2006-12-31 07:09:27Z mspang $ """ Terms Routines -This module contains functions for manipulating -terms, such as determining the current term, -finding the next or previous term, converting -dates to terms, and more. +This module contains functions for manipulating terms, such as determining +the current term, finding the next or previous term, converting dates to +terms, and more. """ import time, datetime, re @@ -16,27 +14,27 @@ EPOCH = 1970 SEASONS = [ 'w', 's', 'f' ] -def valid(term): +def validate(term): """ - Determines whether a term is well-formed: + Determines whether a term is well-formed. Parameters: term - the term string Returns: whether the term is valid (boolean) - Example: valid("f2006") -> True + Example: validate("f2006") -> True """ regex = '^[wsf][0-9]{4}$' - return re.match(regex, term) != None + return re.match(regex, term) is not None def parse(term): """Helper function to convert a term string to the number of terms since the epoch. Such numbers are intended for internal use only.""" - if not valid(term): + if not validate(term): raise Exception("malformed term: %s" % term) year = int( term[1:] ) @@ -176,8 +174,8 @@ def from_timestamp(timestamp): This function notes that: WINTER = JANUARY to APRIL - SPRING = MAY TO AUGUST - FALL = SEPTEMBER TO DECEMBER + SPRING = MAY to AUGUST + FALL = SEPTEMBER to DECEMBER Parameters: timestamp - number of seconds since the epoch @@ -235,18 +233,22 @@ def next_unregistered(registered): if __name__ == '__main__': - assert parse('f2006') == 110 - assert generate(110) == 'f2006' - assert next('f2006') == 'w2007' - assert previous('f2006') == 's2006' - assert delta('f2006', 'w2007') == 1 - assert add('f2006', delta('f2006', 'w2010')) == 'w2010' - assert interval('f2006', 3) == ['f2006', 'w2007', 's2007'] - assert from_timestamp(1166135779) == 'f2006' - assert parse( current() ) >= 110 - assert next_unregistered( [current()] ) == next( current() ) - assert next_unregistered( [] ) == current() - assert next_unregistered( [previous(current())] ) == current() - assert next_unregistered( [add(current(), -2)] ) == current() + from csc.common.test import * - print "All tests passed." "\n" + test(parse); assert_equal(110, parse('f2006')); success() + test(generate); assert_equal('f2006', generate(110)); success() + test(next); assert_equal('w2007', next('f2006')); success() + test(previous); assert_equal('s2006', previous('f2006')); success() + test(delta); assert_equal(1, delta('f2006', 'w2007')); success() + test(compare); assert_equal(-1, compare('f2006', 'w2007')); success() + test(add); assert_equal('w2010', add('f2006', delta('f2006', 'w2010'))); success() + test(interval); assert_equal(['f2006', 'w2007', 's2007'], interval('f2006', 3)); success() + test(from_timestamp); assert_equal('f2006', from_timestamp(1166135779)); success() + test(current); assert_equal(True, parse( current() ) >= 110 ); success() + + test(next_unregistered) + assert_equal( next(current()), next_unregistered([ current() ])) + assert_equal( current(), next_unregistered([])) + assert_equal( current(), next_unregistered([ previous(current()) ])) + assert_equal( current(), next_unregistered([ add(current(), -2) ])) + success() diff --git a/pylib/csc/apps/__init__.py b/pylib/csc/apps/__init__.py index 09ddf85..162075b 100644 --- a/pylib/csc/apps/__init__.py +++ b/pylib/csc/apps/__init__.py @@ -1,9 +1,8 @@ -# $Id: __init__.py 23 2006-12-18 20:14:51Z mspang $ """ -User Interfaces +Application-style User Interfaces -This module contains frontends and related modules. -CEO's primary frontends are: +This module contains large frontends with many functions +and fancy graphical user interfaces. legacy - aims to reproduce the curses UI of the previous CEO """ diff --git a/pylib/csc/apps/legacy/__init__.py b/pylib/csc/apps/legacy/__init__.py index 18352d2..0bc0b65 100644 --- a/pylib/csc/apps/legacy/__init__.py +++ b/pylib/csc/apps/legacy/__init__.py @@ -1,10 +1,8 @@ -# $Id: __init__.py 23 2006-12-18 20:14:51Z mspang $ """ Legacy User Interface This module contains the legacy CEO user interface and related modules. -Important modules are: - main.py - all of the main UI logic - helpers.py - user interface library routines + main - all of the main UI logic + helpers - user interface library routines """ diff --git a/pylib/csc/apps/legacy/helpers.py b/pylib/csc/apps/legacy/helpers.py index 9d7cbaa..74a6bb8 100644 --- a/pylib/csc/apps/legacy/helpers.py +++ b/pylib/csc/apps/legacy/helpers.py @@ -1,4 +1,3 @@ -# $Id: helpers.py 35 2006-12-28 05:14:05Z mspang $ """ Helpers for legacy User Interface @@ -7,7 +6,7 @@ the look and behavior of the previous CEO. Included is code for various curses-based UI widgets that were provided by Perl 5's Curses and Curses::Widgets libraries. -Though attempts have been made to keep the UI bug-compatible with +Though attempts have been made to keep the UI [bug-]compatible with the previous system, some compromises have been made. For example, the input and textboxes draw 'OK' and 'Cancel' buttons where the old CEO had them, but they are fake. That is, the buttons in the old @@ -52,14 +51,14 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True): # turn on cursor try: curses.curs_set(1) - except: + except curses.error: pass # set keypad mode to allow UP, DOWN, etc wnd.keypad(1) # the input string - input = "" + inputbuf = "" # offset of cursor in input # i.e. the next operation is applied at input[inputoff] @@ -78,7 +77,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True): if echo: # discard characters before displayoff, # as the window may be scrolled to the right - substring = input[displayoff:] + substring = inputbuf[displayoff:] # pad the string with zeroes to overwrite stale characters substring = substring + " " * (width - len(substring)) @@ -96,7 +95,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True): # enter returns input if key == KEY_RETURN: - return input + return inputbuf # escape aborts input elif key == KEY_ESCAPE: @@ -104,7 +103,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True): # EOT (C-d) aborts if there is no input elif key == KEY_EOT: - if len(input) == 0: + if len(inputbuf) == 0: return None # backspace removes the previous character @@ -112,7 +111,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True): if inputoff > 0: # remove the character immediately before the input offset - input = input[0:inputoff-1] + input[inputoff:] + inputbuf = inputbuf[0:inputoff-1] + inputbuf[inputoff:] inputoff -= 1 # move either the cursor or entire line of text left @@ -124,7 +123,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True): if inputoff < len(input): # remove the character at the input offset - input = input[0:inputoff] + input[inputoff+1:] + inputbuf = inputbuf[0:inputoff] + inputbuf[inputoff+1:] # left moves the cursor one character left elif key == curses.KEY_LEFT: @@ -139,7 +138,7 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True): # right moves the cursor one character right elif key == curses.KEY_RIGHT: - if inputoff < len(input): + if inputoff < len(inputbuf): # move the cursor to the right inputoff += 1 @@ -155,8 +154,8 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True): # end moves the cursor past the last character elif key == curses.KEY_END: - inputoff = len(input) - displayoff = len(input) - width + 1 + inputoff = len(inputbuf) + displayoff = len(inputbuf) - width + 1 # insert toggles insert/overwrite mode elif key == curses.KEY_IC: @@ -164,15 +163,15 @@ def read_input(wnd, offy, offx, width, maxlen, echo=True): # other (printable) characters are added to the input string elif curses.ascii.isprint(key): - if len(input) < maxlen or maxlen == 0: + if len(inputbuf) < maxlen or maxlen == 0: # insert mode: insert before current offset if insert: - input = input[0:inputoff] + chr(key) + input[inputoff:] + inputbuf = inputbuf[0:inputoff] + chr(key) + inputbuf[inputoff:] # overwrite mode: replace current offset else: - input = input[0:inputoff] + chr(key) + input[inputoff+1:] + inputbuf = inputbuf[0:inputoff] + chr(key) + inputbuf[inputoff+1:] # increment the input offset inputoff += 1 @@ -218,13 +217,13 @@ def inputbox(wnd, prompt, field_width, echo=True): # read an input string within the field region of text_wnd inputy, inputx, inputwidth = 1, 1, textwidth - 2 - input = read_input(text_wnd, inputy, inputx, inputwidth, 0, echo) + inputbuf = read_input(text_wnd, inputy, inputx, inputwidth, 0, echo) # erase the window child_wnd.erase() child_wnd.refresh() - return input + return inputbuf def line_wrap(line, width): @@ -323,7 +322,7 @@ def msgbox(wnd, msg, title="Message"): curses.curs_set(0) outer_wnd.keypad(1) while True: - key = outer_wnd.getch(0,0) + key = outer_wnd.getch(0, 0) if key == KEY_RETURN or key == KEY_ESCAPE: break @@ -379,18 +378,18 @@ def menu(wnd, offy, offx, width, options, _acquire_wnd=None): wnd.refresh() # read one keypress - input = wnd.getch() + keypress = wnd.getch() # UP moves to the previous option - if input == curses.KEY_UP and selected > 0: + if keypress == curses.KEY_UP and selected > 0: selected = (selected - 1) # DOWN moves to the next option - elif input == curses.KEY_DOWN and selected < len(options) - 1: + elif keypress == curses.KEY_DOWN and selected < len(options) - 1: selected = (selected + 1) # RETURN runs the callback for the selected option - elif input == KEY_RETURN: + elif keypress == KEY_RETURN: text, callback = options[selected] # highlight the selected option diff --git a/pylib/csc/apps/legacy/main.py b/pylib/csc/apps/legacy/main.py index 736d55b..f2c6387 100644 --- a/pylib/csc/apps/legacy/main.py +++ b/pylib/csc/apps/legacy/main.py @@ -1,4 +1,3 @@ -# $Id: main.py 44 2006-12-31 07:09:27Z mspang $ """ CEO-like Frontend @@ -21,7 +20,7 @@ BORDER_COLOR = curses.COLOR_RED def action_new_member(wnd): """Interactively add a new member.""" - username, studentid, program = '', None, '' + studentid, program = None, '' # read the name prompt = " Name: " @@ -33,7 +32,7 @@ def action_new_member(wnd): # read the student id prompt = "Student id:" - while studentid == None or (re.search("[^0-9]", studentid) and not studentid.lower() == 'exit'): + while studentid is None or (re.search("[^0-9]", studentid) and not studentid.lower() == 'exit'): studentid = inputbox(wnd, prompt, 18) # abort if exit is entered @@ -48,7 +47,7 @@ def action_new_member(wnd): program = inputbox(wnd, prompt, 18) # abort if exit is entered - if program == None or program.lower() == 'exit': + if program is None or program.lower() == 'exit': return False # connect the members module to its backend if necessary @@ -59,14 +58,17 @@ def action_new_member(wnd): memberid = members.new(realname, studentid, program) msgbox(wnd, "Success! Your memberid is %s. You are now registered\n" - % memberid + "for the " + terms.current() + " term."); + % memberid + "for the " + terms.current() + " term.") except members.InvalidStudentID: - msgbox(wnd, "Invalid student ID.") + msgbox(wnd, "Invalid student ID: %s" % studentid) return False except members.DuplicateStudentID: msgbox(wnd, "A member with this student ID exists.") return False + except members.InvalidRealName: + msgbox(wnd, 'Invalid real name: "%s"' % realname) + return False def action_term_register(wnd): @@ -85,7 +87,7 @@ def action_term_register(wnd): if not member: return False memberid = member['memberid'] - term_list = members.terms_list(memberid) + term_list = members.member_terms(memberid) # display user display_member_details(wnd, member, term_list) @@ -134,7 +136,7 @@ def action_term_register_multiple(wnd): if not member: return False memberid = member['memberid'] - term_list = members.terms_list(memberid) + term_list = members.member_terms(memberid) # display user display_member_details(wnd, member, term_list) @@ -177,11 +179,57 @@ def action_term_register_multiple(wnd): msgbox(wnd, "Your are now registered for terms: " + ", ".join(term_list)) except members.InvalidTerm: - msgbox(wnd, "Term is not valid: %s" % term) + msgbox(wnd, "Invalid term entered.") return False +def repair_account(wnd, memberid, userid): + """Attemps to repair an account.""" + + if not accounts.connected(): accounts.connect() + + member = members.get(memberid) + exists, haspw = accounts.status(userid) + + if not exists: + password = input_password(wnd) + accounts.create_member(userid, password, member['name'], memberid) + msgbox(wnd, "Account created (where the hell did it go, anyway?)\n" + "If you're homedir still exists, it will not be inaccessible to you,\n" + "please contact systems-committee@csclub.uwaterloo.ca to get this resolved.\n") + + elif not haspw: + password = input_password(wnd) + accounts.add_password(userid, password) + msgbox(wnd, "Password added to account.") + + else: + msgbox(wnd, "No problems to repair.") + + +def input_password(wnd): + + # password input loop + password = "password" + check = "check" + while password != check: + + # read password + prompt = "User password:" + password = None + while not password: + password = inputbox(wnd, prompt, 18, False) + + # read another password + prompt = "Enter the password again:" + check = None + while not check: + check = inputbox(wnd, prompt, 27, False) + + return password + + def action_create_account(wnd): """Interactively create an account for a member.""" @@ -198,7 +246,7 @@ def action_create_account(wnd): if not member: return False memberid = member['memberid'] - term_list = members.terms_list(memberid) + term_list = members.member_terms(memberid) # display the member display_member_details(wnd, member, term_list) @@ -218,67 +266,59 @@ def action_create_account(wnd): msgbox(wnd, "I suggest searching for the member by userid or name from the main menu.") return False + # member already has an account? + if member['userid']: + + userid = member['userid'] + msgbox(wnd, "Member " + str(memberid) + " already has an account: " + member['userid'] + "\n" + "Attempting to repair it. Contact the sysadmin if there are still problems." ) + + repair_account(wnd, memberid, userid) + + return False + + # read user id prompt = "Userid:" while userid == '': userid = inputbox(wnd, prompt, 18) # user abort - if userid == None or userid.lower() == 'exit': + if userid is None or userid.lower() == 'exit': return False - # member already has an account? - #if member['userid'] != None: - # msgbox(wnd, "Member " + str(memberid) + " already has an account: " + member['userid'] + "\n" - # "Contact the sysadmin if there are still problems." ) - # return False - - # password input loop - password = "password" - check = "check" - while password != check: - - # read password - prompt = "User password:" - password = None - while not password: - password = inputbox(wnd, prompt, 18, False) - - # read another password - prompt = "Enter the password again:" - check = None - while not check: - check = inputbox(wnd, prompt, 27, False) - + # read password + password = input_password(wnd) # create the UNIX account - result = accounts.create_account(userid, password, member['name'], memberid) - - if result == accounts.LDAP_EXISTS: - msgbox(wnd, "Error: Could not do stuff , Already exists.") + try: + if not accounts.connected(): accounts.connect() + accounts.create_member(userid, password, member['name'], memberid) + except accounts.AccountExists, e: + msgbox(wnd, str(e)) return False - elif result == accounts.KRB_EXISTS: - msgbox(wnd, "This account already exists in Kerberos, but not in LDAP. Please contact the Systems Administrator.") + except accounts.NoAvailableIDs, e: + msgbox(wnd, str(e)) return False - elif result == accounts.LDAP_NO_IDS: - msgbox(wnd, "There are no available UNIX user ids. This is a fatal error. Contact the Systems Administrator.") + except accounts.InvalidArgument, e: + msgbox(wnd, str(e)) return False - elif result == accounts.BAD_REALNAME: - msgbox(wnd, "Invalid real name: %s. Contact the Systems Administrator." % member['name']) + except accounts.LDAPException, e: + msgbox(wnd, "Error creating LDAP entry - Contact the Systems Administrator: %s" % e) return False - elif result == accounts.BAD_USERNAME: - msgbox(wnd, "Invalid username: %s. Enter a valid username." % userid) + except accounts.KrbException, e: + msgbox(wnd, "Error creating Kerberos principal - Contact the Systems Administrator: %s" % e) return False - elif result != accounts.SUCCESS: - raise Exception("Unexpected return status of accounts.create_account(): %s" % result) # now update the CEO database with the username - members.update( {'memberid':memberid, 'userid': userid} ) + members.update( {'memberid': memberid, 'userid': userid} ) # success msgbox(wnd, "Please run 'addhomedir " + userid + "'.") msgbox(wnd, "Success! Your account has been added") + return False + def display_member_details(wnd, member, term_list): """Display member attributes in a message box.""" @@ -343,17 +383,21 @@ def action_display_member(wnd): return False member = get_member_memberid_userid(wnd, memberid) - if not member: return - term_list = members.terms_list( member['memberid'] ) + if not member: return False + term_list = members.member_terms( member['memberid'] ) # display the details in a window display_member_details(wnd, member, term_list) + return False + def page(text): + """Send a text buffer to an external pager for display.""" try: - pipe = os.popen('/usr/bin/less', 'w') + pager = '/usr/bin/less' + pipe = os.popen(pager, 'w') pipe.write(text) pipe.close() except IOError: @@ -385,7 +429,7 @@ def action_list_term(wnd): # read the term prompt = "Which term to list members for ([fws]20nn): " - while term == None or (not term == '' and not re.match('^[wsf][0-9]{4}$', term) and not term == 'exit'): + while term is None or (not term == '' and not re.match('^[wsf][0-9]{4}$', term) and not term == 'exit'): term = inputbox(wnd, prompt, 41) # abort when exit is entered @@ -404,8 +448,11 @@ def action_list_term(wnd): # display the mass of text with a pager page( buf ) + return False + def action_list_name(wnd): + """Interactively search for members by name.""" name = None @@ -420,7 +467,7 @@ def action_list_name(wnd): # connect the members module to its backends if necessary if not members.connected(): members.connect() - # retrieve a list of members for term + # retrieve a list of members with similar names member_list = members.list_name(name) # format the data into a mess of text @@ -429,8 +476,11 @@ def action_list_name(wnd): # display the mass of text with a pager page( buf ) + return False + def action_list_studentid(wnd): + """Interactively search for members by student id.""" studentid = None @@ -458,6 +508,8 @@ def action_list_studentid(wnd): # display the mass of text with a pager page( buf ) + return False + def null_callback(wnd): """Callback for unimplemented menu options.""" @@ -479,7 +531,7 @@ top_menu = [ ( "Search for a member by name", action_list_name ), ( "Search for a member by student id", action_list_studentid ), ( "Create an account", action_create_account ), - ( "Re Create an account", null_callback ), + ( "Re Create an account", action_create_account ), ( "Library functions", null_callback ), ( "Exit", exit_callback ), ] @@ -490,11 +542,10 @@ def acquire_ceo_wnd(screen=None): # hack to get a reference to the entire screen # even when the caller doesn't (shouldn't) have one - global _screen - if screen == None: - screen = _screen + if screen is None: + screen = globals()['screen'] else: - _screen = screen + globals()['screen'] = screen # if the screen changes size, a mess may be left screen.erase() @@ -526,13 +577,21 @@ def ceo_main_curses(screen): # create ceo window ceo_wnd, menu_y, menu_x, menu_height, menu_width = acquire_ceo_wnd(screen) - # display the top level menu - menu(ceo_wnd, menu_y, menu_x, menu_width, top_menu, acquire_ceo_wnd) + try: + # display the top level menu + menu(ceo_wnd, menu_y, menu_x, menu_width, top_menu, acquire_ceo_wnd) + finally: + members.disconnect() + accounts.disconnect() def run(): """Main function for legacy UI.""" + # workaround for xterm-color (bad terminfo? - curs_set(0) fails) + if "TERM" in os.environ and os.environ['TERM'] == "xterm-color": + os.environ['TERM'] = "xterm" + # wrap the entire program using curses.wrapper # so that the terminal is restored to a sane state # when the program exits @@ -541,7 +600,7 @@ def run(): except KeyboardInterrupt: pass except curses.error: - print "Your screen is too small!" + print "Is your screen too small?" raise except: reset() diff --git a/pylib/csc/backends/__init__.py b/pylib/csc/backends/__init__.py index 531a029..72ab55b 100644 --- a/pylib/csc/backends/__init__.py +++ b/pylib/csc/backends/__init__.py @@ -1,12 +1,9 @@ -# $Id$ """ -Backends +Backend Modules This module contains backend interfaces and related modules. -CEO's primary backends are: - db.py - CEO's database for member and term registrations - ldapi.py - LDAP, for UNIX account metadata administration - krb.py - Kerberos, for UNIX account password administration - + db - CEO database interface for member registrations + ldapi - LDAP interface for UNIX account attribute administration + krb - Kerberos interface for UNIX account password management """ diff --git a/pylib/csc/backends/db.py b/pylib/csc/backends/db.py index d01e1a0..6d74a5e 100644 --- a/pylib/csc/backends/db.py +++ b/pylib/csc/backends/db.py @@ -1,4 +1,3 @@ -# $Id: db.py 37 2006-12-28 10:00:50Z mspang $ """ Database Backend Interface @@ -7,7 +6,7 @@ Methods on the connection class correspond in a straightforward way to SQL queries. These methods may restructure and clean up query output but may make no other assumptions about its content or purpose. -This module makes use of the PygreSQL Python bindings to libpq, +This module makes use of the PyGreSQL Python bindings to libpq, PostgreSQL's native C client library. """ import pgdb @@ -20,7 +19,7 @@ class DBException(Exception): class DBConnection(object): """ - Connection to CEO's backend database. All database queries + A connection to CEO's backend database. All database queries and updates are made via this class. Exceptions: (all methods) @@ -84,7 +83,7 @@ class DBConnection(object): def connected(self): """Determine whether the connection has been established.""" - return self.cnx != None + return self.cnx is not None def commit(self): @@ -130,8 +129,7 @@ class DBConnection(object): # build a dictionary of dictionaries from the result (a list of lists) members_dict = {} for member in members_list: - memberid, name, studentid, program, type, userid = member - members_dict[memberid] = { + members_dict[member[0]] = { 'memberid': member[0], 'name': member[1], 'studentid': member[2], @@ -236,13 +234,13 @@ class DBConnection(object): return self.select_single_member(sql, params) - def select_member_by_account(self, username): + def select_member_by_userid(self, username): """ Retrieves a single member by UNIX account username. See: self.select_single_member() - Example: connection.select_member_by_account('ctdalek') -> + Example: connection.select_member_by_userid('ctdalek') -> { 'memberid': 0, 'name': 'Calum T. Dalek' ...} """ sql = "SELECT memberid, name, studentid, program, type, userid FROM members WHERE userid=%s" @@ -266,7 +264,7 @@ class DBConnection(object): return self.select_single_member(sql, params) - def insert_member(self, name, studentid=None, program=None): + def insert_member(self, name, studentid=None, program=None, mtype='user', userid=None): """ Creates a member with the specified attributes. @@ -274,6 +272,8 @@ class DBConnection(object): name - full name of member studentid - student id number program - program of study + mtype - member type + userid - account id Example: connection.insert_member('Michael Spang', '99999999', 'Math/CS') -> 3349 @@ -287,8 +287,8 @@ class DBConnection(object): memberid = result[0] # insert the member - sql = "INSERT INTO members (memberid, name, studentid, program, type) VALUES (%d, %s, %s, %s, %s)" - params = [ memberid, name, studentid, program, 'user' ] + sql = "INSERT INTO members (memberid, name, studentid, program, type, userid) VALUES (%d, %s, %s, %s, %s, %s)" + params = [ memberid, name, studentid, program, mtype, userid ] self.cursor.execute(sql, params) return memberid @@ -497,8 +497,8 @@ class DBConnection(object): def trim_memberid_sequence(self): """ Sets the value of the member id sequence to the id of the newest - member. For use after extensive testing to prevent large - intervals of unused memberids. + member. For use after testing to prevent large intervals of unused + memberids from developing. Note: this does nothing unless the most recently added member(s) have been deleted """ @@ -509,40 +509,163 @@ class DBConnection(object): ### Tests ### if __name__ == '__main__': - HOST = "localhost" - DATABASE = "ceo" + from csc.common.test import * + + conffile = "/etc/csc/pgsql.cf" + + cfg = dict([map(str.strip, a.split("=", 1)) for a in map(str.strip, open(conffile).read().split("\n")) if "=" in a ]) + hostnm = cfg['server'][1:-1] + dbase = cfg['database'][1:-1] + + # t=test m=member s=student d=default e=expected u=updated + tmname = 'Test Member' + tmuname = 'Member Test' + tmsid = '00000004' + tmusid = '00000008' + tmprogram = 'Undecidable' + tmuprogram = 'Nondetermined' + tmtype = 'Untyped' + tmutype = 'Poly' + tmuserid = 'tmem' + tmuuserid = 'identifier' + tm2name = 'Test Member 2' + tm2sid = '00000005' + tm2program = 'Undeclared' + tm3name = 'T. M. 3' + dtype = 'user' + tmterm = 'w0000' + tm3term = 'f1112' + tm3term2 = 's1010' + + emdict = { 'name': tmname, 'program': tmprogram, 'studentid': tmsid, 'type': tmtype, 'userid': tmuserid } + emudict = { 'name': tmuname, 'program': tmuprogram, 'studentid': tmusid, 'type': tmutype, 'userid': tmuuserid } + em2dict = { 'name': tm2name, 'program': tm2program, 'studentid': tm2sid, 'type': dtype, 'userid': None } + em3dict = { 'name': tm3name, 'program': None, 'studentid': None, 'type': dtype, 'userid': None } + + test(DBConnection) connection = DBConnection() + success() - print "Running disconnect()" - connection.disconnect() + test(connection.connect) + connection.connect(hostnm, dbase) + success() - print "Running connect('%s', '%s')" % (HOST, DATABASE) - connection.connect(HOST, DATABASE) + test(connection.connected) + assert_equal(True, connection.connected()) + success() - print "Running select_all_members()", "->", len(connection.select_all_members()), "members" - print "Running select_member_by_id(0)", "->", connection.select_member_by_id(0)['userid'] - print "Running select_members_by_name('Spang')", "->", connection.select_members_by_name('Spang').keys() - print "Running select_members_by_term('f2006')", "->", "[" + ", ".join(map(str, connection.select_members_by_term('f2006').keys()[0:10])) + " ...]" - - print "Running insert_member('test_member', '99999999', 'program')", - memberid = connection.insert_member('test_member', '99999999', 'program') - print "->", memberid + test(connection.insert_member) + tmid = connection.insert_member(tmname, tmsid, tmprogram, tmtype, tmuserid) + tm2id = connection.insert_member(tm2name, tm2sid, tm2program) + tm3id = connection.insert_member(tm3name) + assert_equal(True, int(tmid) >= 0) + assert_equal(True, int(tmid) >= 0) + success() - print "Running select_member_by_id(%d)" % memberid, "->", connection.select_member_by_id(memberid) - print "Running insert_term(%d, 'f2006')" % memberid - connection.insert_term(memberid, 'f2006') + emdict['memberid'] = tmid + emudict['memberid'] = tmid + em2dict['memberid'] = tm2id + em3dict['memberid'] = tm3id - print "Running select_terms(%d)" % memberid, "->", connection.select_terms(memberid) - print "Running update_member({'memberid':%d,'name':'test_updated','studentid':-1})" % memberid - connection.update_member({'memberid':memberid,'name':'test_updated','studentid':99999999}) - print "Running select_member_by_id(%d)" % memberid, "->", connection.select_member_by_id(memberid) - - print "Running rollback()" + test(connection.select_member_by_id) + m1 = connection.select_member_by_id(tmid) + m2 = connection.select_member_by_id(tm2id) + m3 = connection.select_member_by_id(tm3id) + assert_equal(emdict, m1) + assert_equal(em2dict, m2) + assert_equal(em3dict, m3) + success() + + test(connection.select_all_members) + members = connection.select_all_members() + assert_equal(True, tmid in members) + assert_equal(True, tm2id in members) + assert_equal(True, tm3id in members) + assert_equal(emdict, members[tmid]) + success() + + test(connection.select_members_by_name) + members = connection.select_members_by_name(tmname) + assert_equal(True, tmid in members) + assert_equal(False, tm3id in members) + assert_equal(emdict, members[tmid]) + success() + + test(connection.select_member_by_userid) + assert_equal(emdict, connection.select_member_by_userid(tmuserid)) + success() + + test(connection.insert_term) + connection.insert_term(tmid, tmterm) + connection.insert_term(tm3id, tm3term) + connection.insert_term(tm3id, tm3term2) + success() + + test(connection.select_members_by_term) + members = connection.select_members_by_term(tmterm) + assert_equal(True, tmid in members) + assert_equal(False, tm2id in members) + assert_equal(False, tm3id in members) + success() + + test(connection.select_term) + assert_equal(tmterm, connection.select_term(tmid, tmterm)) + assert_equal(None, connection.select_term(tm2id, tmterm)) + assert_equal(tm3term, connection.select_term(tm3id, tm3term)) + assert_equal(tm3term2, connection.select_term(tm3id, tm3term2)) + success() + + test(connection.select_terms) + trms = connection.select_terms(tmid) + trms2 = connection.select_terms(tm2id) + assert_equal([tmterm], trms) + assert_equal([], trms2) + success() + + test(connection.delete_term) + assert_equal(tm3term, connection.select_term(tm3id, tm3term)) + connection.delete_term(tm3id, tm3term) + assert_equal(None, connection.select_term(tm3id, tm3term)) + success() + + test(connection.update_member) + connection.update_member({'memberid': tmid, 'name': tmuname}) + connection.update_member({'memberid': tmid, 'program': tmuprogram, 'studentid': tmusid }) + connection.update_member({'memberid': tmid, 'userid': tmuuserid, 'type': tmutype }) + assert_equal(emudict, connection.select_member_by_id(tmid)) + connection.update_member(emdict) + assert_equal(emdict, connection.select_member_by_id(tmid)) + success() + + test(connection.delete_term_all) + connection.delete_term_all(tm2id) + connection.delete_term_all(tm3id) + assert_equal([], connection.select_terms(tm2id)) + assert_equal([], connection.select_terms(tm3id)) + success() + + test(connection.delete_member) + connection.delete_member(tm3id) + assert_equal(None, connection.select_member_by_id(tm3id)) + negative(connection.delete_member, (tmid,), DBException, "delete of term-registered member") + success() + + test(connection.rollback) connection.rollback() + assert_equal(None, connection.select_member_by_id(tm2id)) + success() - print "Resetting memberid sequence" + test(connection.commit) + connection.commit() + success() + + test(connection.trim_memberid_sequence) connection.trim_memberid_sequence() - - print "Running disconnect()" - connection.disconnect() + success() + + test(connection.disconnect) + connection.disconnect() + assert_equal(False, connection.connected()) + connection.disconnect() + success() diff --git a/pylib/csc/backends/ipc.py b/pylib/csc/backends/ipc.py index 8348a57..9831921 100644 --- a/pylib/csc/backends/ipc.py +++ b/pylib/csc/backends/ipc.py @@ -1,4 +1,3 @@ -# $Id: ipc.py 26 2006-12-20 21:25:08Z mspang $ """ IPC Library Functions @@ -14,22 +13,21 @@ class _pty_file(object): """ A 'file'-like wrapper class for pseudoterminal file descriptors. - This wrapper is necessary because Python has a nasty - habit of throwing OSError at pty EOF. + This wrapper is necessary because Python has a nasty habit of throwing + OSError at pty EOF. - This class also implements timeouts for read operations - which are handy for avoiding deadlock when both - processes are blocked in a read(). + This class also implements timeouts for read operations which are handy + for avoiding deadlock when both processes are blocked in a read(). - See the Python documentation of the file class - for explanation of the methods. + See the Python documentation of the file class for explanation + of the methods. """ def __init__(self, fd): self.fd = fd self.buffer = '' self.closed = False def __repr__(self): - status='open' + status = 'open' if self.closed: status = 'closed' return "<" + status + " pty '" + os.ttyname(self.fd) + "'>" @@ -43,8 +41,8 @@ class _pty_file(object): while data != '': # wait timeout for the pty to become ready, otherwise stop reading - if not block and len(select.select([self.fd],[],[], timeout)[0]) == 0: - break + if not block and len(select.select([self.fd], [], [], timeout)[0]) == 0: + break data = os.read(self.fd, 65536) self.buffer += data @@ -61,7 +59,7 @@ class _pty_file(object): try: # wait timeout for the pty to become ready, then read - if block or len(select.select([self.fd],[],[], timeout)[0]) != 0: + if block or len(select.select([self.fd], [], [], timeout)[0]) != 0: self.buffer += os.read(self.fd, size - len(self.buffer) ) except OSError: @@ -78,8 +76,8 @@ class _pty_file(object): while data != '' and self.buffer.find("\n") == -1 and (size < 0 or len(self.buffer) < size): # wait timeout for the pty to become ready, otherwise stop reading - if not block and len(select.select([self.fd],[],[], timeout)[0]) == 0: - break + if not block and len(select.select([self.fd], [], [], timeout)[0]) == 0: + break data = os.read(self.fd, 128) self.buffer += data @@ -94,7 +92,7 @@ class _pty_file(object): line = self.buffer[:split_index] self.buffer = self.buffer[split_index:] return line - def readlines(self, sizehint=None, block=True, timeout=0.1): + def readlines(self, sizehint=None, timeout=0.1): lines = [] line = None while True: @@ -138,7 +136,7 @@ def popeni(command, args, env=None): args - a list of arguments to pass to command env - optional environment for command - Returns: (pid, stdout, stdIn) + Returns: (pid, stdout, stdin) """ # use a pipe to send data to the child @@ -181,7 +179,7 @@ def popeni(command, args, env=None): # set the controlling terminal to the pty # by opening it (and closing it again since # it's already open as child_stdout) - fd = os.open(tty, os.O_RDWR); + fd = os.open(tty, os.O_RDWR) os.close(fd) # init stdin/out/err @@ -209,14 +207,21 @@ def popeni(command, args, env=None): return pid, _pty_file(parent_stdout), os.fdopen(parent_stdin, 'w') + ### Tests ### if __name__ == '__main__': - import sys - pid, recv, send = popeni('/usr/sbin/kadmin.local', ['kadmin']) + from csc.common.test import * - send.write("listprincs\n") + prog = '/bin/cat' + argv = [ prog ] + message = "test\n" + + test(popeni) + proc, recv, send = popeni(prog, argv) + send.write(message) send.flush() - - print recv.readlines() + line = recv.readline() + assert_equal(message.strip(), line.strip()) + success() diff --git a/pylib/csc/backends/krb.py b/pylib/csc/backends/krb.py index 23d021b..8a40777 100644 --- a/pylib/csc/backends/krb.py +++ b/pylib/csc/backends/krb.py @@ -1,4 +1,3 @@ -# $Id: krb.py 40 2006-12-29 00:40:31Z mspang $ """ Kerberos Backend Interface @@ -12,8 +11,8 @@ systems. Accounts that do not authenticate (e.g. club accounts) do not need a Kerberos principal. Unfortunately, there are no Python bindings to libkadm at this time. As a -temporary workaround, This module communicates with the kadmin CLI interface -via a pseudoterminal and pipe. +temporary workaround, this module communicates with the kadmin CLI interface +via a pseudo-terminal and a pipe. """ import os import ipc @@ -109,7 +108,7 @@ class KrbConnection(object): def connected(self): """Determine whether the connection has been established.""" - return self.pid != None + return self.pid is not None @@ -125,6 +124,7 @@ class KrbConnection(object): # list of lines output by kadmin result = [] + lines = [] # the kadmin prompt that signals the end output # note: KADMIN_ARGS[0] must be "kadmin" or the actual prompt will differ @@ -137,12 +137,12 @@ class KrbConnection(object): timeout_maximum = 1.00 # input loop: read from kadmin until the kadmin prompt - buffer = '' + buf = '' while True: # attempt to read any available data data = self.kadm_out.read(block=False, timeout=timeout) - buffer += data + buf += data # nothing was read if data == '': @@ -165,20 +165,20 @@ class KrbConnection(object): else: # kadmin died! - raise KrbException("kadmin died while reading response") + raise KrbException("kadmin died while reading response:\n%s\n%s" % ("\n".join(lines), buf)) # break into lines and save all but the final # line (which is incomplete) into result - lines = buffer.split("\n") - buffer = lines[-1] + lines = buf.split("\n") + buf = lines[-1] lines = lines[:-1] for line in lines: line = line.strip() result.append(line) - # if the incomplete lines in the buffer is the kadmin prompt, + # if the incomplete line in the buffer is the kadmin prompt, # then the result is complete and may be returned - if buffer.strip() == prompt: + if buf.strip() == prompt: break return result @@ -189,7 +189,7 @@ class KrbConnection(object): Helper function to execute a kadmin command. Parameters: - command - the command to execute + command - command string to pass on to kadmin Returns: a list of lines output by the command """ @@ -222,8 +222,8 @@ class KrbConnection(object): "ceo/admin@CSCLUB.UWATERLOO.CA", "sysadmin/admin@CSCLUB.UWATERLOO.CA", "mspang@CSCLUB.UWATERLOO.CA", + ... ] - """ principals = self.execute("list_principals") @@ -374,16 +374,13 @@ class KrbConnection(object): # ensure success message was received if not created: - raise KrbException("did not receive principal created in response") + raise KrbException("kadmin did not acknowledge principal creation") def delete_principal(self, principal): """ Delete a principal. - Parameters: - principal - the principal name - Example: connection.delete_principal("mspang@CSCLUB.UWATERLOO.CA") """ @@ -424,25 +421,116 @@ class KrbConnection(object): raise KrbException("did not receive principal deleted") + def change_password(self, principal, password): + """ + Changes a principal's password. + + Example: connection.change_password("mspang@CSCLUB.UWATERLOO.CA", "opensesame") + """ + + # exec the add_principal command + if password.find('"') == -1: + self.kadm_in.write('change_password -pw "' + password + '" "' + principal + '"\n') + else: + self.kadm_in.write('change_password "' + principal + '"\n') + self.kadm_in.write(password + "\n" + password + "\n") + + # send request and read response + self.kadm_in.flush() + output = self.read_result() + + # verify output + changed = False + for line in output: + + # ignore NOTICE lines + if line.find("NOTICE:") == 0: + continue + + # ignore prompts + elif line.find("Enter password") == 0 or line.find("Re-enter password") == 0: + continue + + # record whether success message was encountered + elif line.find("Password") == 0 and line.find("changed.") != 0: + changed = True + + # error messages + elif line.find("change_password:") == 0 or line.find("kadmin:") == 0: + raise KrbException(line) + + # unknown output + else: + raise KrbException("unexpected change_password output: " + line) + + # ensure success message was received + if not changed: + raise KrbException("kadmin did not acknowledge password change") + + ### Tests ### if __name__ == '__main__': - PRINCIPAL = 'ceo/admin@CSCLUB.UWATERLOO.CA' - KEYTAB = 'ceo.keytab' - - connection = KrbConnection() - print "running disconnect()" - connection.disconnect() - print "running connect('%s', '%s')" % (PRINCIPAL, KEYTAB) - connection.connect(PRINCIPAL, KEYTAB) - print "running list_principals()", "->", "[" + ", ".join(map(repr,connection.list_principals()[0:3])) + " ...]" - print "running get_privs()", "->", str(connection.get_privs()) - print "running add_principal('testtest', 'BLAH')" - connection.add_principal("testtest", "FJDSLDLFKJSF") - print "running get_principal('testtest')", "->", '(' + connection.get_principal("testtest")['Principal'] + ')' - print "running delete_principal('testtest')" - connection.delete_principal("testtest") - print "running disconnect()" - connection.disconnect() + + from csc.common.test import * + import random + + conffile = '/etc/csc/kerberos.cf' + + cfg = dict([map(str.strip, a.split("=", 1)) for a in map(str.strip, open(conffile).read().split("\n")) if "=" in a ]) + principal = cfg['admin_principal'][1:-1] + keytab = cfg['admin_keytab'][1:-1] + realm = cfg['realm'][1:-1] + + # t=test p=principal e=expected + tpname = 'testpirate' + '@' + realm + tpw = str(random.randint(10**30, 10**31-1)) + 'YAR!' + eprivs = ['GET', 'ADD', 'MODIFY', 'DELETE'] + + test(KrbConnection) + connection = KrbConnection() + success() + + test(connection.connect) + connection.connect(principal, keytab) + success() + + try: + connection.delete_principal(tpname) + except KrbException: + pass + + test(connection.connected) + assert_equal(True, connection.connected()) + success() + + test(connection.add_principal) + connection.add_principal(tpname, tpw) + success() + + test(connection.list_principals) + pals = connection.list_principals() + assert_equal(True, tpname in pals) + success() + + test(connection.get_privs) + privs = connection.get_privs() + assert_equal(eprivs, privs) + success() + + test(connection.get_principal) + princ = connection.get_principal(tpname) + assert_equal(tpname, princ['Principal']) + success() + + test(connection.delete_principal) + connection.delete_principal(tpname) + assert_equal(None, connection.get_principal(tpname)) + success() + + test(connection.disconnect) + connection.disconnect() + assert_equal(False, connection.connected()) + success() diff --git a/pylib/csc/backends/ldapi.py b/pylib/csc/backends/ldapi.py index cf32e8f..7f31256 100644 --- a/pylib/csc/backends/ldapi.py +++ b/pylib/csc/backends/ldapi.py @@ -1,4 +1,3 @@ -# $Id: ldapi.py 41 2006-12-29 04:22:31Z mspang $ """ LDAP Backend Interface @@ -60,7 +59,7 @@ class LDAPConnection(object): """ - if bind_pw == None: bind_pw = '' + if bind_pw is None: bind_pw = '' try: @@ -93,7 +92,7 @@ class LDAPConnection(object): def connected(self): """Determine whether the connection has been established.""" - return self.ldap != None + return self.ldap is not None @@ -137,31 +136,33 @@ class LDAPConnection(object): Retrieve the attributes of a user. Parameters: - uid - the UNIX user accound name of the user + uid - the UNIX username to look up Returns: attributes of user with uid Example: connection.user_lookup('mspang') -> { 'uid': 'mspang', 'uidNumber': 21292 ...} """ + + if not self.connected(): raise LDAPException("Not connected!") dn = 'uid=' + uid + ',' + self.user_base return self.lookup(dn) - def user_search(self, filter): + def user_search(self, search_filter): """ Helper for user searches. Parameters: - filter - LDAP filter string to match users against + search_filter - LDAP filter string to match users against - Returns: the list of uids matched + Returns: the list of uids matched (usernames) """ # search for entries that match the filter try: - matches = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, filter) + matches = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, search_filter) except ldap.LDAPError, e: raise LDAPException("user search failed: %s" % e) @@ -196,47 +197,45 @@ class LDAPConnection(object): Parameters: uidNumber - the user id of the accounts desired - Returns: the list of uids matched + Returns: the list of uids matched (usernames) Example: connection.user_search_id(21292) -> ['mspang'] """ # search for posixAccount entries with the specified uidNumber - filter = '(&(objectClass=posixAccount)(uidNumber=%d))' % uidNumber - return self.user_search(filter) + search_filter = '(&(objectClass=posixAccount)(uidNumber=%d))' % uidNumber + return self.user_search(search_filter) def user_search_gid(self, gidNumber): """ - Retrieves a list of users with a certain UNIX gid number. + Retrieves a list of users with a certain UNIX gid + number (search by default group). - Parameters: - gidNumber - the group id of the accounts desired - - Returns: the list of uids matched + Returns: the list of uids matched (usernames) """ # search for posixAccount entries with the specified gidNumber - filter = '(&(objectClass=posixAccount)(gidNumber=%d))' % gidNumber - return self.user_search(filter) + search_filter = '(&(objectClass=posixAccount)(gidNumber=%d))' % gidNumber + return self.user_search(search_filter) - def user_add(self, uid, cn, loginShell, uidNumber, gidNumber, homeDirectory, gecos): + def user_add(self, uid, cn, uidNumber, gidNumber, homeDirectory, loginShell=None, gecos=None, description=None): """ Adds a user to the directory. Parameters: uid - the UNIX username for the account - cn - the full name of the member - userPassword - password of the account (our setup does not use this) - loginShell - login shell for the user + cn - the real name of the member uidNumber - the UNIX user id number - gidNumber - the UNIX group id number + gidNumber - the UNIX group id number (default group) homeDirectory - home directory for the user - gecos - comment field (usually stores miscellania) + loginShell - login shell for the user + gecos - comment field (usually stores name etc) + description - description field (optional and unimportant) Example: connection.user_add('mspang', 'Michael Spang', - '/bin/bash', 21292, 100, '/users/mspang', + 21292, 100, '/users/mspang', '/bin/bash', 'Michael Spang,,,') """ @@ -251,6 +250,11 @@ class LDAPConnection(object): 'homeDirectory': [ homeDirectory ], 'gecos': [ gecos ], } + + if loginShell: + attrs['loginShell'] = loginShell + if description: + attrs['description'] = [ description ] try: modlist = ldap.modlist.addModlist(attrs) @@ -265,7 +269,7 @@ class LDAPConnection(object): Parameters: uid - username of the user to modify - entry - dictionary as returned by user_lookup() with changes to make. + attrs - dictionary as returned by user_lookup() with changes to make. omitted attributes are DELETED. Example: user = user_lookup('mspang') @@ -295,9 +299,6 @@ class LDAPConnection(object): """ Removes a user from the directory. - Parameters: - uid - the UNIX username of the account - Example: connection.user_delete('mspang') """ @@ -318,7 +319,7 @@ class LDAPConnection(object): Parameters: cn - the UNIX group name to lookup - Returns: attributes of group with cn + Returns: attributes of the group's LDAP entry Example: connection.group_lookup('office') -> { 'cn': 'office', @@ -335,9 +336,6 @@ class LDAPConnection(object): """ Retrieves a list of groups with the specified UNIX group number. - Parameters: - gidNumber - the group id of the groups desired - Returns: a list of groups with gid gidNumber Example: connection.group_search_id(1001) -> ['office'] @@ -345,8 +343,8 @@ class LDAPConnection(object): # search for posixAccount entries with the specified uidNumber try: - filter = '(&(objectClass=posixGroup)(gidNumber=%d))' % gidNumber - matches = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, filter) + search_filter = '(&(objectClass=posixGroup)(gidNumber=%d))' % gidNumber + matches = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, search_filter) except ldap.LDAPError,e : raise LDAPException("group search failed: %s" % e) @@ -370,15 +368,11 @@ class LDAPConnection(object): return group_cns - def group_add(self, cn, gidNumber): + def group_add(self, cn, gidNumber, description=None): """ Adds a group to the directory. - Parameters: - cn - the name of the group - gidNumber - the number of the group - - Example: connection.group_add('office', 1001) + Example: connection.group_add('office', 1001, 'Office Staff') """ dn = 'cn=' + cn + ',' + self.group_base @@ -387,6 +381,8 @@ class LDAPConnection(object): 'cn': [ cn ], 'gidNumber': [ str(gidNumber) ], } + if description: + attrs['description'] = description try: modlist = ldap.modlist.addModlist(attrs) @@ -399,9 +395,8 @@ class LDAPConnection(object): """ Update group attributes in the directory. - The only available updates are fairly destructive - (rename or renumber) but this method is provided - for completeness. + The only available updates are fairly destructive (rename or renumber) + but this method is provided for completeness. Parameters: cn - name of the group to modify @@ -436,9 +431,6 @@ class LDAPConnection(object): """ Removes a group from the directory." - Parameters: - cn - the name of the group - Example: connection.group_delete('office') """ @@ -449,129 +441,203 @@ class LDAPConnection(object): raise LDAPException("unable to delete group: %s" % e) - def group_members(self, cn): - """ - Retrieves a group's members. - - Parameters: - cn - the name of the group - - Example: connection.group_members('office') -> - ['sfflaw', 'jeperry', 'cschopf' ...] - """ - - group = self.group_lookup(cn) - return group.get('memberUid', None) - - ### Miscellaneous Methods ### - - def first_id(self, minimum, maximum): - """ - Determines the first available id within a range. - To be "available", there must be neither a user - with the id nor a group with the id. + def used_uids(self, minimum=None, maximum=None): + """ + Compiles a list of used UIDs in a range. Parameters: - minimum - smallest uid that may be returned - maximum - largest uid that may be returned + minimum - smallest uid to return in the list + maximum - largest uid to return in the list - Returns: the id, or None if there are none available + Returns: list of integer uids - Example: connection.first_id(20000, 40000) -> 20018 + Example: connection.used_uids(20000, 40000) -> [20000, 20001, ...] """ - # compile a list of used uids try: users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['uidNumber']) except ldap.LDAPError, e: raise LDAPException("search for uids failed: %s" % e) + uids = [] for user in users: dn, attrs = user uid = int(attrs['uidNumber'][0]) - if minimum <= uid <= maximum: + if (not minimum or uid >= minimum) and (not maximum or uid <= maximum): uids.append(uid) - # compile a list of used gids + return uids + + + def used_gids(self, minimum=None, maximum=None): + """ + Compiles a list of used GIDs in a range. + + Parameters: + minimum - smallest gid to return in the list + maximum - largest gid to return in the list + + Returns: list of integer gids + + Example: connection.used_gids(20000, 40000) -> [20000, 20001, ...] + """ + try: - groups = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, '(objectClass=posixGroup)', ['gidNumber']) + users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['gidNumber']) except ldap.LDAPError, e: raise LDAPException("search for gids failed: %s" % e) + gids = [] - for group in groups: - dn, attrs = group + for user in users: + dn, attrs = user gid = int(attrs['gidNumber'][0]) - if minimum <= gid <= maximum: + if (not minimum or gid >= minimum) and (not maximum or gid <= maximum): gids.append(gid) - # iterate through ids and return the first available - for id in xrange(minimum, maximum+1): - if not id in uids and not id in gids: - return id + return gids - # no suitable id was found - return None ### Tests ### if __name__ == '__main__': - password_file = 'ldap.ceo' - server = 'ldaps:///' - base_dn = 'dc=csclub,dc=uwaterloo,dc=ca' - bind_dn = 'cn=ceo,' + base_dn - user_dn = 'ou=People,' + base_dn - group_dn = 'ou=Group,' + base_dn - bind_pw = open(password_file).readline().strip() + from csc.common.test import * + conffile = '/etc/csc/ldap.cf' + cfg = dict([map(str.strip, a.split("=", 1)) for a in map(str.strip, open(conffile).read().split("\n")) if "=" in a ]) + srvurl = cfg['server_url'][1:-1] + binddn = cfg['admin_bind_dn'][1:-1] + bindpw = cfg['admin_bind_pw'][1:-1] + ubase = cfg['users_base'][1:-1] + gbase = cfg['groups_base'][1:-1] + minid = 99999000 + maxid = 100000000 + + # t=test u=user g=group c=changed r=real e=expected + tuname = 'testuser' + turname = 'Test User' + tuhome = '/home/testuser' + tushell = '/bin/false' + tugecos = 'Test User,,,' + tgname = 'testgroup' + cushell = '/bin/true' + cuhome = '/home/changed' + curname = 'Test Modified User' + + test("LDAPConnection()") connection = LDAPConnection() - print "running disconnect()" + success() + + test("disconnect()") connection.disconnect() - print "running connect('%s', '%s', '%s', '%s', '%s')" % (server, bind_dn, '***', user_dn, group_dn) - connection.connect(server, bind_dn, bind_pw, user_dn, group_dn) - print "running user_lookup('mspang')", "->", "(%s)" % connection.user_lookup('mspang')['uidNumber'][0] - print "running user_search_id(21292)", "->", connection.user_search_id(21292) - print "running first_id(20000, 40000)", "->", - first_id = connection.first_id(20000, 40000) - print first_id - print "running group_add('testgroup', %d)" % first_id + success() + + test("connect()") + connection.connect(srvurl, binddn, bindpw, ubase, gbase) + if not connection.connected(): + fail("not connected") + success() + try: - connection.group_add('testgroup', first_id) - except Exception, e: - print "FAILED: %s (continuing)" % e - print "running user_add('testuser', 'Test User', '/bin/false', %d, %d, '/home/null', 'Test User,,,')" % (first_id, first_id) - try: - connection.user_add('testuser', 'Test User', '/bin/false', first_id, first_id, '/home/null', 'Test User,,,') - except Exception, e: - print "FAILED: %s (continuing)" % e - print "running user_lookup('testuser')", "->", - user = connection.user_lookup('testuser') - print repr(connection.user_lookup('testuser')['cn'][0]) - user['homeDirectory'] = ['/home/changed'] - user['loginShell'] = ['/bin/true'] - print "running user_modify(...)" - connection.user_modify('testuser', user) - print "running user_lookup('testuser')", "->", - user = connection.user_lookup('testuser') - print '(%s, %s)' % (user['homeDirectory'], user['loginShell']) - print "running group_lookup('testgroup')", "->", - group = connection.group_lookup('testgroup') - print group - print "running group_modify(...)" - group['gidNumber'] = [str(connection.first_id(20000, 40000))] - group['memberUid'] = [ str(first_id) ] - connection.group_modify('testgroup', group) - print "running group_lookup('testgroup')", "->", - group = connection.group_lookup('testgroup') - print group - print "running user_delete('testuser')" - connection.user_delete('testuser') - print "running group_delete('testgroup')" - connection.group_delete('testgroup') - print "running user_search_gid(100)", "->", "[" + ", ".join(map(repr,connection.user_search_gid(100)[:10])) + " ...]" - print "running group_members('office')", "->", "[" + ", ".join(map(repr,connection.group_members('office')[:10])) + " ...]" - print "running disconnect()" + connection.user_delete(tuname) + connection.group_delete(tgname) + except LDAPException: + pass + + test("used_uids()") + uids = connection.used_uids(minid, maxid) + if type(uids) is not list: + fail("list not returned") + success() + + test("used_gids()") + gids = connection.used_gids(minid, maxid) + if type(gids) is not list: + fail("list not returned") + success() + + unusedids = [] + for idnum in xrange(minid, maxid): + if not idnum in uids and not idnum in gids: + unusedids.append(idnum) + + tuuid = unusedids.pop() + tugid = unusedids.pop() + eudata = { + 'uid': [ tuname ], + 'loginShell': [ tushell ], + 'uidNumber': [ str(tuuid) ], + 'gidNumber': [ str(tugid) ], + 'gecos': [ tugecos ], + 'homeDirectory': [ tuhome ], + 'cn': [ turname ] + } + + test("user_add()") + connection.user_add(tuname, turname, tuuid, tugid, tuhome, tushell, tugecos) + success() + + tggid = unusedids.pop() + egdata = { + 'cn': [ tgname ], + 'gidNumber': [ str(tggid) ] + } + + test("group_add()") + connection.group_add(tgname, tggid) + success() + + test("user_lookup()") + udata = connection.user_lookup(tuname) + del udata['objectClass'] + assert_equal(eudata, udata) + success() + + test("group_lookup()") + gdata = connection.group_lookup(tgname) + del gdata['objectClass'] + assert_equal(egdata, gdata) + success() + + test("user_search_id()") + eulist = [ tuname ] + ulist = connection.user_search_id(tuuid) + assert_equal(eulist, ulist) + success() + + test("user_search_gid()") + ulist = connection.user_search_gid(tugid) + if tuname not in ulist: + fail("(%s) not in (%s)" % (tuname, ulist)) + success() + + ecudata = connection.user_lookup(tuname) + ecudata['loginShell'] = [ cushell ] + ecudata['homeDirectory'] = [ cuhome ] + ecudata['cn'] = [ curname ] + + test("user_modify") + connection.user_modify(tuname, ecudata) + cudata = connection.user_lookup(tuname) + assert_equal(ecudata, cudata) + success() + + ecgdata = connection.group_lookup(tgname) + ecgdata['memberUid'] = [ tuname ] + + test("group_modify()") + connection.group_modify(tgname, ecgdata) + cgdata = connection.group_lookup(tgname) + assert_equal(ecgdata, cgdata) + success() + + test("user_delete()") + connection.group_delete(tgname) + success() + + test("disconnect()") connection.disconnect() + success() diff --git a/pylib/csc/common/__init__.py b/pylib/csc/common/__init__.py index 7369902..7876ba0 100644 --- a/pylib/csc/common/__init__.py +++ b/pylib/csc/common/__init__.py @@ -1,3 +1,7 @@ """ Generally Useful Common Modules + + conf - simple configuration file reader + excep - generally useful exceptions + test - test suite utility routines """ diff --git a/pylib/csc/common/conf.py b/pylib/csc/common/conf.py index 2902f1b..439e412 100644 --- a/pylib/csc/common/conf.py +++ b/pylib/csc/common/conf.py @@ -1,11 +1,75 @@ -"""Library Routines""" +""" +Configuration Utility Module -def read_config(config_file): +This module contains functions to load and verify very simple configuration +files. Python supports ".ini" files, which suck, so this module is used +instead. + +Example Configuration File: + + include /path/to/other.cf + + # these values are the same: + name_protected = "Michael Spang" + name_unprotected = Michael Spang + + # these values are not the same: + yes_no = " yes" + no_yes = yes + + # this value is an integer + arbitrary_number=2 + + # this value is not an integer + arbitrary_string="2" + + # this is a key with no value + csclub + + # this key contains whitespace + white space = sure, why not + + # these two lines are treated as one + long line = first line \ + second line + +Resultant Dictionary: + + { + 'name_protected': 'Michael Spang', + 'name_unprotected:' 'Michael Spang', + 'yes_no': ' yes', + 'no_yes': 'yes', + 'arbirary_number': 2, + 'arbitrary_string': '2', + 'csclub': None, + 'white space': 'sure, why not' + 'long line': 'first line \n second line' + + ... (data from other.cf) ... + } + +""" +from curses.ascii import isspace + + +class ConfigurationException(Exception): + """Exception class for incomplete and incorrect configurations.""" + + +def read(filename, included=None): + """Function to read a configuration file into a dictionary.""" + + if not included: + included = [] + if filename in included: + return {} + included.append(filename) try: - conffile = open(config_file) + conffile = open(filename) except IOError: - return None + raise ConfigurationException('unable to read configuration file: "%s"' % filename) options = {} @@ -15,9 +79,11 @@ def read_config(config_file): if line == '': break + # remove comments if '#' in line: line = line[:line.find('#')] + # combine lines when the newline is escaped with \ while len(line) > 1 and line[-2] == '\\': line = line[:-2] + line[-1] next = conffile.readline() @@ -25,22 +91,64 @@ def read_config(config_file): if next == '': break + line = line.strip() + + # process include statements + if line.find("include") == 0 and isspace(line[7]): + + filename = line[8:].strip() + options.update(read(filename, included)) + continue + + # split 'key = value' into key and value and strip results pair = map(str.strip, line.split('=', 1)) + # found key and value if len(pair) == 2: key, val = pair + # found quoted string? if val[0] == val[-1] == '"': val = val[1:-1] + + # unquoted, found float? else: try: - val = int(val) - except: + if "." in val: + val = float(val) + else: + val = int(val) + except ValueError: pass + # save key and value options[key] = val + + # found only key, value = None elif len(pair[0]) > 1: - key, = pair + key = pair[0] options[key] = None return options + + +def check_string_fields(filename, field_list, cfg): + """Function to verify thatfields are strings.""" + + for field in field_list: + if field not in cfg or type(cfg[field]) is not str: + raise ConfigurationException('expected string value for option "%s" in "%s"' % (field, filename)) + +def check_integer_fields(filename, field_list, cfg): + """Function to verify that fields are integers.""" + + for field in field_list: + if field not in cfg or type(cfg[field]) not in (int, long): + raise ConfigurationException('expected numeric value for option "%s" in "%s"' % (field, filename)) + +def check_float_fields(filename, field_list, cfg): + """Function to verify that fields are integers or floats.""" + + for field in field_list: + if field not in cfg or type(cfg[field]) not in (float, long, int): + raise ConfigurationException('expected float value for option "%s" in "%s"' % (field, filename)) diff --git a/pylib/csc/common/excep.py b/pylib/csc/common/excep.py new file mode 100644 index 0000000..6359302 --- /dev/null +++ b/pylib/csc/common/excep.py @@ -0,0 +1,12 @@ +""" +Exceptions Module + +This module provides some simple but generally useful exception classes. +""" + +class InvalidArgument(Exception): + """Exception class for bad argument values.""" + def __init__(self, argname, argval, explanation): + self.argname, self.argval, self.explanation = argname, argval, explanation + def __str__(self): + return 'Bad argument value "%s" for %s: %s' % (self.argval, self.argname, self.explanation) diff --git a/pylib/csc/common/test.py b/pylib/csc/common/test.py new file mode 100644 index 0000000..a60b2ed --- /dev/null +++ b/pylib/csc/common/test.py @@ -0,0 +1,42 @@ +""" +Common Test Routines + +This module contains helpful functions called by each module's test suite. +""" +from types import FunctionType, MethodType, ClassType, TypeType + + +class TestException(Exception): + """Exception class for test failures.""" + + +def test(subject): + """Print a test message.""" + if type(subject) in (MethodType, FunctionType, ClassType, TypeType): + print "testing %s()..." % subject.__name__, + else: + print "testing %s..." % subject, + + +def success(): + """Print a success message.""" + print "pass." + + +def assert_equal(expected, actual): + if expected != actual: + message = "Expected (%s)\nWas (%s)" % (repr(expected), repr(actual)) + fail(message) + + +def fail(message): + print "failed!" + raise TestException("Test failed:\n%s" % message) + + +def negative(call, args, excep, message): + try: + call(*args) + fail(message) + except excep: + pass diff --git a/sql/initialize.sh b/sql/initialize.sh index fe285a6..477a214 100755 --- a/sql/initialize.sh +++ b/sql/initialize.sh @@ -1,5 +1,4 @@ #!/bin/sh -# $Id: initialize.sh 13 2006-12-15 03:57:00Z mspang $ # Initializes a database for CEO. # initialize the database diff --git a/sql/structure.sql b/sql/structure.sql index a37258f..14c6f76 100644 --- a/sql/structure.sql +++ b/sql/structure.sql @@ -1,4 +1,3 @@ --- $Id: structure.sql 36 2006-12-28 10:00:11Z mspang $ -- Table structure for CEO's SQL database. -- Usage: diff --git a/sql/verify_studentid.sql b/sql/verify_studentid.sql index 504a089..da2a849 100644 --- a/sql/verify_studentid.sql +++ b/sql/verify_studentid.sql @@ -1,4 +1,3 @@ --- $Id: verify_studentid.sql 7 2006-12-11 06:27:22Z mspang $ -- PL/Python trigger to verify student ids for validity -- Dedicated to office staff who can't type student ids. diff --git a/sql/verify_term.sql b/sql/verify_term.sql index 49f0351..be1f3e4 100644 --- a/sql/verify_term.sql +++ b/sql/verify_term.sql @@ -1,4 +1,3 @@ --- $Id$ -- PL/Python trigger to verify terms for validity -- To (re)install: