Initial import (version 0.1).

This commit is contained in:
Michael Spang 2007-01-27 18:41:51 -05:00
commit dfc747f9c6
29 changed files with 4244 additions and 0 deletions

24
bin/ceo Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/python2.4 --
import os, sys
safe_environment = ['LOGNAME', 'USERNAME', 'USER', 'HOME',
'TERM', 'LANG', 'LC_ALL', 'LC_COLLATE',
'LC_CTYPE', 'LC_MESSAGE', 'LC_MONETARY',
'LC_NUMERIC', 'LC_TIME', 'UID', 'GID',
'SSH_CONNECTION', 'SSH_AUTH_SOCK',
'SSH_CLIENT']
for key in os.environ.keys():
if not key in safe_environment:
del os.environ[key]
os.environ['PATH'] = '/bin:/usr/bin'
for dir in sys.path[:]:
if not dir.find('/usr') == 0 or dir.find('/usr/local') == 0:
while dir in sys.path:
sys.path.remove(dir)
import csc.apps.legacy.main
csc.apps.legacy.main.run()

6
debian/changelog vendored Normal file
View File

@ -0,0 +1,6 @@
csc (0.1) unstable; urgency=low
* Initial Release.
-- Michael Spang <mspang@uwaterloo.ca> Thu, 28 Dec 2006 04:07:03 -0500

1
debian/compat vendored Normal file
View File

@ -0,0 +1 @@
4

14
debian/control vendored Normal file
View File

@ -0,0 +1,14 @@
Source: csc
Section: admin
Priority: optional
Maintainer: Michael Spang <mspang@uwaterloo.ca>
Build-Depends: debhelper (>= 4.0.0)
Standards-Version: 3.6.1
Package: csc
Architecture: any
Depends: python, python2.4, python2.4-ldap, python2.4-pygresql, krb5-user, less
Description: Computer Science Club Administrative Utilities
This package contains the CSC Electronic Office
and other Computer Science Club administrative
programs.

31
debian/copyright vendored Normal file
View File

@ -0,0 +1,31 @@
This package was debianized by mspang <mspang@uwaterloo.ca> on
Thu, 28 Dec 2006 04:07:03 -0500.
Copyright (c) 2006, 2007 Michael Spang
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the University of Waterloo Computer Science Club
nor the names of its contributors may be used to endorse or promote
products derived from this software without specific prior written
permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

66
debian/rules vendored Executable file
View File

@ -0,0 +1,66 @@
#!/usr/bin/make -f
PYTHON := python2.4
configure:
build: build-stamp
build-stamp:
mkdir build
$(CC) -DFULL_PATH=\"/usr/lib/csc/ceo\" -o build/ceo misc/setuid-prog.c
touch build-stamp
clean:
dh_testdir
dh_testroot
dh_clean
rm -f build-stamp
rm -rf build/
find py/ -name '*.pyc' -print0 | xargs -0 rm -f
install: build
dh_testdir
dh_testroot
dh_clean -k
# configuration files will contain sensitive information
chmod 600 etc/csc/*
dh_installdirs etc usr/lib/$(PYTHON)/site-packages usr/share/csc \
usr/lib/csc usr/bin
dh_install -X.svn -X.pyc py/csc usr/lib/$(PYTHON)/site-packages/
dh_install -X.svn -X.pyc etc/csc etc/
dh_install -X.svn -X.pyc sql/* usr/share/csc/
dh_install -X.svn -X.pyc bin/ceo usr/lib/csc/
dh_install -X.svn -X.pyc build/ceo usr/bin/
binary-indep: build install
dh_testdir
dh_testroot
dh_installchangelogs
dh_installdocs docs/*
dh_installexamples
dh_install
# dh_installlogrotate
# dh_installcron
dh_installman
dh_link
dh_strip
dh_compress
dh_fixperms
# dh_perl
# dh_python
# dh_makeshlibs
dh_installdeb
dh_shlibdeps
dh_gencontrol
dh_md5sums
dh_builddeb
binary: binary-indep binary-arch
.PHONY: build clean binary-indep binary-arch binary install configure
binary-arch: build install

8
docs/BUGS Normal file
View File

@ -0,0 +1,8 @@
Bugs and Caveats
================
CEO:
- curses does not draw borders/lines correctly in a screen session
- windows don't always clear properly
- the menu is not redrawn between windows and therefore a gap may grow there

35
etc/csc/accounts.cf Normal file
View File

@ -0,0 +1,35 @@
# $Id: accounts.cf 45 2007-01-02 01:39:10Z mspang $
# CSC Accounts Configuration
### Account Options ###
minimum_id = 20000
maximum_id = 40000
shell = "/bin/bash"
home = "/users"
gid = 100
### LDAP Configuration ###
server_url = "ldap:///"
users_base = "ou=People,dc=csclub,dc=uwaterloo,dc=ca"
groups_base = "ou=Group,dc=csclub,dc=uwaterloo,dc=ca"
bind_dn = "cn=ceo,dc=csclub,dc=uwaterloo,dc=ca"
bind_password = "secret"
### Kerberos Configuration ###
realm = "CSCLUB.UWATERLOO.CA"
principal = "ceo/admin@CSCLUB.UWATERLOO.CA"
keytab = "/etc/csc/ceo.keytab"
### Validation Tuning ###
username_regex = "^[a-z][-a-z0-9]*$"
realname_regex = "^[^,:=]*$"

15
etc/csc/members.cf Normal file
View File

@ -0,0 +1,15 @@
# $Id: members.cf 45 2007-01-02 01:39:10Z mspang $
# CSC Members Configuration
### Database Configuration ###
server = "localhost"
database = "ceo"
user = "ceo"
password = "secret"
### Validation Tuning ###
studentid_regex = "^[0-9]{8}$"
realname_regex = "^[^,:=]*$"

176
misc/setuid-prog.c Normal file
View File

@ -0,0 +1,176 @@
/*
Template for a setuid program that calls a script.
The script should be in an unwritable directory and should itself
be unwritable. In fact all parent directories up to the root
should be unwritable. The script must not be setuid, that's what
this program is for.
This is a template program. You need to fill in the name of the
script that must be executed. This is done by changing the
definition of FULL_PATH below.
There are also some rules that should be adhered to when writing
the script itself.
The first and most important rule is to never, ever trust that the
user of the program will behave properly. Program defensively.
Check your arguments for reasonableness. If the user is allowed to
create files, check the names of the files. If the program depends
on argv[0] for the action it should perform, check it.
Assuming the script is a Bourne shell script, the first line of the
script should be
#!/bin/sh -
The - is important, don't omit it. If you're using esh, the first
line should be
#!/usr/local/bin/esh -f
and for ksh, the first line should be
#!/usr/local/bin/ksh -p
The script should then set the variable IFS to the string
consisting of <space>, <tab>, and <newline>. After this (*not*
before!), the PATH variable should be set to a reasonable value and
exported. Do not expect the PATH to have a reasonable value, so do
not trust the old value of PATH. You should then set the umask of
the program by calling
umask 077 # or 022 if you want the files to be readable
If you plan to change directories, you should either unset CDPATH
or set it to a good value. Setting CDPATH to just ``.'' (dot) is a
good idea.
If, for some reason, you want to use csh, the first line should be
#!/bin/csh -fb
You should then set the path variable to something reasonable,
without trusting the inherited path. Here too, you should set the
umask using the command
umask 077 # or 022 if you want the files to be readable
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
/* CONFIGURATION SECTION */
#ifndef FULL_PATH /* so that this can be specified from the Makefile */
/* Uncomment the following line:
#define FULL_PATH "/full/path/of/script"
* Then comment out the #error line. */
#error "You must define FULL_PATH somewhere"
#endif
#ifndef UMASK
#define UMASK 077
#endif
/* END OF CONFIGURATION SECTION */
#if defined(__STDC__) && defined(__sgi)
#define environ _environ
#endif
/* don't change def_IFS */
char def_IFS[] = "IFS= \t\n";
/* you may want to change def_PATH, but you should really change it in */
/* your script */
#ifdef __sgi
char def_PATH[] = "PATH=/usr/bsd:/usr/bin:/bin:/usr/local/bin:/usr/sbin";
#else
char def_PATH[] = "PATH=/usr/ucb:/usr/bin:/bin:/usr/local/bin";
#endif
/* don't change def_CDPATH */
char def_CDPATH[] = "CDPATH=.";
/* don't change def_ENV */
char def_ENV[] = "ENV=:";
/*
This function changes all environment variables that start with LD_
into variables that start with XD_. This is important since we
don't want the script that is executed to use any funny shared
libraries.
The other changes to the environment are, strictly speaking, not
needed here. They can safely be done in the script. They are done
here because we don't trust the script writer (just like the script
writer shouldn't trust the user of the script).
If IFS is set in the environment, set it to space,tab,newline.
If CDPATH is set in the environment, set it to ``.''.
Set PATH to a reasonable default.
*/
void
clean_environ(void)
{
char **p;
extern char **environ;
for (p = environ; *p; p++) {
if (strncmp(*p, "LD_", 3) == 0)
**p = 'X';
else if (strncmp(*p, "_RLD", 4) == 0)
**p = 'X';
else if (strncmp(*p, "PYTHON", 6) == 0)
**p = 'X';
else if (strncmp(*p, "IFS=", 4) == 0)
*p = def_IFS;
else if (strncmp(*p, "CDPATH=", 7) == 0)
*p = def_CDPATH;
else if (strncmp(*p, "ENV=", 4) == 0)
*p = def_ENV;
}
putenv(def_PATH);
}
int
main(int argc, char **argv)
{
struct stat statb;
gid_t egid = getegid();
uid_t euid = geteuid();
/*
Sanity check #1.
This check should be made compile-time, but that's not possible.
If you're sure that you specified a full path name for FULL_PATH,
you can omit this check.
*/
if (FULL_PATH[0] != '/') {
fprintf(stderr, "%s: %s is not a full path name\n", argv[0],
FULL_PATH);
fprintf(stderr, "You can only use this wrapper if you\n");
fprintf(stderr, "compile it with an absolute path.\n");
exit(1);
}
/*
Sanity check #2.
Check that the owner of the script is equal to either the
effective uid or the super user.
*/
if (stat(FULL_PATH, &statb) < 0) {
perror("stat");
exit(1);
}
if (statb.st_uid != 0 && statb.st_uid != euid) {
fprintf(stderr, "%s: %s has the wrong owner\n", argv[0],
FULL_PATH);
fprintf(stderr, "The script should be owned by root,\n");
fprintf(stderr, "and shouldn't be writeable by anyone.\n");
exit(1);
}
if (setregid(egid, egid) < 0)
perror("setregid");
if (setreuid(euid, euid) < 0)
perror("setreuid");
clean_environ();
umask(UMASK);
while (**argv == '-') /* don't let argv[0] start with '-' */
(*argv)++;
execv(FULL_PATH, argv);
fprintf(stderr, "%s: could not execute the script\n", argv[0]);
exit(1);
}

19
py/csc/__init__.py Normal file
View File

@ -0,0 +1,19 @@
# $Id: __init__.py 24 2006-12-18 20:23:12Z mspang $
"""
PyCSC - CSC Administrative Utilities
Member Management:
ceo - legacy ceo interface
Account Management:
ceo - legacy ceo interface
Modules:
admin - administrative code (member and account management)
backend - backend interface code
ui - user interface code
"""

0
py/csc/admin/__init__.py Normal file
View File

232
py/csc/admin/accounts.py Normal file
View File

@ -0,0 +1,232 @@
# $Id: accounts.py 44 2006-12-31 07:09:27Z mspang $
# UNIX Accounts Module
import re
from csc.backend import ldapi, krb
from csc.lib import read_config
CONFIG_FILE = '/etc/csc/accounts.cf'
cfg = {}
# error constants
SUCCESS = 0
LDAP_EXISTS = 1
LDAP_NO_IDS = 2
LDAP_NO_USER = 3
KRB_EXISTS = 5
KRB_NO_USER = 6
BAD_USERNAME = 8
BAD_REALNAME = 9
# error messages
errors = [ "Success", "LDAP: entry exists",
"LDAP: no user ids available", "LDAP: no such entry",
"KRB: principal exists", "KRB: no such principal",
"Invalid username", "Invalid real name"]
class AccountException(Exception):
"""Exception class for account-related errors."""
def load_configuration():
"""Load Accounts Configuration."""
# configuration already loaded?
if len(cfg) > 0:
return
# read in the file
cfg_tmp = read_config(CONFIG_FILE)
if not cfg_tmp:
raise AccountException("unable to read configuration file: %s" % CONFIG_FILE)
# check that essential fields are completed
mandatory_fields = [ 'minimum_id', 'maximum_id', 'shell', 'home',
'gid', 'server_url', 'users_base', 'groups_base', 'bind_dn',
'bind_password', 'realm', 'principal', 'keytab', 'username_regex',
'realname_regex'
]
for field in mandatory_fields:
if not field in cfg_tmp:
raise AccountException("missing configuration option: %s" % field)
if not cfg_tmp[field]:
raise AccountException("null configuration option: %s" % field)
# check that numeric fields are ints
numeric_fields = [ 'minimum_id', 'maximum_id', 'gid' ]
for field in numeric_fields:
if not type(cfg_tmp[field]) in (int, long):
raise AccountException("non-numeric value for configuration option: %s" % field)
# update the current configuration with the loaded values
cfg.update(cfg_tmp)
def create_account(username, password, realname='', gecos_other=''):
"""
Creates a UNIX account for a member. This involves
first creating a directory entry, then creating
a Kerberos principal.
Parameters:
username - UNIX username for the member
realname - real name of the member
password - password for the account
Exceptions:
LDAPException - on LDAP failure
KrbException - on Kerberos failure
Returns:
SUCCESS - on success
BAD_REALNAME - on badly formed real name
BAD_USERNAME - on badly formed user name
LDAP_EXISTS - when the user exists in LDAP
LDAP_NO_IDS - when no user ids are free
KRB_EXISTS - when the user exists in Kerberos
"""
# Load Configuration
load_configuration()
### Connect to the Backends ###
ldap_connection = ldapi.LDAPConnection()
krb_connection = krb.KrbConnection()
try:
# connect to the LDAP server
ldap_connection.connect(cfg['server_url'], cfg['bind_dn'], cfg['bind_password'], cfg['users_base'], cfg['groups_base'])
# connect to the Kerberos master server
krb_connection.connect(cfg['principal'], cfg['keytab'])
### Sanity-checks ###
# check the username and realame for validity
if not re.match(cfg['username_regex'], username):
return BAD_USERNAME
if not re.match(cfg['realname_regex'], realname):
return BAD_REALNAME
# see if user exists in LDAP
if ldap_connection.user_lookup(username):
return LDAP_EXISTS
# determine the first available userid
userid = ldap_connection.first_id(cfg['minimum_id'], cfg['maximum_id'])
if not userid: return LDAP_NO_IDS
# build principal name from username
principal = username + '@' + cfg['realm']
# see if user exists in Kerberos
if krb_connection.get_principal(principal):
return KRB_EXISTS
### User creation ###
# process gecos_other (used to store memberid)
if gecos_other:
gecos_other = ',' + str(gecos_other)
# account information defaults
shell = cfg['shell']
home = cfg['home'] + '/' + username
gecos = realname + ',,,' + gecos_other
gid = cfg['gid']
# create the LDAP entry
ldap_connection.user_add(username, realname, shell, userid, gid, home, gecos)
# create the Kerberos principal
krb_connection.add_principal(principal, password)
finally:
ldap_connection.disconnect()
krb_connection.disconnect()
return SUCCESS
def delete_account(username):
"""
Deletes the UNIX account of a member.
Parameters:
username - UNIX username for the member
Exceptions:
LDAPException - on LDAP failure
KrbException - on Kerberos failure
Returns:
SUCCESS - on success
LDAP_NO_USER - when the user does not exist in LDAP
KRB_NO_USER - when the user does not exist in Kerberos
"""
# Load Configuration
load_configuration()
### Connect to the Backends ###
ldap_connection = ldapi.LDAPConnection()
krb_connection = krb.KrbConnection()
try:
# connect to the LDAP server
ldap_connection.connect(cfg['server_url'], cfg['bind_dn'], cfg['bind_password'], cfg['users_base'], cfg['groups_base'])
# connect to the Kerberos master server
krb_connection.connect(cfg['principal'], cfg['keytab'])
### Sanity-checks ###
# ensure user exists in LDAP
if not ldap_connection.user_lookup(username):
return LDAP_NO_USER
# build principal name from username
principal = username + '@' + cfg['realm']
# see if user exists in Kerberos
if not krb_connection.get_principal(principal):
return KRB_NO_USER
### User deletion ###
# delete the LDAP entry
ldap_connection.user_delete(username)
# delete the Kerberos principal
krb_connection.delete_principal(principal)
finally:
ldap_connection.disconnect()
krb_connection.disconnect()
return SUCCESS
### Tests ###
if __name__ == '__main__':
# A word of notice: this test creates a _working_ account (and then deletes it).
# If deletion fails it must be cleaned up manually.
# a bit of salt so the test account is reasonably tough to crack
import random
pw = str(random.randint(100000000000000000, 999999999999999999))
print "running create_account('testuser', ..., 'Test User', ...)", "->", errors[create_account('testuser', pw, 'Test User')]
print "running delete_account('testuser')", "->", errors[delete_account('testuser')]

426
py/csc/admin/members.py Normal file
View File

@ -0,0 +1,426 @@
# $Id: members.py 44 2006-12-31 07:09:27Z mspang $
"""
CSC Member Management
This module contains functions for registering new members, registering
members for terms, searching for members, and other member-related
functions.
Transactions are used in each method that modifies the database.
Future changes to the members database that need to be atomic
must also be moved into this module.
"""
import re
from csc.admin import terms
from csc.backend import db
from csc.lib import read_config
### Configuration
CONFIG_FILE = '/etc/csc/members.cf'
cfg = {}
def load_configuration():
"""Load Members Configuration"""
# configuration already loaded?
if len(cfg) > 0:
return
# read in the file
cfg_tmp = read_config(CONFIG_FILE)
if not cfg_tmp:
raise MemberException("unable to read configuration file: %s"
% CONFIG_FILE)
# check that essential fields are completed
mandatory_fields = [ 'server', 'database', 'user', 'password' ]
for field in mandatory_fields:
if not field in cfg_tmp:
raise MemberException("missing configuratino option: %s" % field)
if not cfg_tmp[field]:
raise MemberException("null configuration option: %s" %field)
# update the current configuration with the loaded values
cfg.update(cfg_tmp)
### Exceptions ###
class MemberException(Exception):
"""Exception class for member-related errors."""
class DuplicateStudentID(MemberException):
"""Exception class for student ID conflicts."""
pass
class InvalidStudentID(MemberException):
"""Exception class for malformed student IDs."""
pass
class InvalidTerm(MemberException):
"""Exception class for malformed terms."""
pass
class NoSuchMember(MemberException):
"""Exception class for nonexistent members."""
pass
### Connection Management ###
# global database connection
connection = db.DBConnection()
def connect():
"""Connect to PostgreSQL."""
load_configuration()
connection.connect(cfg['server'], cfg['database'])
def disconnect():
"""Disconnect from PostgreSQL."""
connection.disconnect()
def connected():
"""Determine whether the connection has been established."""
return connection.connected()
### Member Table ###
def new(realname, studentid=None, program=None):
"""
Registers a new CSC member. The member is added
to the members table and registered for the current
term.
Parameters:
realname - the full real name of the member
studentid - the student id number of the member
program - the program of study of the member
Returns: the memberid of the new member
Exceptions:
DuplicateStudentID - if the student id already exists in the database
InvalidStudentID - if the student id is malformed
Example: new("Michael Spang", program="CS") -> 3349
"""
# blank attributes should be NULL
if studentid == '': studentid = None
if program == '': program = None
# check the student id format
regex = '^[0-9]{8}$'
if studentid != None and not re.match(regex, str(studentid)):
raise InvalidStudentID("student id is invalid: %s" % studentid)
# check for duplicate student id
member = connection.select_member_by_studentid(studentid)
if member:
raise DuplicateStudentID("student id exists in database: %s" % studentid)
# add the member
memberid = connection.insert_member(realname, studentid, program)
# register them for this term
connection.insert_term(memberid, terms.current())
# commit the transaction
connection.commit()
return memberid
def get(memberid):
"""
Look up attributes of a member by memberid.
Parameters:
memberid - the member id number
Returns: a dictionary of attributes
Example: get(3349) -> {
'memberid': 3349,
'name': 'Michael Spang',
'program': 'Computer Science',
...
}
"""
return connection.select_member_by_id(memberid)
def get_userid(userid):
"""
Look up attributes of a member by userid.
Parameters:
userid - the UNIX user id
Returns: a dictionary of attributes
Example: get('mspang') -> {
'memberid': 3349,
'name': 'Michael Spang',
'program': 'Computer Science',
...
}
"""
return connection.select_member_by_account(userid)
def get_studentid(studentid):
"""
Look up attributes of a member by studnetid.
Parameters:
studentid - the student ID number
Returns: a dictionary of attributes
Example: get(...) -> {
'memberid': 3349,
'name': 'Michael Spang',
'program': 'Computer Science',
...
}
"""
return connection.select_member_by_studentid(studentid)
def list_term(term):
"""
Build a list of members in a term.
Parameters:
term - the term to match members against
Returns: a list of member dictionaries
Example: list_term('f2006'): -> [
{ 'memberid': 3349, ... },
{ 'memberid': ... }.
...
]
"""
# retrieve a list of memberids in term
memberlist = connection.select_members_by_term(term)
# convert the list of memberids to a list of dictionaries
memberlist = map(connection.select_member_by_id, memberlist)
return memberlist
def list_name(name):
"""
Build a list of members with matching names.
Parameters:
name - the name to match members against
Returns: a list of member dictionaries
Example: list_name('Spang'): -> [
{ 'memberid': 3349, ... },
{ 'memberid': ... },
...
]
"""
# retrieve a list of memberids matching name
memberlist = connection.select_members_by_name(name)
# convert the list of memberids to a list of dictionaries
memberlist = map(connection.select_member_by_id, memberlist)
return memberlist
def delete(memberid):
"""
Erase all records of a member.
Note: real members are never removed
from the database
Parameters:
memberid - the member id number
Returns: attributes and terms of the
member in a tuple
Example: delete(0) -> ({ 'memberid': 0, name: 'Calum T. Dalek' ...}, ['s1993'])
"""
# save member data
member = connection.select_member_by_id(memberid)
term_list = connection.select_terms(memberid)
# remove data from the db
connection.delete_term_all(memberid)
connection.delete_member(memberid)
connection.commit()
return (member, term_list)
def update(member):
"""
Update CSC member attributes. None is NULL.
Parameters:
member - a dictionary with member attributes as
returned by get, possibly omitting some
attributes. member['memberid'] must exist
and be valid.
Exceptions:
NoSuchMember - if the member id does not exist
InvalidStudentID - if the student id number is malformed
DuplicateStudentID - if the student id number exists
Example: update( {'memberid': 3349, userid: 'mspang'} )
"""
if member.has_key('studentid') and member['studentid'] != None:
studentid = member['studentid']
# check the student id format
regex = '^[0-9]{8}$'
if studentid != None and not re.match(regex, str(studentid)):
raise InvalidStudentID("student id is invalid: %s" % studentid)
# check for duplicate student id
member = connection.select_member_by_studentid(studentid)
if member:
raise DuplicateStudentID("student id exists in database: %s" %
studentid)
# not specifying memberid is a bug
if not member.has_key('memberid'):
raise Exception("no member specified in call to update")
memberid = member['memberid']
# see if member exists
old_member = connection.select_member_by_id(memberid)
if not old_member:
raise NoSuchMember("memberid does not exist in database: %d" %
memberid)
# do the update
connection.update_member(member)
# commit the transaction
connection.commit()
### Term Table ###
def register(memberid, term_list):
"""
Registers a member for one or more terms.
Parameters:
memberid - the member id number
term_list - the term to register for, or a list of terms
Exceptions:
InvalidTerm - if a term is malformed
Example: register(3349, "w2007")
Example: register(3349, ["w2007", "s2007"])
"""
if not type(term_list) in (list, tuple):
term_list = [ term_list ]
for term in term_list:
# check term syntax
if not re.match('^[wsf][0-9]{4}$', term):
raise InvalidTerm("term is invalid: %s" % term)
# add term to database
connection.insert_term(memberid, term)
connection.commit()
def registered(memberid, term):
"""
Determines whether a member is registered
for a term.
Parameters:
memberid - the member id number
term - the term to check
Returns: whether the member is registered
Example: registered(3349, "f2006") -> True
"""
return connection.select_term(memberid, term) != None
def terms_list(memberid):
"""
Retrieves a list of terms a member is
registered for.
Parameters:
memberid - the member id number
Returns: list of term strings
Example: registered(0) -> 's1993'
"""
return connection.select_terms(memberid)
### Tests ###
if __name__ == '__main__':
connect()
sid = new("Test User", "99999999", "CS")
assert registered(id, terms.current())
print get(sid)
register(sid, terms.next(terms.current()))
assert registered(sid, terms.next(terms.current()))
print terms_list(sid)
print get(sid)
print delete(sid)

252
py/csc/admin/terms.py Normal file
View File

@ -0,0 +1,252 @@
# $Id: terms.py 44 2006-12-31 07:09:27Z mspang $
"""
Terms Routines
This module contains functions for manipulating
terms, such as determining the current term,
finding the next or previous term, converting
dates to terms, and more.
"""
import time, datetime, re
# year to count terms from
EPOCH = 1970
# seasons list
SEASONS = [ 'w', 's', 'f' ]
def valid(term):
"""
Determines whether a term is well-formed:
Parameters:
term - the term string
Returns: whether the term is valid (boolean)
Example: valid("f2006") -> True
"""
regex = '^[wsf][0-9]{4}$'
return re.match(regex, term) != None
def parse(term):
"""Helper function to convert a term string to the number of terms
since the epoch. Such numbers are intended for internal use only."""
if not valid(term):
raise Exception("malformed term: %s" % term)
year = int( term[1:] )
season = SEASONS.index( term[0] )
return (year - EPOCH) * len(SEASONS) + season
def generate(term):
"""Helper function to convert a year and season to a term string."""
year = int(term / len(SEASONS)) + EPOCH
season = term % len(SEASONS)
return "%s%04d" % ( SEASONS[season], year )
def next(term):
"""
Returns the next term. (convenience function)
Parameters:
term - the term string
Retuns: the term string of the following term
Example: next("f2006") -> "w2007"
"""
return add(term, 1)
def previous(term):
"""
Returns the previous term. (convenience function)
Parameters:
term - the term string
Returns: the term string of the preceding term
Example: previous("f2006") -> "s2006"
"""
return add(term, -1)
def add(term, offset):
"""
Calculates a term relative to some base term.
Parameters:
term - the base term
offset - the number of terms since term (may be negative)
Returns: the term that comes offset terms after term
"""
return generate(parse(term) + offset)
def delta(initial, final):
"""
Calculates the distance between two terms.
It should be true that add(a, delta(a, b)) == b.
Parameters:
initial - the base term
final - the term at some offset from the base term
Returns: the offset of final relative to initial
"""
return parse(final) - parse(initial)
def compare(first, second):
"""
Compares two terms. This function is suitable
for use with list.sort().
Parameters:
first - base term for comparison
second - term to compare to
Returns: > 0 (if first > second)
= 0 (if first == second)
< 0 (if first < second)
"""
return delta(second, first)
def interval(base, count):
"""
Returns a list of adjacent terms.
Parameters:
base - the first term in the interval
count - the number of terms to include
Returns: a list of count terms starting with initial
Example: interval('f2006', 3) -> [ 'f2006', 'w2007', 's2007' ]
"""
terms = []
for num in xrange(count):
terms.append( add(base, num) )
return terms
def tstamp(timestamp):
"""Helper to convert seconds since the epoch
to terms since the epoch."""
# let python determine the month and year
date = datetime.date.fromtimestamp(timestamp)
# determine season
if date.month <= 4:
season = SEASONS.index('w')
elif date.month <= 8:
season = SEASONS.index('s')
else:
season = SEASONS.index('f')
return (date.year - EPOCH) * len(SEASONS) + season
def from_timestamp(timestamp):
"""
Converts a number of seconds since
the epoch to a number of terms since
the epoch.
This function notes that:
WINTER = JANUARY to APRIL
SPRING = MAY TO AUGUST
FALL = SEPTEMBER TO DECEMBER
Parameters:
timestamp - number of seconds since the epoch
Returns: the number of terms since the epoch
Example: from_timestamp(1166135779) -> 'f2006'
"""
return generate( tstamp(timestamp) )
def curr():
"""Helper to determine the current term."""
return tstamp( time.time() )
def current():
"""
Determines the current term.
Returns: current term
Example: current() -> 'f2006'
"""
return generate( curr() )
def next_unregistered(registered):
"""
Find the first future or current unregistered term.
Intended as the 'default' for registrations.
Parameters:
registered - a list of terms a member is registered for
Returns: the next unregistered term
"""
# get current term number
now = curr()
# never registered -> current term is next
if len( registered) < 1:
return generate( now )
# return the first unregistered, or the current term (whichever is greater)
return generate(max([max(map(parse, registered))+1, now]))
### Tests ###
if __name__ == '__main__':
assert parse('f2006') == 110
assert generate(110) == 'f2006'
assert next('f2006') == 'w2007'
assert previous('f2006') == 's2006'
assert delta('f2006', 'w2007') == 1
assert add('f2006', delta('f2006', 'w2010')) == 'w2010'
assert interval('f2006', 3) == ['f2006', 'w2007', 's2007']
assert from_timestamp(1166135779) == 'f2006'
assert parse( current() ) >= 110
assert next_unregistered( [current()] ) == next( current() )
assert next_unregistered( [] ) == current()
assert next_unregistered( [previous(current())] ) == current()
assert next_unregistered( [add(current(), -2)] ) == current()
print "All tests passed." "\n"

9
py/csc/apps/__init__.py Normal file
View File

@ -0,0 +1,9 @@
# $Id: __init__.py 23 2006-12-18 20:14:51Z mspang $
"""
User Interfaces
This module contains frontends and related modules.
CEO's primary frontends are:
legacy - aims to reproduce the curses UI of the previous CEO
"""

View File

@ -0,0 +1,10 @@
# $Id: __init__.py 23 2006-12-18 20:14:51Z mspang $
"""
Legacy User Interface
This module contains the legacy CEO user interface and related modules.
Important modules are:
main.py - all of the main UI logic
helpers.py - user interface library routines
"""

View File

@ -0,0 +1,412 @@
# $Id: helpers.py 35 2006-12-28 05:14:05Z mspang $
"""
Helpers for legacy User Interface
This module contains numerous functions that are designed to immitate
the look and behavior of the previous CEO. Included is code for various
curses-based UI widgets that were provided by Perl 5's Curses and
Curses::Widgets libraries.
Though attempts have been made to keep the UI bug-compatible with
the previous system, some compromises have been made. For example,
the input and textboxes draw 'OK' and 'Cancel' buttons where the old
CEO had them, but they are fake. That is, the buttons in the old
CEO were selectable but non-operational, but in the new CEO they are
not even selectable.
"""
import curses.ascii
# key constants not defined in CURSES
KEY_RETURN = ord('\n')
KEY_ESCAPE = 27
KEY_EOT = 4
def center(parent_dim, child_dim):
"""Helper for centering a length in a larget length."""
return (parent_dim-child_dim)/2
def read_input(wnd, offy, offx, width, maxlen, echo=True):
"""
Read user input within a confined region of a window.
Basic line-editing is supported:
LEFT, RIGHT, HOME, and END move around.
BACKSPACE and DEL remove characters.
INSERT switches between insert and overwrite mode.
ESC and C-d abort input.
RETURN completes input.
Parameters:
wnd - parent window for region
offy - the vertical offset for the beginning of the input region
offx - the horizontal offset for the beginning of the input region
width - the width of the region
maxlen - greatest number of characters to read (0 for no limit)
echo - boolean: whether to display typed characters
Returns: the string, or None when the user aborts.
"""
# turn on cursor
try:
curses.curs_set(1)
except:
pass
# set keypad mode to allow UP, DOWN, etc
wnd.keypad(1)
# the input string
input = ""
# offset of cursor in input
# i.e. the next operation is applied at input[inputoff]
inputoff = 0
# display offset (for scrolling)
# i.e. the first character in the region is input[displayoff]
displayoff = 0
# insert mode (True) or overwrite mode (False)
insert = True
while True:
# echo mode, display the string
if echo:
# discard characters before displayoff,
# as the window may be scrolled to the right
substring = input[displayoff:]
# pad the string with zeroes to overwrite stale characters
substring = substring + " " * (width - len(substring))
# display the substring
wnd.addnstr(offy, offx, substring, width)
# await input
key = wnd.getch(offy, offx + inputoff - displayoff)
# not echo mode, don't display the string
else:
# await input at arbitrary location
key = wnd.getch(offy, offx)
# enter returns input
if key == KEY_RETURN:
return input
# escape aborts input
elif key == KEY_ESCAPE:
return None
# EOT (C-d) aborts if there is no input
elif key == KEY_EOT:
if len(input) == 0:
return None
# backspace removes the previous character
elif key == curses.KEY_BACKSPACE:
if inputoff > 0:
# remove the character immediately before the input offset
input = input[0:inputoff-1] + input[inputoff:]
inputoff -= 1
# move either the cursor or entire line of text left
if displayoff > 0:
displayoff -= 1
# delete removes the current character
elif key == curses.KEY_DC:
if inputoff < len(input):
# remove the character at the input offset
input = input[0:inputoff] + input[inputoff+1:]
# left moves the cursor one character left
elif key == curses.KEY_LEFT:
if inputoff > 0:
# move the cursor to the left
inputoff -= 1
# scroll left if necessary
if inputoff < displayoff:
displayoff -= 1
# right moves the cursor one character right
elif key == curses.KEY_RIGHT:
if inputoff < len(input):
# move the cursor to the right
inputoff += 1
# scroll right if necessary
if displayoff - inputoff == width:
displayoff += 1
# home moves the cursor to the first character
elif key == curses.KEY_HOME:
inputoff = 0
displayoff = 0
# end moves the cursor past the last character
elif key == curses.KEY_END:
inputoff = len(input)
displayoff = len(input) - width + 1
# insert toggles insert/overwrite mode
elif key == curses.KEY_IC:
insert = not insert
# other (printable) characters are added to the input string
elif curses.ascii.isprint(key):
if len(input) < maxlen or maxlen == 0:
# insert mode: insert before current offset
if insert:
input = input[0:inputoff] + chr(key) + input[inputoff:]
# overwrite mode: replace current offset
else:
input = input[0:inputoff] + chr(key) + input[inputoff+1:]
# increment the input offset
inputoff += 1
# scroll right if necessary
if inputoff - displayoff == width:
displayoff += 1
def inputbox(wnd, prompt, field_width, echo=True):
"""Display a window for user input."""
wnd_height, wnd_width = wnd.getmaxyx()
height, width = 12, field_width + 7
# draw a window for the dialog
childy, childx = center(wnd_height-1, height)+1, center(wnd_width, width)
child_wnd = wnd.subwin(height, width, childy, childx)
child_wnd.clear()
child_wnd.border()
# draw another window for the text box
texty, textx = center(height-1, 3)+1, center(width-1, width-5)+1
textheight, textwidth = 3, width-5
text_wnd = child_wnd.derwin(textheight, textwidth, texty, textx)
text_wnd.clear()
text_wnd.border()
# draw the prompt
prompty, promptx = 2, 3
child_wnd.addnstr(prompty, promptx, prompt, width-2)
# draw the fake buttons
fakey, fakex = 9, width - 19
child_wnd.addstr(fakey, fakex, "< OK > < Cancel >")
child_wnd.addch(fakey, fakex+2, "O", curses.A_UNDERLINE)
child_wnd.addch(fakey, fakex+9, "C", curses.A_UNDERLINE)
# update the screen
child_wnd.noutrefresh()
text_wnd.noutrefresh()
curses.doupdate()
# read an input string within the field region of text_wnd
inputy, inputx, inputwidth = 1, 1, textwidth - 2
input = read_input(text_wnd, inputy, inputx, inputwidth, 0, echo)
# erase the window
child_wnd.erase()
child_wnd.refresh()
return input
def line_wrap(line, width):
"""Wrap a string to a certain width (returns a list of strings)."""
wrapped_lines = []
tokens = line.split(" ")
tokens.reverse()
tmp = tokens.pop()
if len(tmp) > width:
wrapped_lines.append(tmp[0:width])
tmp = tmp[width:]
while len(tokens) > 0:
token = tokens.pop()
if len(tmp) + len(token) + 1 <= width:
tmp += " " + token
elif len(token) > width:
tmp += " " + token[0:width-len(tmp)-1]
tokens.push(token[width-len(tmp)-1:])
else:
wrapped_lines.append(tmp)
tmp = token
wrapped_lines.append(tmp)
return wrapped_lines
def msgbox(wnd, msg, title="Message"):
"""Display a message in a window."""
# split message into a list of lines
lines = msg.split("\n")
# determine the dimensions of the method
message_height = len(lines)
message_width = 0
for line in lines:
if len(line) > message_width:
message_width = len(line)
# ensure the window fits the title
if len(title) > message_width:
message_width = len(title)
# maximum message width
parent_height, parent_width = wnd.getmaxyx()
max_message_width = parent_width - 8
# line-wrap if necessary
if message_width > max_message_width:
newlines = []
for line in lines:
for newline in line_wrap(line, max_message_width):
newlines.append(newline)
lines = newlines
message_width = max_message_width
message_height = len(lines)
# random padding that perl's curses adds
pad_width = 2
# create the outer window
outer_height, outer_width = message_height + 8, message_width + pad_width + 6
outer_y, outer_x = center(parent_height+1, outer_height)-1, center(parent_width, outer_width)
outer_wnd = wnd.derwin(outer_height, outer_width, outer_y, outer_x)
outer_wnd.erase()
outer_wnd.border()
# create the inner window
inner_height, inner_width = message_height + 2, message_width + pad_width + 2
inner_y, inner_x = 2, center(outer_width, inner_width)
inner_wnd = outer_wnd.derwin(inner_height, inner_width, inner_y, inner_x)
inner_wnd.border()
# display the title
outer_wnd.addstr(0, 1, " " + title + " ", curses.A_REVERSE | curses.A_BOLD)
# display the message
for i in xrange(len(lines)):
inner_wnd.addnstr(i+1, 1, lines[i], message_width)
# draw a solid block at the end of the first few lines
if i < len(lines) - 1:
inner_wnd.addch(i+1, inner_width-1, ' ', curses.A_REVERSE)
# display the fake OK button
fakey, fakex = outer_height - 3, outer_width - 8
outer_wnd.addstr(fakey, fakex, "< OK >", curses.A_REVERSE)
outer_wnd.addch(fakey, fakex+2, "O", curses.A_UNDERLINE | curses.A_REVERSE)
# update display
outer_wnd.noutrefresh()
inner_wnd.noutrefresh()
curses.doupdate()
# read a RETURN or ESC before returning
curses.curs_set(0)
outer_wnd.keypad(1)
while True:
key = outer_wnd.getch(0,0)
if key == KEY_RETURN or key == KEY_ESCAPE:
break
# clear the window
outer_wnd.erase()
outer_wnd.refresh()
def menu(wnd, offy, offx, width, options, _acquire_wnd=None):
"""
Draw a menu and wait for a selection.
Parameters:
wnd - parent window
offy - vertical offset for menu region
offx - horizontal offset for menu region
width - width of menu region
options - a list of selections
_acquire_wnd - hack to support resize: must be a function callback
that returns new values for wnd, offy, offx, height,
width. Unused if None.
Returns: index into options that was selected
"""
# the currently selected option
selected = 0
while True:
# disable cursor
curses.curs_set(0)
# hack to support resize: recreate the
# parent window every iteration
if _acquire_wnd:
wnd, offy, offx, height, width = _acquire_wnd()
# keypad mode so getch() works with up, down
wnd.keypad(1)
# display the menu
for i in xrange(len(options)):
text, callback = options[i]
text = text + " " * (width - len(text))
# the selected option is displayed in reverse video
if i == selected:
wnd.addstr(i+offy, offx, text, curses.A_REVERSE)
else:
wnd.addstr(i+offy, offx, text)
# display the member
wnd.refresh()
# read one keypress
input = wnd.getch()
# UP moves to the previous option
if input == curses.KEY_UP and selected > 0:
selected = (selected - 1)
# DOWN moves to the next option
elif input == curses.KEY_DOWN and selected < len(options) - 1:
selected = (selected + 1)
# RETURN runs the callback for the selected option
elif input == KEY_RETURN:
text, callback = options[selected]
# highlight the selected option
text = text + " " * (width - len(text))
wnd.addstr(selected+offy, offx, text, curses.A_REVERSE | curses.A_BOLD)
wnd.refresh()
# execute the selected option
if callback(wnd): # success
break
def reset():
"""Reset the terminal and clear the screen."""
reset = curses.tigetstr('rs1')
if not reset: reset = '\x1bc'
curses.putp(reset)

555
py/csc/apps/legacy/main.py Normal file
View File

@ -0,0 +1,555 @@
# $Id: main.py 44 2006-12-31 07:09:27Z mspang $
"""
CEO-like Frontend
This frontend aims to be compatible in both look and function with the
curses UI of CEO.
Some small improvements have been made, such as not echoing passwords and
aborting when nothing is typed into the first input box during an operation.
This frontend is poorly documented, deprecated, and undoubtedly full of bugs.
"""
import curses.ascii, re, os
from helpers import menu, inputbox, msgbox, reset
from csc.admin import accounts, members, terms
# color of the ceo border
BORDER_COLOR = curses.COLOR_RED
def action_new_member(wnd):
"""Interactively add a new member."""
username, studentid, program = '', None, ''
# read the name
prompt = " Name: "
realname = inputbox(wnd, prompt, 18)
# abort if no username is entered
if not realname or realname.lower() == 'exit':
return False
# read the student id
prompt = "Student id:"
while studentid == None or (re.search("[^0-9]", studentid) and not studentid.lower() == 'exit'):
studentid = inputbox(wnd, prompt, 18)
# abort if exit is entered
if studentid.lower() == 'exit':
return False
if studentid == '':
studentid = None
# read the program of study
prompt = " Program:"
program = inputbox(wnd, prompt, 18)
# abort if exit is entered
if program == None or program.lower() == 'exit':
return False
# connect the members module to its backend if necessary
if not members.connected(): members.connect()
# attempt to create the member
try:
memberid = members.new(realname, studentid, program)
msgbox(wnd, "Success! Your memberid is %s. You are now registered\n"
% memberid + "for the " + terms.current() + " term.");
except members.InvalidStudentID:
msgbox(wnd, "Invalid student ID.")
return False
except members.DuplicateStudentID:
msgbox(wnd, "A member with this student ID exists.")
return False
def action_term_register(wnd):
"""Interactively register a member for a term."""
memberid, term = '', ''
# read the member id
prompt = 'Enter memberid ("exit" to cancel):'
memberuserid = inputbox(wnd, prompt, 36)
if not memberuserid or memberuserid.lower() == 'exit':
return False
member = get_member_memberid_userid(wnd, memberuserid)
if not member: return False
memberid = member['memberid']
term_list = members.terms_list(memberid)
# display user
display_member_details(wnd, member, term_list)
# read the term
prompt = "Which term to register for ([fws]20nn):"
while not re.match('^[wsf][0-9]{4}$', term) and not term == 'exit':
term = inputbox(wnd, prompt, 41)
# abort when exit is entered
if term.lower() == 'exit':
return False
# already registered?
if members.registered(memberid, term):
msgbox(wnd, "You are already registered for term " + term)
return False
try:
# attempt to register
members.register(memberid, term)
# display success message [sic]
msgbox(wnd, "Your are now registered for term " + term)
except members.InvalidTerm:
msgbox(wnd, "Term is not valid: %s" % term)
return False
def action_term_register_multiple(wnd):
"""Interactively register a member for multiple terms."""
memberid, base, num = '', '', None
# read the member id
prompt = 'Enter memberid ("exit" to cancel):'
memberuserid = inputbox(wnd, prompt, 36)
if not memberuserid or memberuserid.lower() == 'exit':
return False
member = get_member_memberid_userid(wnd, memberuserid)
if not member: return False
memberid = member['memberid']
term_list = members.terms_list(memberid)
# display user
display_member_details(wnd, member, term_list)
# read the base
prompt = "Which term to start registering ([fws]20nn):"
while not re.match('^[wsf][0-9]{4}$', base) and not base == 'exit':
base = inputbox(wnd, prompt, 41)
# abort when exit is entered
if base.lower() == 'exit':
return False
# read number of terms
prompt = 'How many terms?'
while not num or not re.match('^[0-9]*$', num):
num = inputbox(wnd, prompt, 36)
num = int(num)
# any terms in the range?
if num < 1:
msgbox(wnd, "No terms to register.")
return False
# compile a list to register
term_list = terms.interval(base, num)
# already registered?
for term in term_list:
if members.registered(memberid, term):
msgbox(wnd, "You are already registered for term " + term)
return False
try:
# attempt to register all terms
members.register(memberid, term_list)
# display success message [sic]
msgbox(wnd, "Your are now registered for terms: " + ", ".join(term_list))
except members.InvalidTerm:
msgbox(wnd, "Term is not valid: %s" % term)
return False
def action_create_account(wnd):
"""Interactively create an account for a member."""
memberid, userid = '', ''
# read the member id
prompt = 'Enter member ID (exit to cancel):'
memberid = inputbox(wnd, prompt, 35)
if not memberid or memberid.lower() == 'exit':
return False
member = get_member_memberid_userid(wnd, memberid)
if not member: return False
memberid = member['memberid']
term_list = members.terms_list(memberid)
# display the member
display_member_details(wnd, member, term_list)
# verify member
prompt = "Is this the correct member?"
answer = None
while answer != "yes" and answer != "y" and answer != "n" and answer != "no" and answer != "exit":
answer = inputbox(wnd, prompt, 28)
# user abort
if answer == "exit":
return False
# incorrect member; abort
if answer == "no" or answer == "n":
msgbox(wnd, "I suggest searching for the member by userid or name from the main menu.")
return False
# read user id
prompt = "Userid:"
while userid == '':
userid = inputbox(wnd, prompt, 18)
# user abort
if userid == None or userid.lower() == 'exit':
return False
# member already has an account?
#if member['userid'] != None:
# msgbox(wnd, "Member " + str(memberid) + " already has an account: " + member['userid'] + "\n"
# "Contact the sysadmin if there are still problems." )
# return False
# password input loop
password = "password"
check = "check"
while password != check:
# read password
prompt = "User password:"
password = None
while not password:
password = inputbox(wnd, prompt, 18, False)
# read another password
prompt = "Enter the password again:"
check = None
while not check:
check = inputbox(wnd, prompt, 27, False)
# create the UNIX account
result = accounts.create_account(userid, password, member['name'], memberid)
if result == accounts.LDAP_EXISTS:
msgbox(wnd, "Error: Could not do stuff , Already exists.")
return False
elif result == accounts.KRB_EXISTS:
msgbox(wnd, "This account already exists in Kerberos, but not in LDAP. Please contact the Systems Administrator.")
return False
elif result == accounts.LDAP_NO_IDS:
msgbox(wnd, "There are no available UNIX user ids. This is a fatal error. Contact the Systems Administrator.")
return False
elif result == accounts.BAD_REALNAME:
msgbox(wnd, "Invalid real name: %s. Contact the Systems Administrator." % member['name'])
return False
elif result == accounts.BAD_USERNAME:
msgbox(wnd, "Invalid username: %s. Enter a valid username." % userid)
return False
elif result != accounts.SUCCESS:
raise Exception("Unexpected return status of accounts.create_account(): %s" % result)
# now update the CEO database with the username
members.update( {'memberid':memberid, 'userid': userid} )
# success
msgbox(wnd, "Please run 'addhomedir " + userid + "'.")
msgbox(wnd, "Success! Your account has been added")
def display_member_details(wnd, member, term_list):
"""Display member attributes in a message box."""
# clone and sort term_list
term_list = list(term_list)
term_list.sort( terms.compare )
# labels for data
id_label, studentid_label, name_label = "ID:", "StudentID:", "Name:"
program_label, userid_label, terms_label = "Program:", "User ID:", "Terms:"
# format it all into a massive string
message = "%8s %-20s %10s %-10s (user)\n" % (name_label, member['name'], id_label, member['memberid']) + \
"%8s %-20s %10s %-10s\n" % (program_label, member['program'], studentid_label, member['studentid'])
if member['userid']:
message += "%8s %s\n" % (userid_label, member['userid'])
else:
message += 'No user ID.\n'
message += "%s %s" % (terms_label, " ".join(term_list))
# display the string in a message box
msgbox(wnd, message)
def get_member_memberid_userid(wnd, memberuserid):
"""Retrieve member attributes by member of user id."""
# connect the members module to its backends
if not members.connected(): members.connect()
# retrieve member data
if re.match('^[0-9]*$', memberuserid):
# numeric memberid, look it up
memberid = int(memberuserid)
member = members.get( memberid )
if not member:
msgbox(wnd, '%s is an invalid memberid' % memberuserid)
else:
# non-numeric memberid: try userids
member = members.get_userid( memberuserid )
if not member:
msgbox(wnd, "%s is an invalid account userid" % memberuserid)
return member
def action_display_member(wnd):
"""Interactively display a member."""
# read the member id
prompt = 'Memberid: '
memberid = inputbox(wnd, prompt, 36)
if not memberid or memberid.lower() == 'exit':
return False
member = get_member_memberid_userid(wnd, memberid)
if not member: return
term_list = members.terms_list( member['memberid'] )
# display the details in a window
display_member_details(wnd, member, term_list)
def page(text):
try:
pipe = os.popen('/usr/bin/less', 'w')
pipe.write(text)
pipe.close()
except IOError:
# broken pipe (user didn't read the whole list)
pass
def format_members(member_list):
"""Format a member list into a string."""
# clone and sort member_list
member_list = list(member_list)
member_list.sort( lambda x, y: x['memberid']-y['memberid'] )
buf = ''
for member in member_list:
attrs = ( member['memberid'], member['name'], member['studentid'],
member['type'], member['program'], member['userid'] )
buf += "%4d %50s %10s %10s \n%55s %10s\n\n" % attrs
return buf
def action_list_term(wnd):
"""Interactively list members registered in a term."""
term = None
# read the term
prompt = "Which term to list members for ([fws]20nn): "
while term == None or (not term == '' and not re.match('^[wsf][0-9]{4}$', term) and not term == 'exit'):
term = inputbox(wnd, prompt, 41)
# abort when exit is entered
if not term or term.lower() == 'exit':
return False
# connect the members module to its backends if necessary
if not members.connected(): members.connect()
# retrieve a list of members for term
member_list = members.list_term(term)
# format the data into a mess of text
buf = format_members(member_list)
# display the mass of text with a pager
page( buf )
def action_list_name(wnd):
name = None
# read the name
prompt = "Enter the member's name: "
name = inputbox(wnd, prompt, 41)
# abort when exit is entered
if not name or name.lower() == 'exit':
return False
# connect the members module to its backends if necessary
if not members.connected(): members.connect()
# retrieve a list of members for term
member_list = members.list_name(name)
# format the data into a mess of text
buf = format_members(member_list)
# display the mass of text with a pager
page( buf )
def action_list_studentid(wnd):
studentid = None
# read the studentid
prompt = "Enter the member's student id: "
studentid = inputbox(wnd, prompt, 41)
# abort when exit is entered
if not studentid or studentid.lower() == 'exit':
return False
# connect the members module to its backends if necessary
if not members.connected(): members.connect()
# retrieve a list of members for term
member = members.get_studentid(studentid)
if member != None:
member_list = [ members.get_studentid(studentid) ]
else:
member_list = []
# format the data into a mess of text
buf = format_members(member_list)
# display the mass of text with a pager
page( buf )
def null_callback(wnd):
"""Callback for unimplemented menu options."""
return False
def exit_callback(wnd):
"""Callback for the exit option."""
return True
# the top level ceo menu
top_menu = [
( "New member", action_new_member ),
( "Register for a term", action_term_register ),
( "Register for multiple terms", action_term_register_multiple ),
( "Display a member", action_display_member ),
( "List members registered in a term", action_list_term ),
( "Search for a member by name", action_list_name ),
( "Search for a member by student id", action_list_studentid ),
( "Create an account", action_create_account ),
( "Re Create an account", null_callback ),
( "Library functions", null_callback ),
( "Exit", exit_callback ),
]
def acquire_ceo_wnd(screen=None):
"""Create the top level ceo window."""
# hack to get a reference to the entire screen
# even when the caller doesn't (shouldn't) have one
global _screen
if screen == None:
screen = _screen
else:
_screen = screen
# if the screen changes size, a mess may be left
screen.erase()
# for some reason, the legacy ceo system
# excluded the top line from its window
height, width = screen.getmaxyx()
ceo_wnd = screen.subwin(height-1, width, 1, 0)
# draw the border around the ceo window
curses.init_pair(1, BORDER_COLOR, -1)
color_attr = curses.color_pair(1) | curses.A_BOLD
ceo_wnd.attron(color_attr)
ceo_wnd.border()
ceo_wnd.attroff(color_attr)
# return window and dimensions of inner area
return ceo_wnd, 1, 1, height-2, width-2
def ceo_main_curses(screen):
"""Wrapped main for curses."""
curses.use_default_colors()
# workaround for SSH sessions on virtual consoles (reset terminal)
reset()
# create ceo window
ceo_wnd, menu_y, menu_x, menu_height, menu_width = acquire_ceo_wnd(screen)
# display the top level menu
menu(ceo_wnd, menu_y, menu_x, menu_width, top_menu, acquire_ceo_wnd)
def run():
"""Main function for legacy UI."""
# wrap the entire program using curses.wrapper
# so that the terminal is restored to a sane state
# when the program exits
try:
curses.wrapper(ceo_main_curses)
except KeyboardInterrupt:
pass
except curses.error:
print "Your screen is too small!"
raise
except:
reset()
raise
# clean up screen before exit
reset()
if __name__ == '__main__':
run()

View File

@ -0,0 +1,12 @@
# $Id$
"""
Backends
This module contains backend interfaces and related modules.
CEO's primary backends are:
db.py - CEO's database for member and term registrations
ldapi.py - LDAP, for UNIX account metadata administration
krb.py - Kerberos, for UNIX account password administration
"""

548
py/csc/backend/db.py Normal file
View File

@ -0,0 +1,548 @@
# $Id: db.py 37 2006-12-28 10:00:50Z mspang $
"""
Database Backend Interface
This module is intended to be a thin wrapper around CEO database operations.
Methods on the connection class correspond in a straightforward way to SQL
queries. These methods may restructure and clean up query output but may make
no other assumptions about its content or purpose.
This module makes use of the PygreSQL Python bindings to libpq,
PostgreSQL's native C client library.
"""
import pgdb
class DBException(Exception):
"""Exception class for database-related errors."""
pass
class DBConnection(object):
"""
Connection to CEO's backend database. All database queries
and updates are made via this class.
Exceptions: (all methods)
DBException - on database query failure
Note: Updates will never take place until commit() is called.
Note: In the event that any method of this class raises a
DBException and that exception is caught, rollback()
must be called before further queries will be successful.
Example:
connection = DBConnection()
connection.connect("localhost", "ceo")
# make queries and updates, i.e.
connection.insert_member("Calum T. Dalek")
connection.commit()
connection.disconnect()
"""
def __init__(self):
self.cnx = None
self.cursor = None
def connect(self, hostname=None, database=None, username=None, password=None):
"""
Establishes the connection to CEO's PostgreSQL database.
Parameters:
hostname - hostname:port to connect to
database - name of database
username - user to authenticate as
password - password of username
"""
if self.cnx: raise DBException("unable to connect: already connected")
try:
self.cnx = pgdb.connect(host=hostname, database=database,
user=username, password=password)
self.cursor = self.cnx.cursor()
except pgdb.Error, e:
raise DBException("unable to connect: %s" % e)
def disconnect(self):
"""Closes the connection to CEO's PostgreSQL database."""
if self.cursor:
self.cursor.close()
self.cursor = None
if self.cnx:
self.cnx.close()
self.cnx = None
def connected(self):
"""Determine whether the connection has been established."""
return self.cnx != None
def commit(self):
"""Commits the current transaction and starts a new one."""
self.cnx.commit()
def rollback(self):
"""Aborts the current transaction."""
self.cnx.rollback()
### Member-related methods ###
def select_members(self, sql, params=None):
"""
Retrieves a list CSC members selected by given SQL statement.
This is a helper function that should generally not be called directly.
Parameters:
sql - the SELECT sql statement
params - parameters for the SQL statement
The sql statement must select the six columns
(memberid, name, studentid, program, type, userid)
from the members table in that order.
Returns: a memberid-keyed dictionary whose values are
column-keyed dictionaries with member attributes
"""
# retrieve a list of all members
try:
self.cursor.execute(sql, params)
members_list = self.cursor.fetchall()
except pgdb.Error, e:
raise DBException("SELECT statement failed: %s" % e)
# build a dictionary of dictionaries from the result (a list of lists)
members_dict = {}
for member in members_list:
memberid, name, studentid, program, type, userid = member
members_dict[memberid] = {
'memberid': member[0],
'name': member[1],
'studentid': member[2],
'program': member[3],
'type': member[4],
'userid': member[5],
}
return members_dict
def select_single_member(self, sql, params=None):
"""
Retrieves a single member by memberid.
This is a helper function that should generally not be called directly.
See: self.select_members()
Returns: a column-keyed dictionary with member attributes, or
None if no member matching member exists
"""
# retrieve the member
results = self.select_members(sql, params)
# too many members returned
if len(results) > 1:
raise DBException("multiple members selected: sql='%s' params=%s" % (sql, repr(params)))
# no such member
elif len(results) < 1:
return None
# return the single match
memberid = results.keys()[0]
return results[memberid]
def select_all_members(self):
"""
Retrieves a list of all CSC members (past and present).
See: self.select_members()
Example: connection.select_all_members() -> {
0: { 'memberid': 0, 'name': 'Calum T. Dalek' ...}
3349: { 'memberid': 3349, 'name': 'Michael Spang' ...}
...
}
"""
sql = "SELECT memberid, name, studentid, program, type, userid FROM members"
return self.select_members(sql)
def select_members_by_name(self, name_re):
"""
Retrieves a list of all CSC members whose name matches name_re.
See: self.select_members()
Example: connection.select_members_by_name('Michael') -> {
3349: { 'memberid': 3349, 'name': 'Michael Spang' ...}
...
}
"""
sql = "SELECT memberid, name, studentid, program, type, userid FROM members WHERE name ~* %s"
params = [ str(name_re) ]
return self.select_members(sql, params)
def select_members_by_term(self, term):
"""
Retrieves a list of all CSC members who were members in the specified term.
See: self.select_members()
Example: connection.select_members_by_term('f2006') -> {
3349: { 'memberid': 3349, 'name': 'Michael Spang' ...}
...
}
"""
sql = "SELECT members.memberid, name, studentid, program, type, userid FROM members JOIN terms ON members.memberid=terms.memberid WHERE term=%s"
params = [ str(term) ]
return self.select_members(sql, params)
def select_member_by_id(self, memberid):
"""
Retrieves a single member by memberid.
See: self.select_single_member()
Example: connection.select_member_by_id(0) ->
{ 'memberid': 0, 'name': 'Calum T. Dalek' ...}
"""
sql = "SELECT memberid, name, studentid, program, type, userid FROM members WHERE memberid=%d"
params = [ int(memberid) ]
return self.select_single_member(sql, params)
def select_member_by_account(self, username):
"""
Retrieves a single member by UNIX account username.
See: self.select_single_member()
Example: connection.select_member_by_account('ctdalek') ->
{ 'memberid': 0, 'name': 'Calum T. Dalek' ...}
"""
sql = "SELECT memberid, name, studentid, program, type, userid FROM members WHERE userid=%s"
params = [ username ]
return self.select_single_member(sql, params)
def select_member_by_studentid(self, studentid):
"""
Retrieves a single member by student id number.
See: self.select_single_member()
Example: connection.select_member_by_studentid('nnnnnnnn') ->
{ 'memberid': 3349, 'name': 'Michael Spang' ...}
"""
sql = "SELECT memberid, name, studentid, program, type, userid FROM members WHERE studentid=%s"
params = [ studentid ]
return self.select_single_member(sql, params)
def insert_member(self, name, studentid=None, program=None):
"""
Creates a member with the specified attributes.
Parameters:
name - full name of member
studentid - student id number
program - program of study
Example: connection.insert_member('Michael Spang', '99999999', 'Math/CS') -> 3349
Returns: a memberid of created user
"""
try:
# retrieve the next memberid
sql = "SELECT nextval('memberid_seq')"
self.cursor.execute(sql)
result = self.cursor.fetchone()
memberid = result[0]
# insert the member
sql = "INSERT INTO members (memberid, name, studentid, program, type) VALUES (%d, %s, %s, %s, %s)"
params = [ memberid, name, studentid, program, 'user' ]
self.cursor.execute(sql, params)
return memberid
except pgdb.Error, e:
raise DBException("failed to create member: %s" % e)
def delete_member(self, memberid):
"""
Deletes a member. Note that a member cannot
be deleted until it has been unregistered from
all terms.
Parameters:
memberid - the member id number to delete
Example: connection.delete_member(3349)
"""
sql = "DELETE FROM members WHERE memberid=%d"
params = [ memberid ]
try:
self.cursor.execute(sql, params)
except pgdb.Error, e:
raise DBException("DELETE statement failed: %s" %e)
def update_member(self, member):
"""
Modifies member attributes.
Parameters:
member - a column-keyed dictionary with the new state of the member.
member['memberid'] must be present. ommitted columns
will not be changed. None is NULL.
Returns: the full new state of the member as a column-keyed dictionary
Example: connection.update_member({
'memberid': 3349,
'name': 'Michael C. Spang',
'program': 'CS!'
}) -> {
'memberid': 3349,
'name': 'Michael C. Spang',
'program': CS!',
'studentid': '99999999' # unchanged
}
Equivalent Example:
member = connection.select_member_by_id(3349)
member['name'] = 'Michael C. Spang'
member['program'] = 'CS!'
connection.update_member(member) -> { see above }
"""
try:
# memberid to update
memberid = member['memberid']
# retrieve current state of member
member_state = self.select_member_by_id(memberid)
# build a list of changes to make
changes = []
for column in member.keys():
if member[column] != member_state[column]:
# column's value must be updated
changes.append( (column, member[column]) )
member_state[column] = member[column]
# no changes?
if len(changes) < 1:
return member_state
# make the necessary changes in an update statement
changes = zip(*changes)
sql = "UPDATE members SET " + ", ".join(["%s=%%s"] * len(changes[0])) % changes[0] + " WHERE memberid=%d"
params = changes[1] + ( memberid, )
self.cursor.execute(sql, params)
return member_state
except pgdb.Error, e:
raise DBException("member update failed: %s" % e)
### Term-related methods ###
def select_term(self, memberid, term):
"""
Determines whether a member is registered for a term.
Parameters:
memberid - the member id number
term - the term to check
Returns: a matching term, or None
Example: connection.select_term(3349, 'f2006') -> 'f2006'
"""
sql = "SELECT term FROM terms WHERE memberid=%d AND term=%s"
params = [ memberid, term ]
# retrieve matches
try:
self.cursor.execute(sql, params)
result = self.cursor.fetchall()
except pgdb.Error, e:
raise DBException("SELECT statement failed: %s" % e)
if len(result) > 1:
raise DBException("multiple rows in terms with memberid=%d term=%s" % (memberid, term))
elif len(result) == 0:
return None
else:
return result[0][0]
def select_terms(self, memberid):
"""
Retrieves a list of terms a member is registered for.
Parameters:
memberid - the member id number
Returns: a sorted list of terms
Example: connection.select_terms(3349) -> ['f2006']
"""
sql = "SELECT term FROM terms WHERE memberid=%d"
params = [ memberid ]
# retrieve the list of terms
try:
self.cursor.execute(sql, params)
result = self.cursor.fetchall()
except pgdb.Error, e:
raise DBException("SELECT statement failed: %s" % e)
result = [ row[0] for row in result ]
return result
def insert_term(self, memberid, term):
"""
Registers a member for a term.
Parameters:
memberid - the member id number to register
term - string representation of the term
Example: connection.insert_term(3349, 'f2006')
"""
sql = "INSERT INTO terms (memberid, term) VALUES (%d, %s)"
params = [ memberid, term ]
try:
self.cursor.execute(sql, params)
except pgdb.Error, e:
raise DBException("INSERT statement failed: %s" % e)
def delete_term(self, memberid, term):
"""
Unregisters a member for a term.
Parameters:
memberid - the member id number to register
term - string representation of the term
Example: connection.delete_term(3349, 'f2006')
"""
sql = "DELETE FROM terms WHERE memberid=%d and term=%s"
params = [ memberid, term ]
try:
self.cursor.execute(sql, params)
except pgdb.Error, e:
raise DBException("DELETE statement failed: %s" % e)
def delete_term_all(self, memberid):
"""
Unregisters a member for all registered terms.
Parameters:
memberid - the member id number to unregister
Example: connection.delete_term_all(3349)
"""
sql = "DELETE FROM terms WHERE memberid=%d"
params = [ memberid ]
# retrieve a list of terms
try:
self.cursor.execute(sql, params)
except pgdb.Error, e:
raise DBException("DELETE statement failed: %s" % e)
### Miscellaneous methods ###
def trim_memberid_sequence(self):
"""
Sets the value of the member id sequence to the id of the newest
member. For use after extensive testing to prevent large
intervals of unused memberids.
Note: this does nothing unless the most recently added member(s) have been deleted
"""
self.cursor.execute("SELECT setval('memberid_seq', (SELECT max(memberid) FROM members))")
### Tests ###
if __name__ == '__main__':
HOST = "localhost"
DATABASE = "ceo"
connection = DBConnection()
print "Running disconnect()"
connection.disconnect()
print "Running connect('%s', '%s')" % (HOST, DATABASE)
connection.connect(HOST, DATABASE)
print "Running select_all_members()", "->", len(connection.select_all_members()), "members"
print "Running select_member_by_id(0)", "->", connection.select_member_by_id(0)['userid']
print "Running select_members_by_name('Spang')", "->", connection.select_members_by_name('Spang').keys()
print "Running select_members_by_term('f2006')", "->", "[" + ", ".join(map(str, connection.select_members_by_term('f2006').keys()[0:10])) + " ...]"
print "Running insert_member('test_member', '99999999', 'program')",
memberid = connection.insert_member('test_member', '99999999', 'program')
print "->", memberid
print "Running select_member_by_id(%d)" % memberid, "->", connection.select_member_by_id(memberid)
print "Running insert_term(%d, 'f2006')" % memberid
connection.insert_term(memberid, 'f2006')
print "Running select_terms(%d)" % memberid, "->", connection.select_terms(memberid)
print "Running update_member({'memberid':%d,'name':'test_updated','studentid':-1})" % memberid
connection.update_member({'memberid':memberid,'name':'test_updated','studentid':99999999})
print "Running select_member_by_id(%d)" % memberid, "->", connection.select_member_by_id(memberid)
print "Running rollback()"
connection.rollback()
print "Resetting memberid sequence"
connection.trim_memberid_sequence()
print "Running disconnect()"
connection.disconnect()

222
py/csc/backend/ipc.py Normal file
View File

@ -0,0 +1,222 @@
# $Id: ipc.py 26 2006-12-20 21:25:08Z mspang $
"""
IPC Library Functions
This module contains very UNIX-specific code to allow interactive
communication with another program. For CEO they are required to
talk to kadmin because there is no Kerberos administration Python
module. Real bindings to libkadm5 are doable and thus a TODO.
"""
import os, pty, select
class _pty_file(object):
"""
A 'file'-like wrapper class for pseudoterminal file descriptors.
This wrapper is necessary because Python has a nasty
habit of throwing OSError at pty EOF.
This class also implements timeouts for read operations
which are handy for avoiding deadlock when both
processes are blocked in a read().
See the Python documentation of the file class
for explanation of the methods.
"""
def __init__(self, fd):
self.fd = fd
self.buffer = ''
self.closed = False
def __repr__(self):
status='open'
if self.closed:
status = 'closed'
return "<" + status + " pty '" + os.ttyname(self.fd) + "'>"
def read(self, size=-1, block=True, timeout=0.1):
if self.closed: raise ValueError
if size < 0:
data = None
# read data, catching OSError as EOF
try:
while data != '':
# wait timeout for the pty to become ready, otherwise stop reading
if not block and len(select.select([self.fd],[],[], timeout)[0]) == 0:
break
data = os.read(self.fd, 65536)
self.buffer += data
except OSError:
pass
data = self.buffer
self.buffer = ''
return data
else:
if len(self.buffer) < size:
# read data, catching OSError as EOF
try:
# wait timeout for the pty to become ready, then read
if block or len(select.select([self.fd],[],[], timeout)[0]) != 0:
self.buffer += os.read(self.fd, size - len(self.buffer) )
except OSError:
pass
data = self.buffer[:size]
self.buffer = self.buffer[size:]
return data
def readline(self, size=-1, block=True, timeout=0.1):
data = None
# read data, catching OSError as EOF
try:
while data != '' and self.buffer.find("\n") == -1 and (size < 0 or len(self.buffer) < size):
# wait timeout for the pty to become ready, otherwise stop reading
if not block and len(select.select([self.fd],[],[], timeout)[0]) == 0:
break
data = os.read(self.fd, 128)
self.buffer += data
except OSError:
pass
split_index = self.buffer.find("\n") + 1
if split_index < 0:
split_index = len(self.buffer)
if size >= 0 and split_index > size:
split_index = size
line = self.buffer[:split_index]
self.buffer = self.buffer[split_index:]
return line
def readlines(self, sizehint=None, block=True, timeout=0.1):
lines = []
line = None
while True:
line = self.readline(-1, False, timeout)
if line == '': break
lines.append(line)
return lines
def write(self, data):
if self.closed: raise ValueError
os.write(self.fd, data)
def writelines(self, lines):
for line in lines:
self.write(line)
def __iter__(self):
return self
def next(self):
line = self.readline()
if line == '':
raise StopIteration
return line
def isatty(self):
if self.closed: raise ValueError
return os.isatty(self.fd)
def fileno(self):
if self.closed: raise ValueError
return self.fd
def flush(self):
if self.closed: raise ValueError
os.fsync(self.fd)
def close(self):
if not self.closed: os.close(self.fd)
self.closed = True
def popeni(command, args, env=None):
"""
Open an interactive session with another command.
Parameters:
command - the command to run (full path)
args - a list of arguments to pass to command
env - optional environment for command
Returns: (pid, stdout, stdIn)
"""
# use a pipe to send data to the child
child_stdin, parent_stdin = os.pipe()
# a pipe for receiving data would cause buffering and
# is therefore not suitable for interactive communication
# i.e. parent_stdout, child_stdout = os.pipe()
# therefore a pty must be used instead
master, slave = pty.openpty()
# collect both stdout and stderr on the pty
parent_stdout, child_stdout = master, slave
parent_stderr, child_stderr = master, slave
# fork the child to communicate with
pid = os.fork()
# child process
if pid == 0:
# close all of the parent's fds
os.close(parent_stdin)
if parent_stdout != parent_stdin:
os.close(parent_stdout)
if parent_stderr != parent_stdin and parent_stderr != parent_stdout:
os.close(parent_stderr)
# if stdout is a terminal, set it to the controlling terminal
if os.isatty(child_stdout):
# determine the filename of the tty
tty = os.ttyname(child_stdout)
# create a new session to disconnect
# from the parent's controlling terminal
os.setsid()
# set the controlling terminal to the pty
# by opening it (and closing it again since
# it's already open as child_stdout)
fd = os.open(tty, os.O_RDWR);
os.close(fd)
# init stdin/out/err
os.dup2(child_stdin, 0)
os.dup2(child_stdout, 1)
if child_stderr >= 0:
os.dup2(child_stderr, 2)
# finally, execute the child
if env:
os.execv(command, args, env)
else:
os.execv(command, args)
# parent process
else:
# close all of the child's fds
os.close(child_stdin)
if child_stdout != child_stdin:
os.close(child_stdout)
if child_stderr >= 0 and child_stderr != child_stdin and child_stderr != child_stdout:
os.close(child_stderr)
return pid, _pty_file(parent_stdout), os.fdopen(parent_stdin, 'w')
### Tests ###
if __name__ == '__main__':
import sys
pid, recv, send = popeni('/usr/sbin/kadmin.local', ['kadmin'])
send.write("listprincs\n")
send.flush()
print recv.readlines()

448
py/csc/backend/krb.py Normal file
View File

@ -0,0 +1,448 @@
# $Id: krb.py 40 2006-12-29 00:40:31Z mspang $
"""
Kerberos Backend Interface
This module is intended to be a thin wrapper around Kerberos operations.
Methods on the connection object correspond in a straightforward way to
calls to the Kerberos Master server.
A Kerberos principal is the second half of a CSC UNIX account. The principal
stores the user's password and and is used for all authentication on CSC
systems. Accounts that do not authenticate (e.g. club accounts) do not need
a Kerberos principal.
Unfortunately, there are no Python bindings to libkadm at this time. As a
temporary workaround, This module communicates with the kadmin CLI interface
via a pseudoterminal and pipe.
"""
import os
import ipc
class KrbException(Exception):
"""Exception class for all Kerberos-related errors."""
pass
class KrbConnection(object):
"""
Connection to the Kerberos master server (kadmind). All Kerberos
principal updates are made via this class.
Exceptions: (all methods)
KrbException - on query/update failure
Example:
connection = KrbConnection()
connection.connect(...)
# make queries and updates, e.g.
connection.delete_principal("mspang")
connection.disconnect()
"""
def __init__(self):
self.pid = None
def connect(self, principal, keytab):
"""
Establishes the connection to the Kerberos master server.
Parameters:
principal - the Kerberos princiapl to authenticate as
keytab - keytab filename for authentication
Example: connection.connect('ceo/admin@CSCLUB.UWATERLOO.CA', '/etc/ceo.keytab')
"""
# check keytab
if not os.access(keytab, os.R_OK):
raise KrbException("cannot access Kerberos keytab: %s" % keytab)
# command to run
kadmin = '/usr/sbin/kadmin'
kadmin_args = ['kadmin', '-p', principal, '-kt', keytab]
# fork the kadmin command
self.pid, self.kadm_out, self.kadm_in = ipc.popeni(kadmin, kadmin_args)
# read welcome messages
welcome = self.read_result()
# sanity checks on welcome messages
for line in welcome:
# ignore auth message
if line.find("Authenticating") == 0:
continue
# ignore log file message
elif line.find("kadmin.log") != -1:
continue
# error message?
else:
raise KrbException("unexpected kadmin output: " + welcome[0])
def disconnect(self):
"""Close the connection to the master server."""
if self.pid:
# close the pipe connected to kadmin's standard input
self.kadm_in.close()
# close the master pty connected to kadmin's stdout
try:
self.kadm_out.close()
except OSError:
pass
# wait for kadmin to terminate
os.waitpid(self.pid, 0)
self.pid = None
def connected(self):
"""Determine whether the connection has been established."""
return self.pid != None
### Helper Methods ###
def read_result(self):
"""
Helper function to read output of kadmin until it
prompts for input.
Returns: a list of lines returned by kadmin
"""
# list of lines output by kadmin
result = []
# the kadmin prompt that signals the end output
# note: KADMIN_ARGS[0] must be "kadmin" or the actual prompt will differ
prompt = "kadmin:"
# timeout variables. the timeout will start at timeout and
# increase up to max_timeout when read() returns nothing (i.e., times out)
timeout = 0.01
timeout_increment = 0.10
timeout_maximum = 1.00
# input loop: read from kadmin until the kadmin prompt
buffer = ''
while True:
# attempt to read any available data
data = self.kadm_out.read(block=False, timeout=timeout)
buffer += data
# nothing was read
if data == '':
# so wait longer for data next time
if timeout < timeout_maximum:
timeout += timeout_increment
continue
# give up after too much waiting
else:
# check kadmin status
status = os.waitpid(self.pid, os.WNOHANG)
if status[0] == 0:
# kadmin still alive
raise KrbException("timeout while reading response from kadmin")
else:
# kadmin died!
raise KrbException("kadmin died while reading response")
# break into lines and save all but the final
# line (which is incomplete) into result
lines = buffer.split("\n")
buffer = lines[-1]
lines = lines[:-1]
for line in lines:
line = line.strip()
result.append(line)
# if the incomplete lines in the buffer is the kadmin prompt,
# then the result is complete and may be returned
if buffer.strip() == prompt:
break
return result
def execute(self, command):
"""
Helper function to execute a kadmin command.
Parameters:
command - the command to execute
Returns: a list of lines output by the command
"""
# there should be no remaining output from the previous
# command. if there is then something is broken.
stale_output = self.kadm_out.read(block=False, timeout=0)
if stale_output != '':
raise KrbException("unexpected kadmin output: " + stale_output)
# send the command to kadmin
self.kadm_in.write(command + "\n")
self.kadm_in.flush()
# read the command output and return it
result = self.read_result()
return result
### Commands ###
def list_principals(self):
"""
Retrieve a list of Kerberos principals.
Returns: a list of principals
Example: connection.list_principals() -> [
"ceo/admin@CSCLUB.UWATERLOO.CA",
"sysadmin/admin@CSCLUB.UWATERLOO.CA",
"mspang@CSCLUB.UWATERLOO.CA",
]
"""
principals = self.execute("list_principals")
# assuming that there at least some host principals
if len(principals) < 1:
raise KrbException("no kerberos principals")
# detect error message
if principals[0].find("kadmin:") == 0:
raise KrbException("list_principals returned error: " + principals[0])
# verify principals are well-formed
for principal in principals:
if principal.find("@") == -1:
raise KrbException('malformed pricipal: "' + principal + '"')
return principals
def get_principal(self, principal):
"""
Retrieve principal details.
Returns: a dictionary of principal attributes
Example: connection.get_principal("ceo/admin@CSCLUB.UWATERLOO.CA") -> {
"Principal": "ceo/admin@CSCLUB.UWATERLOO.CA",
"Policy": "[none]",
...
}
"""
output = self.execute('get_principal "' + principal + '"')
# detect error message
if output[0].find("kadmin:") == 0:
raise KrbException("get_principal returned error: " + output[0])
# detect more errors
if output[0].find("get_principal: ") == 0:
message = output[0][15:]
# principal does not exist => None
if message.find("Principal does not exist") == 0:
return None
# dictionary to store attributes
principal_attributes = {}
# attributes that will not be returned
ignore_attributes = ['Key']
# split output into a dictionary of attributes
for line in output:
key, value = line.split(":", 1)
value = value.strip()
if not key in ignore_attributes:
principal_attributes[key] = value
return principal_attributes
def get_privs(self):
"""
Retrieve privileges of the current principal.
Returns: a list of privileges
Example: connection.get_privs() ->
[ "GET", "ADD", "MODIFY", "DELETE" ]
"""
output = self.execute("get_privs")
# one line of output is expected
if len(output) > 1:
raise KrbException("unexpected output of get_privs: " + output[1])
# detect error message
if output[0].find("kadmin:") == 0:
raise KrbException("get_privs returned error: " + output[0])
# parse output by removing the prefix and splitting it around spaces
if output[0][:20] != "current privileges: ":
raise KrbException("malformed get_privs output: " + output[0])
privs = output[0][20:].split(" ")
return privs
def add_principal(self, principal, password):
"""
Create a new principal.
Parameters:
principal - the name of the principal
password - the principal's initial password
Example: connection.add_principal("mspang@CSCLUB.UWATERLOO.CA", "opensesame")
"""
# exec the add_principal command
if password.find('"') == -1:
self.kadm_in.write('add_principal -pw "' + password + '" "' + principal + '"\n')
# fools at MIT didn't bother implementing escaping, so passwords
# that contain double quotes must be treated specially
else:
self.kadm_in.write('add_principal "' + principal + '"\n')
self.kadm_in.write(password + "\n" + password + "\n")
# send request and read response
self.kadm_in.flush()
output = self.read_result()
# verify output
created = False
for line in output:
# ignore NOTICE lines
if line.find("NOTICE:") == 0:
continue
# ignore prompts
elif line.find("Enter password") == 0 or line.find("Re-enter password") == 0:
continue
# record whether success message was encountered
elif line.find("Principal") == 0 and line.find("created.") != 0:
created = True
# error messages
elif line.find("add_principal:") == 0 or line.find("kadmin:") == 0:
# principal exists
if line.find("already exists") != -1:
raise KrbException("principal already exists")
# misc errors
else:
raise KrbException(line)
# unknown output
else:
raise KrbException("unexpected add_principal output: " + line)
# ensure success message was received
if not created:
raise KrbException("did not receive principal created in response")
def delete_principal(self, principal):
"""
Delete a principal.
Parameters:
principal - the principal name
Example: connection.delete_principal("mspang@CSCLUB.UWATERLOO.CA")
"""
# exec the delete_principal command and read response
self.kadm_in.write('delete_principal -force "' + principal + '"\n')
self.kadm_in.flush()
output = self.read_result()
# verify output
deleted = False
for line in output:
# ignore reminder
if line.find("Make sure that") == 0:
continue
# record whether success message was encountered
elif line.find("Principal") == 0 and line.find("deleted.") != -1:
deleted = True
# error messages
elif line.find("delete_principal:") == 0 or line.find("kadmin:") == 0:
# principal exists
if line.find("does not exist") != -1:
raise KrbException("principal does not exist")
# misc errors
else:
raise KrbException(line)
# unknown output
else:
raise KrbException("unexpected delete_principal output: " + line)
# ensure success message was received
if not deleted:
raise KrbException("did not receive principal deleted")
### Tests ###
if __name__ == '__main__':
PRINCIPAL = 'ceo/admin@CSCLUB.UWATERLOO.CA'
KEYTAB = 'ceo.keytab'
connection = KrbConnection()
print "running disconnect()"
connection.disconnect()
print "running connect('%s', '%s')" % (PRINCIPAL, KEYTAB)
connection.connect(PRINCIPAL, KEYTAB)
print "running list_principals()", "->", "[" + ", ".join(map(repr,connection.list_principals()[0:3])) + " ...]"
print "running get_privs()", "->", str(connection.get_privs())
print "running add_principal('testtest', 'BLAH')"
connection.add_principal("testtest", "FJDSLDLFKJSF")
print "running get_principal('testtest')", "->", '(' + connection.get_principal("testtest")['Principal'] + ')'
print "running delete_principal('testtest')"
connection.delete_principal("testtest")
print "running disconnect()"
connection.disconnect()

577
py/csc/backend/ldapi.py Normal file
View File

@ -0,0 +1,577 @@
# $Id: ldapi.py 41 2006-12-29 04:22:31Z mspang $
"""
LDAP Backend Interface
This module is intended to be a thin wrapper around LDAP operations.
Methods on the connection object correspond in a straightforward way
to LDAP queries and updates.
A LDAP entry is the most important component of a CSC UNIX account.
The entry contains the username, user id number, real name, shell,
and other important information. All non-local UNIX accounts must
have an LDAP entry, even if the account does not log in directly.
This module makes use of python-ldap, a Python module with bindings
to libldap, OpenLDAP's native C client library.
"""
import ldap.modlist
class LDAPException(Exception):
"""Exception class for LDAP-related errors."""
class LDAPConnection(object):
"""
Connection to the LDAP directory. All directory
queries and updates are made via this class.
Exceptions: (all methods)
LDAPException - on directory query failure
Example:
connection = LDAPConnection()
connection.connect(...)
# make queries and updates, e.g.
connection.user_delete('mspang')
connection.disconnect()
"""
def __init__(self):
self.ldap = None
def connect(self, server, bind_dn, bind_pw, user_base, group_base):
"""
Establish a connection to the LDAP Server.
Parameters:
server - connection string (e.g. ldap://foo.com, ldaps://bar.com)
bind_dn - distinguished name to bind to
bind_pw - password of bind_dn
user_base - base of the users subtree
group_base - baes of the group subtree
Example: connect('ldaps:///', 'cn=ceo,dc=csclub,dc=uwaterloo,dc=ca',
'secret', 'ou=People,dc=csclub,dc=uwaterloo,dc=ca',
'ou=Group,dc=csclub,dc=uwaterloo,dc=ca')
"""
if bind_pw == None: bind_pw = ''
try:
# open the connection
self.ldap = ldap.initialize(server)
# authenticate as ceo
self.ldap.simple_bind_s(bind_dn, bind_pw)
except ldap.LDAPError, e:
raise LDAPException("unable to connect: %s" % e)
self.user_base = user_base
self.group_base = group_base
def disconnect(self):
"""Close the connection to the LDAP server."""
if self.ldap:
# close connection
try:
self.ldap.unbind_s()
self.ldap = None
except ldap.LDAPError, e:
raise LDAPException("unable to disconnect: %s" % e)
def connected(self):
"""Determine whether the connection has been established."""
return self.ldap != None
### Helper Methods ###
def lookup(self, dn):
"""
Helper method to retrieve the attributes of an entry.
Parameters:
dn - the distinguished name of the directory entry
Returns: a dictionary of attributes of the matched dn, or
None of the dn does not exist in the directory
"""
# search for the specified dn
try:
matches = self.ldap.search_s(dn, ldap.SCOPE_BASE)
except ldap.NO_SUCH_OBJECT:
return None
except ldap.LDAPError, e:
raise LDAPException("unable to lookup dn %s: %s" % (dn, e))
# this should never happen due to the nature of DNs
if len(matches) > 1:
raise LDAPException("duplicate dn in ldap: " + dn)
# return the attributes of the single successful match
else:
match = matches[0]
match_dn, match_attributes = match
return match_attributes
### User-related Methods ###
def user_lookup(self, uid):
"""
Retrieve the attributes of a user.
Parameters:
uid - the UNIX user accound name of the user
Returns: attributes of user with uid
Example: connection.user_lookup('mspang') ->
{ 'uid': 'mspang', 'uidNumber': 21292 ...}
"""
dn = 'uid=' + uid + ',' + self.user_base
return self.lookup(dn)
def user_search(self, filter):
"""
Helper for user searches.
Parameters:
filter - LDAP filter string to match users against
Returns: the list of uids matched
"""
# search for entries that match the filter
try:
matches = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, filter)
except ldap.LDAPError, e:
raise LDAPException("user search failed: %s" % e)
# list for uids found
uids = []
for match in matches:
dn, attributes = match
# uid is a required attribute of posixAccount
if not attributes.has_key('uid'):
raise LDAPException(dn + ' (posixAccount) has no uid')
# do not handle the case of multiple usernames in one entry (yet)
elif len(attributes['uid']) > 1:
raise LDAPException(dn + ' (posixAccount) has multiple uids')
# append the sole uid of this match to the list
uids.append( attributes['uid'][0] )
return uids
def user_search_id(self, uidNumber):
"""
Retrieves a list of users with a certain UNIX uid number.
LDAP (or passwd for that matter) does not enforce any
restriction on the number of accounts that can have
a certain UID. Therefore this method returns a list of matches.
Parameters:
uidNumber - the user id of the accounts desired
Returns: the list of uids matched
Example: connection.user_search_id(21292) -> ['mspang']
"""
# search for posixAccount entries with the specified uidNumber
filter = '(&(objectClass=posixAccount)(uidNumber=%d))' % uidNumber
return self.user_search(filter)
def user_search_gid(self, gidNumber):
"""
Retrieves a list of users with a certain UNIX gid number.
Parameters:
gidNumber - the group id of the accounts desired
Returns: the list of uids matched
"""
# search for posixAccount entries with the specified gidNumber
filter = '(&(objectClass=posixAccount)(gidNumber=%d))' % gidNumber
return self.user_search(filter)
def user_add(self, uid, cn, loginShell, uidNumber, gidNumber, homeDirectory, gecos):
"""
Adds a user to the directory.
Parameters:
uid - the UNIX username for the account
cn - the full name of the member
userPassword - password of the account (our setup does not use this)
loginShell - login shell for the user
uidNumber - the UNIX user id number
gidNumber - the UNIX group id number
homeDirectory - home directory for the user
gecos - comment field (usually stores miscellania)
Example: connection.user_add('mspang', 'Michael Spang',
'/bin/bash', 21292, 100, '/users/mspang',
'Michael Spang,,,')
"""
dn = 'uid=' + uid + ',' + self.user_base
attrs = {
'objectClass': [ 'top', 'account', 'posixAccount', 'shadowAccount' ],
'uid': [ uid ],
'cn': [ cn ],
'loginShell': [ loginShell ],
'uidNumber': [ str(uidNumber) ],
'gidNumber': [ str(gidNumber) ],
'homeDirectory': [ homeDirectory ],
'gecos': [ gecos ],
}
try:
modlist = ldap.modlist.addModlist(attrs)
self.ldap.add_s(dn, modlist)
except ldap.LDAPError, e:
raise LDAPException("unable to add: %s" % e)
def user_modify(self, uid, attrs):
"""
Update user attributes in the directory.
Parameters:
uid - username of the user to modify
entry - dictionary as returned by user_lookup() with changes to make.
omitted attributes are DELETED.
Example: user = user_lookup('mspang')
user['uidNumber'] = [ '0' ]
connection.user_modify('mspang', user)
"""
# distinguished name of the entry to modify
dn = 'uid=' + uid + ',' + self.user_base
# retrieve current state of user
old_user = self.user_lookup(uid)
try:
# build list of modifications to make
changes = ldap.modlist.modifyModlist(old_user, attrs)
# apply changes
self.ldap.modify_s(dn, changes)
except ldap.LDAPError, e:
raise LDAPException("unable to modify: %s" % e)
def user_delete(self, uid):
"""
Removes a user from the directory.
Parameters:
uid - the UNIX username of the account
Example: connection.user_delete('mspang')
"""
try:
dn = 'uid=' + uid + ',' + self.user_base
self.ldap.delete_s(dn)
except ldap.LDAPError, e:
raise LDAPException("unable to delete: %s" % e)
### Group-related Methods ###
def group_lookup(self, cn):
"""
Retrieves the attributes of a group.
Parameters:
cn - the UNIX group name to lookup
Returns: attributes of group with cn
Example: connection.group_lookup('office') -> {
'cn': 'office',
'gidNumber', '1001',
...
}
"""
dn = 'cn=' + cn + ',' + self.group_base
return self.lookup(dn)
def group_search_id(self, gidNumber):
"""
Retrieves a list of groups with the specified UNIX group number.
Parameters:
gidNumber - the group id of the groups desired
Returns: a list of groups with gid gidNumber
Example: connection.group_search_id(1001) -> ['office']
"""
# search for posixAccount entries with the specified uidNumber
try:
filter = '(&(objectClass=posixGroup)(gidNumber=%d))' % gidNumber
matches = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, filter)
except ldap.LDAPError,e :
raise LDAPException("group search failed: %s" % e)
# list for groups found
group_cns = []
for match in matches:
dn, attributes = match
# cn is a required attribute of posixGroup
if not attributes.has_key('cn'):
raise LDAPException(dn + ' (posixGroup) has no cn')
# do not handle the case of multiple cns for one group (yet)
elif len(attributes['cn']) > 1:
raise LDAPException(dn + ' (posixGroup) has multiple cns')
# append the sole uid of this match to the list
group_cns.append( attributes['cn'][0] )
return group_cns
def group_add(self, cn, gidNumber):
"""
Adds a group to the directory.
Parameters:
cn - the name of the group
gidNumber - the number of the group
Example: connection.group_add('office', 1001)
"""
dn = 'cn=' + cn + ',' + self.group_base
attrs = {
'objectClass': [ 'top', 'posixGroup' ],
'cn': [ cn ],
'gidNumber': [ str(gidNumber) ],
}
try:
modlist = ldap.modlist.addModlist(attrs)
self.ldap.add_s(dn, modlist)
except ldap.LDAPError, e:
raise LDAPException("unable to add group: %s" % e)
def group_modify(self, cn, attrs):
"""
Update group attributes in the directory.
The only available updates are fairly destructive
(rename or renumber) but this method is provided
for completeness.
Parameters:
cn - name of the group to modify
entry - dictionary as returned by group_lookup() with changes to make.
omitted attributes are DELETED.
Example: group = group_lookup('office')
group['gidNumber'] = [ str(connection.first_id(20000, 40000)) ]
del group['memberUid']
connection.group_modify('office', group)
"""
# distinguished name of the entry to modify
dn = 'cn=' + cn + ',' + self.group_base
# retrieve current state of group
old_group = self.group_lookup(cn)
try:
# build list of modifications to make
changes = ldap.modlist.modifyModlist(old_group, attrs)
# apply changes
self.ldap.modify_s(dn, changes)
except ldap.LDAPError, e:
raise LDAPException("unable to modify: %s" % e)
def group_delete(self, cn):
"""
Removes a group from the directory."
Parameters:
cn - the name of the group
Example: connection.group_delete('office')
"""
try:
dn = 'cn=' + cn + ',' + self.group_base
self.ldap.delete_s(dn)
except ldap.LDAPError, e:
raise LDAPException("unable to delete group: %s" % e)
def group_members(self, cn):
"""
Retrieves a group's members.
Parameters:
cn - the name of the group
Example: connection.group_members('office') ->
['sfflaw', 'jeperry', 'cschopf' ...]
"""
group = self.group_lookup(cn)
return group.get('memberUid', None)
### Miscellaneous Methods ###
def first_id(self, minimum, maximum):
"""
Determines the first available id within a range.
To be "available", there must be neither a user
with the id nor a group with the id.
Parameters:
minimum - smallest uid that may be returned
maximum - largest uid that may be returned
Returns: the id, or None if there are none available
Example: connection.first_id(20000, 40000) -> 20018
"""
# compile a list of used uids
try:
users = self.ldap.search_s(self.user_base, ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['uidNumber'])
except ldap.LDAPError, e:
raise LDAPException("search for uids failed: %s" % e)
uids = []
for user in users:
dn, attrs = user
uid = int(attrs['uidNumber'][0])
if minimum <= uid <= maximum:
uids.append(uid)
# compile a list of used gids
try:
groups = self.ldap.search_s(self.group_base, ldap.SCOPE_SUBTREE, '(objectClass=posixGroup)', ['gidNumber'])
except ldap.LDAPError, e:
raise LDAPException("search for gids failed: %s" % e)
gids = []
for group in groups:
dn, attrs = group
gid = int(attrs['gidNumber'][0])
if minimum <= gid <= maximum:
gids.append(gid)
# iterate through ids and return the first available
for id in xrange(minimum, maximum+1):
if not id in uids and not id in gids:
return id
# no suitable id was found
return None
### Tests ###
if __name__ == '__main__':
password_file = 'ldap.ceo'
server = 'ldaps:///'
base_dn = 'dc=csclub,dc=uwaterloo,dc=ca'
bind_dn = 'cn=ceo,' + base_dn
user_dn = 'ou=People,' + base_dn
group_dn = 'ou=Group,' + base_dn
bind_pw = open(password_file).readline().strip()
connection = LDAPConnection()
print "running disconnect()"
connection.disconnect()
print "running connect('%s', '%s', '%s', '%s', '%s')" % (server, bind_dn, '***', user_dn, group_dn)
connection.connect(server, bind_dn, bind_pw, user_dn, group_dn)
print "running user_lookup('mspang')", "->", "(%s)" % connection.user_lookup('mspang')['uidNumber'][0]
print "running user_search_id(21292)", "->", connection.user_search_id(21292)
print "running first_id(20000, 40000)", "->",
first_id = connection.first_id(20000, 40000)
print first_id
print "running group_add('testgroup', %d)" % first_id
try:
connection.group_add('testgroup', first_id)
except Exception, e:
print "FAILED: %s (continuing)" % e
print "running user_add('testuser', 'Test User', '/bin/false', %d, %d, '/home/null', 'Test User,,,')" % (first_id, first_id)
try:
connection.user_add('testuser', 'Test User', '/bin/false', first_id, first_id, '/home/null', 'Test User,,,')
except Exception, e:
print "FAILED: %s (continuing)" % e
print "running user_lookup('testuser')", "->",
user = connection.user_lookup('testuser')
print repr(connection.user_lookup('testuser')['cn'][0])
user['homeDirectory'] = ['/home/changed']
user['loginShell'] = ['/bin/true']
print "running user_modify(...)"
connection.user_modify('testuser', user)
print "running user_lookup('testuser')", "->",
user = connection.user_lookup('testuser')
print '(%s, %s)' % (user['homeDirectory'], user['loginShell'])
print "running group_lookup('testgroup')", "->",
group = connection.group_lookup('testgroup')
print group
print "running group_modify(...)"
group['gidNumber'] = [str(connection.first_id(20000, 40000))]
group['memberUid'] = [ str(first_id) ]
connection.group_modify('testgroup', group)
print "running group_lookup('testgroup')", "->",
group = connection.group_lookup('testgroup')
print group
print "running user_delete('testuser')"
connection.user_delete('testuser')
print "running group_delete('testgroup')"
connection.group_delete('testgroup')
print "running user_search_gid(100)", "->", "[" + ", ".join(map(repr,connection.user_search_gid(100)[:10])) + " ...]"
print "running group_members('office')", "->", "[" + ", ".join(map(repr,connection.group_members('office')[:10])) + " ...]"
print "running disconnect()"
connection.disconnect()

46
py/csc/lib/__init__.py Normal file
View File

@ -0,0 +1,46 @@
"""Library Routines"""
def read_config(config_file):
try:
conffile = open(config_file)
except IOError:
return None
options = {}
while True:
line = conffile.readline()
if line == '':
break
if '#' in line:
line = line[:line.find('#')]
while len(line) > 1 and line[-2] == '\\':
line = line[:-2] + line[-1]
next = conffile.readline()
line += next
if next == '':
break
pair = map(str.strip, line.split('=', 1))
if len(pair) == 2:
key, val = pair
if val[0] == val[-1] == '"':
val = val[1:-1]
else:
try:
val = int(val)
except:
pass
options[key] = val
elif len(pair[0]) > 1:
key, = pair
options[key] = None
return options

14
sql/initialize.sh Executable file
View File

@ -0,0 +1,14 @@
#!/bin/sh
# $Id: initialize.sh 13 2006-12-15 03:57:00Z mspang $
# Initializes a database for CEO.
# initialize the database
createdb $1
createlang plpythonu $1
# initialize the tables
psql $1 < structure.sql
# initialize check triggers
psql $1 < verify_studentid.sql
psql $1 < verify_term.sql

25
sql/structure.sql Normal file
View File

@ -0,0 +1,25 @@
-- $Id: structure.sql 36 2006-12-28 10:00:11Z mspang $
-- Table structure for CEO's SQL database.
-- Usage:
-- su postgres
-- createdb ceo
-- psql ceo < structure.sql
CREATE SEQUENCE memberid_seq;
CREATE TABLE members (
memberid integer PRIMARY KEY DEFAULT nextval('memberid_seq') NOT NULL,
name character varying(50) NOT NULL,
studentid character varying(10) UNIQUE,
program character varying(50),
"type" character varying(10),
userid character varying(32) UNIQUE
);
CREATE TABLE terms (
memberid integer NOT NULL,
term character(5) NOT NULL,
UNIQUE(memberid, term)
);

31
sql/verify_studentid.sql Normal file
View File

@ -0,0 +1,31 @@
-- $Id: verify_studentid.sql 7 2006-12-11 06:27:22Z mspang $
-- PL/Python trigger to verify student ids for validity
-- Dedicated to office staff who can't type student ids.
-- To (re)install:
-- su postgres
-- psql ceo < verify_studentid.sql
-- To uninstall:
-- su postgres
-- echo 'DROP FUNCTION verify_studentid() CASCADE' | psql ceo
DROP FUNCTION verify_studentid() CASCADE;
CREATE FUNCTION verify_studentid() RETURNS trigger AS '
import re
# update this line if the student id format changes
STUDENTID_REGEX = "^[0-9]{8}$"
studentid = TD["new"]["studentid"]
if studentid and not re.match(STUDENTID_REGEX, studentid):
plpy.error("student id is invalid (%s)" % studentid)
' LANGUAGE plpythonu;
CREATE TRIGGER verify_studentid_insert BEFORE INSERT on members
FOR ROW EXECUTE PROCEDURE verify_studentid();
CREATE TRIGGER verify_studentid_update BEFORE UPDATE ON members
FOR ROW EXECUTE PROCEDURE verify_studentid();

30
sql/verify_term.sql Normal file
View File

@ -0,0 +1,30 @@
-- $Id$
-- PL/Python trigger to verify terms for validity
-- To (re)install:
-- su postgres
-- psql ceo < verify_term.sql
-- To uninstall:
-- su postgres
-- echo 'DROP FUNCTION verify_term() CASCADE' | psql ceo
DROP FUNCTION verify_term() CASCADE;
CREATE FUNCTION verify_term() RETURNS trigger AS '
import re
# update this line if the term format changes
TERM_REGEX = "^[wsf][0-9]{4}$"
term = TD["new"]["term"]
if term and not re.match(TERM_REGEX, term):
plpy.error("term is invalid (%s)" % term)
' LANGUAGE plpythonu;
CREATE TRIGGER verify_term_insert BEFORE INSERT on terms
FOR ROW EXECUTE PROCEDURE verify_term();
CREATE TRIGGER verify_term_update BEFORE UPDATE ON terms
FOR ROW EXECUTE PROCEDURE verify_term();