add base classes for users and groups

This commit is contained in:
Max Erenberg 2021-07-19 05:47:39 +00:00
parent 0c6dc18085
commit de0f473881
49 changed files with 662 additions and 3293 deletions

1
ceo/.gitignore vendored
View File

@ -1 +0,0 @@
/ceo_pb2.py

View File

@ -1 +0,0 @@
"""CSC Electronic Office"""

View File

@ -1,162 +0,0 @@
"""
Configuration Utility Module
This module contains functions to load and verify very simple configuration
files. Python supports ".ini" files, which suck, so this module is used
instead.
Example Configuration File:
include /path/to/other.cf
# these values are the same:
name_protected = "Michael Spang"
name_unprotected = Michael Spang
# these values are not the same:
yes_no = " yes"
no_yes = yes
# this value is an integer
arbitrary_number=2
# this value is not an integer
arbitrary_string="2"
# this is a key with no value
csclub
# this key contains whitespace
white space = sure, why not
# these two lines are treated as one
long line = first line \\
second line
Resultant Dictionary:
{
'name_protected': 'Michael Spang',
'name_unprotected:' 'Michael Spang',
'yes_no': ' yes',
'no_yes': 'yes',
'arbirary_number': 2,
'arbitrary_string': '2',
'csclub': None,
'white space': 'sure, why not'
'long line': 'first line \\n second line'
... (data from other.cf) ...
}
"""
from curses.ascii import isspace
class ConfigurationException(Exception):
"""Exception class for incomplete and incorrect configurations."""
def read(filename, included=None):
"""
Function to read a configuration file into a dictionary.
Parmaeters:
filename - the file to read
included - files previously read (internal)
Exceptions:
IOError - when the configuration file cannot be read
"""
if not included:
included = []
if filename in included:
return {}
included.append(filename)
conffile = open(filename)
options = {}
while True:
line = conffile.readline()
if line == '':
break
# remove comments
if '#' in line:
line = line[:line.find('#')]
# combine lines when the newline is escaped with \
while len(line) > 1 and line[-2] == '\\':
line = line[:-2] + line[-1]
next = conffile.readline()
line += next
if next == '':
break
line = line.strip()
# process include statements
if line.find("include") == 0 and isspace(line[7]):
filename = line[8:].strip()
options.update(read(filename, included))
continue
# split 'key = value' into key and value and strip results
pair = map(str.strip, line.split('=', 1))
# found key and value
if len(pair) == 2:
key, val = pair
# found quoted string?
if val and val[0] == val[-1] == '"':
val = val[1:-1]
# unquoted, found num?
elif val:
try:
if "." in val:
val = float(val)
elif val[0] == '0':
val = int(val, 8)
else:
val = int(val)
except ValueError:
pass
# save key and value
options[key] = val
# found only key, value = None
elif len(pair[0]) > 1:
key = pair[0]
options[key] = None
return options
def check_string_fields(filename, field_list, cfg):
"""Function to verify thatfields are strings."""
for field in field_list:
if field not in cfg or type(cfg[field]) is not str:
raise ConfigurationException('expected string value for option "%s" in "%s"' % (field, filename))
def check_integer_fields(filename, field_list, cfg):
"""Function to verify that fields are integers."""
for field in field_list:
if field not in cfg or type(cfg[field]) not in (int, long):
raise ConfigurationException('expected numeric value for option "%s" in "%s"' % (field, filename))
def check_float_fields(filename, field_list, cfg):
"""Function to verify that fields are integers or floats."""
for field in field_list:
if field not in cfg or type(cfg[field]) not in (float, long, int):
raise ConfigurationException('expected float value for option "%s" in "%s"' % (field, filename))

View File

@ -1 +0,0 @@
"""Console Interface"""

View File

@ -1,40 +0,0 @@
import sys, ldap
from ceo import members, uwldap, terms, ldapi
def max_term(term1, term2):
if terms.compare(term1, term2) > 0:
return term1
else:
return term2
class ExpiredAccounts:
help = '''
expiredaccounts [--email]
Displays a list of expired accounts. If --email is specified, expired account
owners will be emailed.
'''
def main(self, args):
send_email = False
if len(args) == 1 and args[0] == '--email':
sys.stderr.write("If you want to send an account expiration notice to " \
"these users then type 'Yes, do this' and hit enter\n")
if raw_input() == 'Yes, do this':
send_email = True
uwl = ldap.initialize(uwldap.uri())
mlist = members.expired_accounts()
for member in mlist.values():
term = "f0000"
term = reduce(max_term, member.get("term", []), term)
term = reduce(max_term, member.get("nonMemberTerm", []), term)
expiredfor = terms.delta(term, terms.current())
if expiredfor <= 3:
uid = member['uid'][0]
name = member['cn'][0]
email = None
print '%s (expired for %d terms)' % (uid.ljust(12), expiredfor)
if send_email:
print " sending mail to %s" % uid
members.send_account_expired_email(name, uid)

View File

@ -1,27 +0,0 @@
from ceo import members, terms
def max_term(term1, term2):
if terms.compare(term1, term2) > 0:
return term1
else:
return term2
class Inactive:
help = '''
inactive delta-terms
Prints a list of accounts that have been inactive (i.e. unpaid) for
delta-terms.
'''
def main(self, args):
if len(args) != 1:
print self.help
return
delta = int(args[0])
mlist = members.list_all()
for member in mlist.values():
term = "f0000"
term = reduce(max_term, member.get("term", []), term)
term = reduce(max_term, member.get("nonMemberTerm", []), term)
if terms.delta(term, terms.current()) >= delta:
print "%s %s" % (member['uid'][0].ljust(12), term)

View File

@ -1,49 +0,0 @@
import sys, ldap, termios
from ceo import members, terms, uwldap, ldapi
from ceo.console.memberlist import MemberList
from ceo.console.updateprograms import UpdatePrograms
from ceo.console.expiredaccounts import ExpiredAccounts
from ceo.console.inactive import Inactive
from ceo.console.mysql import MySQL
commands = {
'memberlist' : MemberList(),
'updateprograms' : UpdatePrograms(),
'expiredaccounts' : ExpiredAccounts(),
'inactive': Inactive(),
'mysql': MySQL(),
}
help_opts = [ '--help', '-h' ]
def start():
args = sys.argv[1:]
if args[0] in help_opts:
help()
elif args[0] in commands:
command = commands[args[0]]
if len(args) >= 2 and args[1] in help_opts:
print command.help
else:
command.main(args[1:])
else:
print "Invalid command '%s'" % args[0]
def help():
args = sys.argv[2:]
if len(args) == 1:
if args[0] in commands:
print commands[args[0]].help
else:
print 'Unknown command %s.' % args[0]
else:
print ''
print 'To run the ceo GUI, type \'ceo\''
print ''
print 'To run a ceo console command, type \'ceo command\''
print ''
print 'Available console commands:'
for c in commands:
print ' %s' % c
print ''
print 'Run \'ceo command --help\' for help on a specific command.'
print ''

View File

@ -1,24 +0,0 @@
from ceo import members, terms
class MemberList:
help = '''
memberlist [term]
Displays a list of members for a term; defaults to the current term if term
is not given.
'''
def main(self, args):
mlist = {}
if len(args) == 1:
mlist = members.list_term(args[0])
else:
mlist = members.list_term(terms.current())
dns = mlist.keys()
dns.sort()
for dn in dns:
member = mlist[dn]
print '%s %s %s' % (
member['uid'][0].ljust(12),
member['cn'][0].ljust(30),
member.get('program', [''])[0]
)

View File

@ -1,38 +0,0 @@
from ceo import members, terms, mysql
class MySQL:
help = '''
mysql create <username>
Creates a mysql database for a user.
'''
def main(self, args):
if len(args) != 2 or args[0] != 'create':
print self.help
return
username = args[1]
problem = None
try:
password = mysql.create_mysql(username)
try:
mysql.write_mysql_info(username, password)
helpfiletext = "Settings written to ~%s/ceo-mysql-info." % username
except (KeyError, IOError, OSError), e:
helpfiletext = "An error occured writing the settings file: %s" % e
print "MySQL database created"
print ("Connection Information: \n"
"\n"
"Database: %s\n"
"Username: %s\n"
"Hostname: localhost\n"
"Password: %s\n"
"\n"
"%s\n"
% (username, username, password, helpfiletext))
except mysql.MySQLException, e:
print "Failed to create MySQL database"
print
print "We failed to create the database. The error was:\n\n%s" % e

View File

@ -1,49 +0,0 @@
import ldap, sys, termios
from ceo import members, uwldap, ldapi
blacklist = ('orphaned', 'expired')
class UpdatePrograms:
help = '''
updateprograms
Interactively updates the program field for an account by querying uwdir.
'''
def main(self, args):
mlist = members.list_all().items()
uwl = ldap.initialize(uwldap.uri())
fd = sys.stdin.fileno()
for (dn, member) in mlist:
uid = member['uid'][0]
user = uwl.search_s(uwldap.base(), ldap.SCOPE_SUBTREE,
'(uid=%s)' % ldapi.escape(uid))
if len(user) == 0:
continue
user = user[0][1]
oldprog = member.get('program', [''])[0]
newprog = user.get('ou', [''])[0]
if oldprog == newprog or newprog == '' or newprog.lower() in blacklist:
continue
sys.stdout.write("%s: '%s' => '%s'? (y/n) " % (uid, oldprog, newprog))
new = old = termios.tcgetattr(fd)
new[3] = new[3] & ~termios.ICANON
try:
termios.tcsetattr(fd, termios.TCSANOW, new)
try:
if sys.stdin.read(1) != 'y':
continue
except KeyboardInterrupt:
return ''
finally:
print ''
termios.tcsetattr(fd, termios.TCSANOW, old)
old = new = {}
if oldprog != '':
old = {'program': [oldprog]}
if newprog != '':
new = {'program': [newprog]}
mlist = ldapi.make_modlist(old, new)
# TODO: don't use members.ld directly
#if newprog != '':
# members.set_program(uid, newprog)
members.ld.modify_s(dn, mlist)

View File

@ -1,13 +0,0 @@
"""
Exceptions Module
This module provides some simple but generally useful exception classes.
"""
class InvalidArgument(Exception):
"""Exception class for bad argument values."""
def __init__(self, argname, argval, explanation):
Exception.__init__(self)
self.argname, self.argval, self.explanation = argname, argval, explanation
def __str__(self):
return 'Bad argument value "%s" for %s: %s' % (self.argval, self.argname, self.explanation)

View File

@ -1,148 +0,0 @@
"""
LDAP Utilities
This module makes use of python-ldap, a Python module with bindings
to libldap, OpenLDAP's native C client library.
"""
import ldap.modlist, os, pwd
from subprocess import Popen, PIPE
def connect_sasl(uri, mech, realm, password):
try:
# open the connection
ld = ldap.initialize(uri)
# authenticate
sasl = Sasl(mech, realm, password)
ld.sasl_interactive_bind_s('', sasl)
except ldap.LOCAL_ERROR, e:
raise e
except:
print "Shit, something went wrong!"
return ld
def abslookup(ld, dn, objectclass=None):
# search for the specified dn
try:
if objectclass:
search_filter = '(objectclass=%s)' % escape(objectclass)
matches = ld.search_s(dn, ldap.SCOPE_BASE, search_filter)
else:
matches = ld.search_s(dn, ldap.SCOPE_BASE)
except ldap.NO_SUCH_OBJECT:
return None
# dn was found, but didn't match the objectclass filter
if len(matches) < 1:
return None
# return the attributes of the single successful match
match = matches[0]
match_dn, match_attributes = match
return match_attributes
def lookup(ld, rdntype, rdnval, base, objectclass=None):
dn = '%s=%s,%s' % (rdntype, escape(rdnval), base)
return abslookup(ld, dn, objectclass)
def search(ld, base, search_filter, params=[], scope=ldap.SCOPE_SUBTREE, attrlist=None, attrsonly=0):
real_filter = search_filter % tuple(escape(x) for x in params)
# search for entries that match the filter
matches = ld.search_s(base, scope, real_filter, attrlist, attrsonly)
return matches
def modify(ld, rdntype, rdnval, base, mlist):
dn = '%s=%s,%s' % (rdntype, escape(rdnval), base)
ld.modify_s(dn, mlist)
def modify_attrs(ld, rdntype, rdnval, base, old, attrs):
dn = '%s=%s,%s' % (rdntype, escape(rdnval), base)
# build list of modifications to make
changes = ldap.modlist.modifyModlist(old, attrs)
# apply changes
ld.modify_s(dn, changes)
def modify_diff(ld, rdntype, rdnval, base, old, new):
dn = '%s=%s,%s' % (rdntype, escape(rdnval), base)
# build list of modifications to make
changes = make_modlist(old, new)
# apply changes
ld.modify_s(dn, changes)
def escape(value):
"""
Escapes special characters in a value so that it may be safely inserted
into an LDAP search filter.
"""
value = str(value)
value = value.replace('\\', '\\5c').replace('*', '\\2a')
value = value.replace('(', '\\28').replace(')', '\\29')
value = value.replace('\x00', '\\00')
return value
def make_modlist(old, new):
keys = set(old.keys()).union(set(new))
mlist = []
for key in keys:
if key in old and not key in new:
mlist.append((ldap.MOD_DELETE, key, list(set(old[key]))))
elif key in new and not key in old:
mlist.append((ldap.MOD_ADD, key, list(set(new[key]))))
else:
to_add = list(set(new[key]) - set(old[key]))
if len(to_add) > 0:
mlist.append((ldap.MOD_ADD, key, to_add))
to_del = list(set(old[key]) - set(new[key]))
if len(to_del) > 0:
mlist.append((ldap.MOD_DELETE, key, to_del))
return mlist
def format_ldaperror(ex):
desc = ex[0].get('desc', '')
info = ex[0].get('info', '')
if desc and info:
return "%s: %s" % (desc, info)
elif desc:
return desc
else:
return str(ex)
class Sasl:
def __init__(self, mech, realm, password):
self.mech = mech
self.realm = realm
if mech == 'GSSAPI' and password is not None:
userid = pwd.getpwuid(os.getuid()).pw_name
kinit = '/usr/bin/kinit'
kinit_args = [ kinit, '%s@%s' % (userid, realm) ]
kinit = Popen(kinit_args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
kinit.stdin.write('%s\n' % password)
kinit.wait()
def callback(self, id, challenge, prompt, defresult):
return ''

View File

@ -1,609 +0,0 @@
"""
CSC Member Management
This module contains functions for registering new members, registering
members for terms, searching for members, and other member-related
functions.
Transactions are used in each method that modifies the database.
Future changes to the members database that need to be atomic
must also be moved into this module.
"""
import os, re, subprocess, ldap, socket
from ceo import conf, ldapi, terms, remote, ceo_pb2
from ceo.excep import InvalidArgument
import dns.resolver
### Configuration ###
CONFIG_FILE = '/etc/csc/accounts.cf'
cfg = {}
def configure():
"""Load Members Configuration"""
string_fields = [ 'username_regex', 'shells_file', 'ldap_server_url',
'ldap_users_base', 'ldap_groups_base', 'ldap_sasl_mech', 'ldap_sasl_realm',
'expire_hook' ]
numeric_fields = [ 'min_password_length' ]
# read configuration file
cfg_tmp = conf.read(CONFIG_FILE)
# verify configuration
conf.check_string_fields(CONFIG_FILE, string_fields, cfg_tmp)
conf.check_integer_fields(CONFIG_FILE, numeric_fields, cfg_tmp)
# update the current configuration with the loaded values
cfg.update(cfg_tmp)
### Exceptions ###
class MemberException(Exception):
"""Base exception class for member-related errors."""
def __init__(self, ex=None):
Exception.__init__(self)
self.ex = ex
def __str__(self):
return str(self.ex)
class InvalidTerm(MemberException):
"""Exception class for malformed terms."""
def __init__(self, term):
MemberException.__init__(self)
self.term = term
def __str__(self):
return "Term is invalid: %s" % self.term
class NoSuchMember(MemberException):
"""Exception class for nonexistent members."""
def __init__(self, memberid):
MemberException.__init__(self)
self.memberid = memberid
def __str__(self):
return "Member not found: %d" % self.memberid
### Connection Management ###
# global directory connection
ld = None
def connect(auth_callback):
"""Connect to LDAP."""
global ld
password = None
tries = 0
while ld is None:
try:
ld = ldapi.connect_sasl(cfg['ldap_server_url'], cfg['ldap_sasl_mech'],
cfg['ldap_sasl_realm'], password)
except ldap.LOCAL_ERROR, e:
tries += 1
if tries > 3:
raise e
password = auth_callback.callback(e)
if password == None:
raise e
def connect_anonymous():
"""Connect to LDAP."""
global ld
ld = ldap.initialize(cfg['ldap_server_url'])
def disconnect():
"""Disconnect from LDAP."""
global ld
ld.unbind_s()
ld = None
def connected():
"""Determine whether the connection has been established."""
return ld and ld.connected()
### Members ###
def create_member(username, password, name, program, email, club_rep=False):
"""
Creates a UNIX user account with options tailored to CSC members.
Parameters:
username - the desired UNIX username
password - the desired UNIX password
name - the member's real name
program - the member's program of study
club_rep - whether the user is a club rep
email - email to place in .forward
Exceptions:
InvalidArgument - on bad account attributes provided
Returns: the uid number of the new account
See: create()
"""
# check username format
if not username or not re.match(cfg['username_regex'], username):
raise InvalidArgument("username", username, "expected format %s" % repr(cfg['username_regex']))
# check password length
if not password or len(password) < cfg['min_password_length']:
raise InvalidArgument("password", "<hidden>", "too short (minimum %d characters)" % cfg['min_password_length'])
try:
request = ceo_pb2.AddUser()
request.username = username
request.password = password
request.realname = name
request.program = program
request.email = email
if club_rep:
request.type = ceo_pb2.AddUser.CLUB_REP
else:
request.type = ceo_pb2.AddUser.MEMBER
out = remote.run_remote('adduser', request.SerializeToString())
response = ceo_pb2.AddUserResponse()
response.ParseFromString(out)
if any(message.status != 0 for message in response.messages):
raise MemberException('\n'.join(message.message for message in response.messages))
except remote.RemoteException, e:
raise MemberException(e)
except OSError, e:
raise MemberException(e)
def check_email(email):
match = re.match('^\S+?@(\S+)$', email)
if not match:
return 'Invalid email address'
# some characters are treated specially in .forward
for c in email:
if c in ('"', "'", ',', '|', '$', '/', '#', ':'):
return 'Invalid character in address: %s' % c
# Start by searching for host record
host = match.group(1)
try:
ip = socket.getaddrinfo(host, None)
except:
# Check for MX record
try:
dns.resolver.query(host, 'MX')
except:
return 'Invalid host: %s' % host
def current_email(username):
fwdpath = '%s/%s/.forward' % (cfg['member_home'], username)
try:
fwd = open(fwdpath).read().strip()
if not check_email(fwd):
return fwd
except OSError:
pass
except IOError:
pass
def change_email(username, forward):
try:
request = ceo_pb2.UpdateMail()
request.username = username
request.forward = forward
out = remote.run_remote('mail', request.SerializeToString())
response = ceo_pb2.AddUserResponse()
response.ParseFromString(out)
if any(message.status != 0 for message in response.messages):
return '\n'.join(message.message for message in response.messages)
except remote.RemoteException, e:
raise MemberException(e)
except OSError, e:
raise MemberException(e)
def get(userid):
"""
Look up attributes of a member by userid.
Returns: a dictionary of attributes
Example: get('mspang') -> {
'cn': [ 'Michael Spang' ],
'program': [ 'Computer Science' ],
...
}
"""
return ldapi.lookup(ld, 'uid', userid, cfg['ldap_users_base'])
def get_group(group):
"""
Look up group by groupname
Returns a dictionary of group attributes
"""
return ldapi.lookup(ld, 'cn', group, cfg['ldap_groups_base'])
def uid2dn(uid):
return 'uid=%s,%s' % (ldapi.escape(uid), cfg['ldap_users_base'])
def list_term(term):
"""
Build a list of members in a term.
Parameters:
term - the term to match members against
Returns: a list of members
Example: list_term('f2006'): -> {
'uid=mspang, ou=...': { 'cn': 'Michael Spang', ... },
'uid=ctdalek, ou=...': { 'cn': 'Calum T. Dalek', ... },
...
}
"""
members = ldapi.search(ld, cfg['ldap_users_base'],
'(&(objectClass=member)(term=%s))', [ term ])
return dict([(member[0], member[1]) for member in members])
def list_name(name):
"""
Build a list of members with matching names.
Parameters:
name - the name to match members against
Returns: a list of member dictionaries
Example: list_name('Spang'): -> {
'uid=mspang, ou=...': { 'cn': 'Michael Spang', ... },
...
]
"""
members = ldapi.search(ld, cfg['ldap_users_base'],
'(&(objectClass=member)(cn~=%s))', [ name ])
return dict([(member[0], member[1]) for member in members])
def list_group(group):
"""
Build a list of members in a group.
Parameters:
group - the group to match members against
Returns: a list of member dictionaries
Example: list_name('syscom'): -> {
'uid=mspang, ou=...': { 'cn': 'Michael Spang', ... },
...
]
"""
members = group_members(group)
ret = {}
if members:
for member in members:
info = get(member)
if info:
ret[uid2dn(member)] = info
return ret
def list_all():
"""
Build a list of all members
Returns: a list of member dictionaries
Example: list_name('Spang'): -> {
'uid=mspang, ou=...': { 'cn': 'Michael Spang', ... },
...
]
"""
members = ldapi.search(ld, cfg['ldap_users_base'], '(objectClass=member)')
return dict([(member[0], member[1]) for member in members])
def list_positions():
"""
Build a list of positions
Returns: a list of positions and who holds them
Example: list_positions(): -> {
'president': { 'mspang': { 'cn': 'Michael Spang', ... } } ],
...
]
"""
members = ld.search_s(cfg['ldap_users_base'], ldap.SCOPE_SUBTREE, '(position=*)')
positions = {}
for (_, member) in members:
for position in member['position']:
if not position in positions:
positions[position] = {}
positions[position][member['uid'][0]] = member
return positions
def set_position(position, members):
"""
Sets a position
Parameters:
position - the position to set
members - an array of members that hold the position
Example: set_position('president', ['dtbartle'])
"""
res = ld.search_s(cfg['ldap_users_base'], ldap.SCOPE_SUBTREE,
'(&(objectClass=member)(position=%s))' % ldapi.escape(position))
old = set([ member['uid'][0] for (_, member) in res ])
new = set(members)
mods = {
'del': set(old) - set(new),
'add': set(new) - set(old),
}
if len(mods['del']) == 0 and len(mods['add']) == 0:
return
for action in ['del', 'add']:
for userid in mods[action]:
dn = 'uid=%s,%s' % (ldapi.escape(userid), cfg['ldap_users_base'])
entry1 = {'position' : [position]}
entry2 = {} #{'position' : []}
entry = ()
if action == 'del':
entry = (entry1, entry2)
elif action == 'add':
entry = (entry2, entry1)
mlist = ldapi.make_modlist(entry[0], entry[1])
ld.modify_s(dn, mlist)
def change_group_member(action, group, userid):
user_dn = 'uid=%s,%s' % (ldapi.escape(userid), cfg['ldap_users_base'])
group_dn = 'cn=%s,%s' % (ldapi.escape(group), cfg['ldap_groups_base'])
entry1 = {'uniqueMember' : []}
entry2 = {'uniqueMember' : [user_dn]}
entry = []
if action == 'add' or action == 'insert':
entry = (entry1, entry2)
elif action == 'remove' or action == 'delete':
entry = (entry2, entry1)
else:
raise InvalidArgument("action", action, "invalid action")
mlist = ldapi.make_modlist(entry[0], entry[1])
ld.modify_s(group_dn, mlist)
### Shells ###
def get_shell(userid):
member = ldapi.lookup(ld, 'uid', userid, cfg['ldap_users_base'])
if not member:
raise NoSuchMember(userid)
if 'loginShell' not in member:
return
return member['loginShell'][0]
def get_shells():
return [ sh for sh in open(cfg['shells_file']).read().split("\n")
if sh
and sh[0] == '/'
and not '#' in sh
and os.access(sh, os.X_OK) ]
def set_shell(userid, shell):
if not shell in get_shells():
raise InvalidArgument("shell", shell, "is not in %s" % cfg['shells_file'])
ldapi.modify(ld, 'uid', userid, cfg['ldap_users_base'], [ (ldap.MOD_REPLACE, 'loginShell', [ shell ]) ])
### Clubs ###
def create_club(username, name):
"""
Creates a UNIX user account with options tailored to CSC-hosted clubs.
Parameters:
username - the desired UNIX username
name - the club name
Exceptions:
InvalidArgument - on bad account attributes provided
Returns: the uid number of the new account
See: create()
"""
# check username format
if not username or not re.match(cfg['username_regex'], username):
raise InvalidArgument("username", username, "expected format %s" % repr(cfg['username_regex']))
try:
request = ceo_pb2.AddUser()
request.type = ceo_pb2.AddUser.CLUB
request.username = username
request.realname = name
out = remote.run_remote('adduser', request.SerializeToString())
response = ceo_pb2.AddUserResponse()
response.ParseFromString(out)
if any(message.status != 0 for message in response.messages):
raise MemberException('\n'.join(message.message for message in response.messages))
except remote.RemoteException, e:
raise MemberException(e)
except OSError, e:
raise MemberException(e)
### Terms ###
def register(userid, term_list):
"""
Registers a member for one or more terms.
Parameters:
userid - the member's username
term_list - the term to register for, or a list of terms
Exceptions:
InvalidTerm - if a term is malformed
Example: register(3349, "w2007")
Example: register(3349, ["w2007", "s2007"])
"""
user_dn = 'uid=%s,%s' % (ldapi.escape(userid), cfg['ldap_users_base'])
if type(term_list) in (str, unicode):
term_list = [ term_list ]
ldap_member = get(userid)
if ldap_member and 'term' not in ldap_member:
ldap_member['term'] = []
if not ldap_member:
raise NoSuchMember(userid)
new_member = ldap_member.copy()
new_member['term'] = new_member['term'][:]
for term in term_list:
# check term syntax
if not re.match('^[wsf][0-9]{4}$', term):
raise InvalidTerm(term)
# add the term to the entry
if not term in ldap_member['term']:
new_member['term'].append(term)
mlist = ldapi.make_modlist(ldap_member, new_member)
ld.modify_s(user_dn, mlist)
def register_nonmember(userid, term_list):
"""Registers a non-member for one or more terms."""
user_dn = 'uid=%s,%s' % (ldapi.escape(userid), cfg['ldap_users_base'])
if type(term_list) in (str, unicode):
term_list = [ term_list ]
ldap_member = get(userid)
if not ldap_member:
raise NoSuchMember(userid)
if 'term' not in ldap_member:
ldap_member['term'] = []
if 'nonMemberTerm' not in ldap_member:
ldap_member['nonMemberTerm'] = []
new_member = ldap_member.copy()
new_member['nonMemberTerm'] = new_member['nonMemberTerm'][:]
for term in term_list:
# check term syntax
if not re.match('^[wsf][0-9]{4}$', term):
raise InvalidTerm(term)
# add the term to the entry
if not term in ldap_member['nonMemberTerm'] \
and not term in ldap_member['term']:
new_member['nonMemberTerm'].append(term)
mlist = ldapi.make_modlist(ldap_member, new_member)
ld.modify_s(user_dn, mlist)
def registered(userid, term):
"""
Determines whether a member is registered
for a term.
Parameters:
userid - the member's username
term - the term to check
Returns: whether the member is registered
Example: registered("mspang", "f2006") -> True
"""
member = get(userid)
if not member is None:
return 'term' in member and term in member['term']
else:
return False
def group_members(group):
"""
Returns a list of group members
"""
group = ldapi.lookup(ld, 'cn', group, cfg['ldap_groups_base'])
if group and 'uniqueMember' in group:
r = re.compile('^uid=([^,]*)')
return map(lambda x: r.match(x).group(1), group['uniqueMember'])
return []
def expired_accounts():
members = ldapi.search(ld, cfg['ldap_users_base'],
'(&(objectClass=member)(!(|(term=%s)(nonMemberTerm=%s))))' %
(terms.current(), terms.current()))
return dict([(member[0], member[1]) for member in members])
def send_account_expired_email(name, email):
args = [ cfg['expire_hook'], name, email ]
os.spawnv(os.P_WAIT, cfg['expire_hook'], args)
def subscribe_to_mailing_list(name):
member = get(name)
if member is not None:
return remote.run_remote('mailman', name)
else:
return 'Error: member does not exist'

View File

@ -1,54 +0,0 @@
import os, re, subprocess, ldap, socket, pwd
from ceo import conf, ldapi, terms, remote, ceo_pb2
from ceo.excep import InvalidArgument
class MySQLException(Exception):
pass
def write_mysql_info(username, password):
homedir = pwd.getpwnam(username).pw_dir
password_file = '%s/ceo-mysql-info' % homedir
if os.path.exists(password_file):
os.rename(password_file, password_file + '.old')
fd = os.open(password_file, os.O_CREAT|os.O_EXCL|os.O_WRONLY, 0660)
fh = os.fdopen(fd, 'w')
fh.write("""MySQL Database Information for %(username)s
Your new MySQL database was created. To connect, use
the following options:
Database: %(username)s
Username: %(username)s
Password: %(password)s
Hostname: localhost
The command to connect using the MySQL command-line client is
mysql %(username)s -u %(username)s -p
If you prefer a GUI you can use phpmyadmin at
http://csclub.uwaterloo.ca/phpmyadmin
This database is only accessible from caffeine.
""" % { 'username': username, 'password': password })
fh.close()
def create_mysql(username):
try:
request = ceo_pb2.AddMySQLUser()
request.username = username
out = remote.run_remote('mysql', request.SerializeToString())
response = ceo_pb2.AddMySQLUserResponse()
response.ParseFromString(out)
if any(message.status != 0 for message in response.messages):
raise MySQLException('\n'.join(message.message for message in response.messages))
return response.password
except remote.RemoteException, e:
raise MySQLException(e)

View File

@ -1,24 +0,0 @@
import os, syslog, grp
def response_message(response, status, message):
if status:
priority = syslog.LOG_ERR
else:
priority = syslog.LOG_INFO
syslog.syslog(priority, message)
msg = response.messages.add()
msg.status = status
msg.message = message
return status
def get_ceo_user():
user = os.environ.get('CEO_USER')
if not user:
raise Exception("environment variable CEO_USER not set");
return user
def check_group(user, group):
try:
return user in grp.getgrnam(group).gr_mem
except KeyError:
return False

View File

@ -1,155 +0,0 @@
#!/usr/bin/python
from xml.dom import minidom, Node
import urllib
import time
import datetime
import hashlib
import base64
import hmac
class PyMazonError(Exception):
"""Holds information about an error that occured during a pymazon request"""
def __init__(self, messages):
self.__message = '\n'.join(messages)
def __get_message(self):
return self.__message
def __str__(self):
return repr(self.__message)
message = property(fget=__get_message)
class PyMazonBook:
"""Stores information about a book retrieved via PyMazon."""
def __init__(self, title, authors, publisher, year, isbn10, isbn13, edition):
self.__title = title
self.__authors = authors
self.__publisher = publisher
self.__year = year
self.__isbn10 = isbn10
self.__isbn13 = isbn13
self.__edition = edition
def __str__(self):
return 'Title: ' + self.title + '\n' + \
'Author(s): ' + ', '.join(self.authors) + '\n' \
'Publisher: ' + self.publisher + '\n' + \
'Year: ' + self.year + '\n' + \
'ISBN-10: ' + self.isbn10 + '\n' + \
'ISBN-13: ' + self.isbn13 + '\n' + \
'Edition: ' + self.edition
def __get_title(self):
return self.__title
def __get_authors(self):
return self.__authors
def __get_publisher(self):
return self.__publisher
def __get_year(self):
return self.__year
def __get_isbn10(self):
return self.__isbn10
def __get_isbn13(self):
return self.__isbn13
def __get_edition(self):
return self.__edition
title = property(fget=__get_title)
authors = property(fget=__get_authors)
publisher = property(fget=__get_publisher)
year = property(fget=__get_year)
isbn10 = property(fget=__get_isbn10)
isbn13 = property(fget=__get_isbn13)
edition = property(fget=__get_edition)
class PyMazon:
"""A method of looking up book information on Amazon."""
def __init__(self, accesskey, secretkey):
self.__key = accesskey
self.__secret = secretkey
self.__last_query_time = 0
def __form_request(self, isbn):
content = {}
dstamp = datetime.datetime.utcfromtimestamp(time.time())
content['Timestamp'] = dstamp.strftime('%Y-%m-%dT%H:%M:%S.000Z')
content['Service'] = 'AWSECommerceService'
content['Version'] = '2008-08-19'
content['Operation'] = 'ItemLookup'
content['ResponseGroup'] = 'ItemAttributes'
content['IdType'] = 'ISBN'
content['SearchIndex'] = 'Books'
content['ItemId'] = isbn
content['AWSAccessKeyId'] = self.__key
URI_String = []
for key, value in sorted(content.items()):
URI_String.append('%s=%s' % (key, urllib.quote(value)))
req = '&'.join(URI_String)
to_sign_req = 'GET\necs.amazonaws.com\n/onca/xml\n' + req
h = hmac.new(self.__secret, to_sign_req, hashlib.sha256)
sig = base64.b64encode(h.digest())
req += '&Signature=%s' % urllib.quote(sig)
return 'http://ecs.amazonaws.com/onca/xml?' + req
def __elements_text(self, element, name):
result = []
matching = element.getElementsByTagName(name)
for match in matching:
if len(match.childNodes) != 1:
continue
child = match.firstChild
if child.nodeType != Node.TEXT_NODE:
continue
result.append(child.nodeValue.strip())
return result
def __format_errors(self, errors):
error_list = []
for error in errors:
error_list.extend(self.__elements_text(error, 'Message'))
return error_list
def __extract_single(self, element, name):
matches = self.__elements_text(element, name)
if len(matches) == 0:
return ''
return matches[0]
def lookup(self, isbn):
file = urllib.urlretrieve(self.__form_request(isbn))[0]
xmldoc = minidom.parse(file)
cur_time = time.time()
while cur_time - self.__last_query_time < 1.0:
sleep(cur_time - self.__last_query_time)
cur_time = time.time()
self.__last_query_time = cur_time
errors = xmldoc.getElementsByTagName('Errors')
if len(errors) != 0:
raise PyMazonError, self.__format_errors(errors)
title = self.__extract_single(xmldoc, 'Title')
authors = self.__elements_text(xmldoc, 'Author')
publisher = self.__extract_single(xmldoc, 'Publisher')
year = self.__extract_single(xmldoc, 'PublicationDate')[0:4]
isbn10 = self.__extract_single(xmldoc, 'ISBN')
isbn13 = self.__extract_single(xmldoc, 'EAN')
edition = self.__extract_single(xmldoc, 'Edition')
return PyMazonBook(title, authors, publisher, year, isbn10, isbn13, edition)

View File

@ -1,18 +0,0 @@
import os
import subprocess
class RemoteException(Exception):
"""Exception class for bad argument values."""
def __init__(self, status, stdout, stderr):
self.status, self.stdout, self.stderr = status, stdout, stderr
def __str__(self):
return 'Error executing ceoc (%d)\n\n%s' % (self.status, self.stderr)
def run_remote(op, data):
ceoc = '%s/ceoc' % os.environ.get('CEO_LIB_DIR', '/usr/lib/ceod')
addmember = subprocess.Popen([ceoc, op], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = addmember.communicate(data)
status = addmember.wait()
if status:
raise RemoteException(status, out, err)
return out

View File

@ -1,254 +0,0 @@
"""
Terms Routines
This module contains functions for manipulating terms, such as determining
the current term, finding the next or previous term, converting dates to
terms, and more.
"""
import time, datetime, re
# year to count terms from
EPOCH = 1970
# seasons list
SEASONS = [ 'w', 's', 'f' ]
def validate(term):
"""
Determines whether a term is well-formed.
Parameters:
term - the term string
Returns: whether the term is valid (boolean)
Example: validate("f2006") -> True
"""
regex = '^[wsf][0-9]{4}$'
return re.match(regex, term) is not None
def parse(term):
"""Helper function to convert a term string to the number of terms
since the epoch. Such numbers are intended for internal use only."""
if not validate(term):
raise Exception("malformed term: %s" % term)
year = int( term[1:] )
season = SEASONS.index( term[0] )
return (year - EPOCH) * len(SEASONS) + season
def generate(term):
"""Helper function to convert a year and season to a term string."""
year = int(term / len(SEASONS)) + EPOCH
season = term % len(SEASONS)
return "%s%04d" % ( SEASONS[season], year )
def next(term):
"""
Returns the next term. (convenience function)
Parameters:
term - the term string
Retuns: the term string of the following term
Example: next("f2006") -> "w2007"
"""
return add(term, 1)
def previous(term):
"""
Returns the previous term. (convenience function)
Parameters:
term - the term string
Returns: the term string of the preceding term
Example: previous("f2006") -> "s2006"
"""
return add(term, -1)
def add(term, offset):
"""
Calculates a term relative to some base term.
Parameters:
term - the base term
offset - the number of terms since term (may be negative)
Returns: the term that comes offset terms after term
"""
return generate(parse(term) + offset)
def delta(initial, final):
"""
Calculates the distance between two terms.
It should be true that add(a, delta(a, b)) == b.
Parameters:
initial - the base term
final - the term at some offset from the base term
Returns: the offset of final relative to initial
"""
return parse(final) - parse(initial)
def compare(first, second):
"""
Compares two terms. This function is suitable
for use with list.sort().
Parameters:
first - base term for comparison
second - term to compare to
Returns: > 0 (if first > second)
= 0 (if first == second)
< 0 (if first < second)
"""
return delta(second, first)
def interval(base, count):
"""
Returns a list of adjacent terms.
Parameters:
base - the first term in the interval
count - the number of terms to include
Returns: a list of count terms starting with initial
Example: interval('f2006', 3) -> [ 'f2006', 'w2007', 's2007' ]
"""
terms = []
for num in xrange(count):
terms.append( add(base, num) )
return terms
def tstamp(timestamp):
"""Helper to convert seconds since the epoch
to terms since the epoch."""
# let python determine the month and year
date = datetime.date.fromtimestamp(timestamp)
# determine season
if date.month <= 4:
season = SEASONS.index('w')
elif date.month <= 8:
season = SEASONS.index('s')
else:
season = SEASONS.index('f')
return (date.year - EPOCH) * len(SEASONS) + season
def from_timestamp(timestamp):
"""
Converts a number of seconds since
the epoch to a number of terms since
the epoch.
This function notes that:
WINTER = JANUARY to APRIL
SPRING = MAY to AUGUST
FALL = SEPTEMBER to DECEMBER
Parameters:
timestamp - number of seconds since the epoch
Returns: the number of terms since the epoch
Example: from_timestamp(1166135779) -> 'f2006'
"""
return generate( tstamp(timestamp) )
def curr():
"""Helper to determine the current term."""
return tstamp( time.time() )
def current():
"""
Determines the current term.
Returns: current term
Example: current() -> 'f2006'
"""
return generate( curr() )
def next_unregistered(registered):
"""
Find the first future or current unregistered term.
Intended as the 'default' for registrations.
Parameters:
registered - a list of terms a member is registered for
Returns: the next unregistered term
"""
# get current term number
now = curr()
# never registered -> current term is next
if len( registered) < 1:
return generate( now )
# return the first unregistered, or the current term (whichever is greater)
return generate(max([max(map(parse, registered))+1, now]))
### Tests ###
if __name__ == '__main__':
from ceo.test import test, assert_equal, success
test(parse); assert_equal(110, parse('f2006')); success()
test(generate); assert_equal('f2006', generate(110)); success()
test(next); assert_equal('w2007', next('f2006')); success()
test(previous); assert_equal('s2006', previous('f2006')); success()
test(delta); assert_equal(1, delta('f2006', 'w2007')); success()
test(compare); assert_equal(-1, compare('f2006', 'w2007')); success()
test(add); assert_equal('w2010', add('f2006', delta('f2006', 'w2010'))); success()
test(interval); assert_equal(['f2006', 'w2007', 's2007'], interval('f2006', 3)); success()
test(from_timestamp); assert_equal('f2006', from_timestamp(1166135779)); success()
test(current); assert_equal(True, parse( current() ) >= 110 ); success()
test(next_unregistered)
assert_equal( next(current()), next_unregistered([ current() ]))
assert_equal( current(), next_unregistered([]))
assert_equal( current(), next_unregistered([ previous(current()) ]))
assert_equal( current(), next_unregistered([ add(current(), -2) ]))
success()

View File

@ -1,42 +0,0 @@
"""
Common Test Routines
This module contains helpful functions called by each module's test suite.
"""
from types import FunctionType, MethodType, ClassType, TypeType
class TestException(Exception):
"""Exception class for test failures."""
def test(subject):
"""Print a test message."""
if type(subject) in (MethodType, FunctionType, ClassType, TypeType):
print "testing %s()..." % subject.__name__,
else:
print "testing %s..." % subject,
def success():
"""Print a success message."""
print "pass."
def assert_equal(expected, actual):
if expected != actual:
message = "Expected (%s)\nWas (%s)" % (repr(expected), repr(actual))
fail(message)
def fail(message):
print "failed!"
raise TestException("Test failed:\n%s" % message)
def negative(call, args, excep, message):
try:
call(*args)
fail(message)
except excep:
pass

View File

@ -1 +0,0 @@
"""Urwid User Interface"""

View File

@ -1,84 +0,0 @@
import urwid
from ceo import members, mysql
from ceo.urwid import search
from ceo.urwid.widgets import *
from ceo.urwid.window import *
class IntroPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text("MySQL databases"),
urwid.Divider(),
urwid.Text("Members and hosted clubs may have one MySQL database each. You may "
"create a database for an account if: \n"
"\n"
"- It is your personal account,\n"
"- It is a club account, and you are in the club group, or\n"
"- You are on the CSC systems committee\n"
"\n"
"You may also use this to reset your database password."
)
]
def focusable(self):
return False
class UserPage(WizardPanel):
def init_widgets(self):
self.userid = LdapWordEdit(csclub_uri, csclub_base, 'uid',
"Username: ")
self.widgets = [
urwid.Text("Member Information"),
urwid.Divider(),
urwid.Text("Enter the user which will own the new database."),
urwid.Divider(),
self.userid,
]
def check(self):
self.state['userid'] = self.userid.get_edit_text()
self.state['member'] = None
if self.state['userid']:
self.state['member'] = members.get(self.userid.get_edit_text())
if not self.state['member']:
set_status("Member not found")
self.focus_widget(self.userid)
return True
class EndPage(WizardPanel):
def init_widgets(self):
self.headtext = urwid.Text("")
self.midtext = urwid.Text("")
self.widgets = [
self.headtext,
urwid.Divider(),
self.midtext,
]
def focusable(self):
return False
def activate(self):
problem = None
try:
password = mysql.create_mysql(self.state['userid'])
try:
mysql.write_mysql_info(self.state['userid'], password)
helpfiletext = "Settings written to ~%s/ceo-mysql-info." % self.state['userid']
except (KeyError, IOError, OSError), e:
helpfiletext = "An error occured writing the settings file: %s" % e
self.headtext.set_text("MySQL database created")
self.midtext.set_text("Connection Information: \n"
"\n"
"Database: %s\n"
"Username: %s\n"
"Hostname: localhost\n"
"Password: %s\n"
"\n"
"%s\n"
% (self.state['userid'], self.state['userid'], password, helpfiletext))
except mysql.MySQLException, e:
self.headtext.set_text("Failed to create MySQL database")
self.midtext.set_text("We failed to create the database. The error was:\n\n%s" % e)
def check(self):
pop_window()

View File

@ -1,135 +0,0 @@
import urwid
from ceo import members
from ceo.urwid import search
from ceo.urwid.widgets import *
from ceo.urwid.window import *
def change_group_member(data):
push_wizard("%s %s Member" % (data["action"], data["name"]), [
(ChangeMember, data),
EndPage,
])
def list_group_members(data):
mlist = members.list_group( data["group"] ).values()
search.member_list( mlist )
def group_members(data):
add_data = data.copy()
add_data['action'] = 'Add'
remove_data = data.copy()
remove_data['action'] = 'Remove'
menu = make_menu([
("Add %s member" % data["name"].lower(),
change_group_member, add_data),
("Remove %s member" % data["name"].lower(),
change_group_member, remove_data),
("List %s members" % data["name"].lower(), list_group_members, data),
("Back", raise_back, None),
])
push_window(menu, "Manage %s" % data["name"])
class IntroPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Managing Club or Group" ),
urwid.Divider(),
urwid.Text( "Adding a member to a club will also grant them "
"access to the club's files and allow them to "
"become_club."
"\n\n")
]
def focusable(self):
return False
class InfoPage(WizardPanel):
def init_widgets(self):
self.group = LdapWordEdit(csclub_uri, csclub_base, 'uid',
"Club or Group: ")
self.widgets = [
urwid.Text( "Club or Group Information"),
urwid.Divider(),
self.group,
]
def check(self):
group = self.group.get_edit_text()
# check if group is valid
if not group or not members.get_group(group):
set_status("Group not found")
self.focus_widget(self.group)
return True
data = {
"name" : group,
"group" : group,
"groups" : [],
}
# Office Staff and Syscom get added to more groups
if group == "syscom":
data["name"] = "Systems Committee"
data["groups"] = [ "office", "staff", "adm", "src" ]
elif group == "office":
data["name"] = "Office Staff"
data["groups"] = [ "cdrom", "audio", "video", "www" ]
group_members(data)
class ChangeMember(WizardPanel):
def __init__(self, state, data):
state['data'] = data
WizardPanel.__init__(self, state)
def init_widgets(self):
self.userid = LdapWordEdit(csclub_uri, csclub_base, 'uid',
"Username: ")
data = self.state['data']
self.widgets = [
urwid.Text( "%s %s Member" % (data['action'], data['name']) ),
urwid.Divider(),
self.userid,
]
def check(self):
self.state['userid'] = self.userid.get_edit_text()
if self.state['userid']:
self.state['member'] = members.get(self.userid.get_edit_text())
if not self.state['member']:
set_status("Member not found")
self.focus_widget(self.userid)
return True
clear_status()
class EndPage(WizardPanel):
def init_widgets(self):
self.headtext = urwid.Text("")
self.midtext = urwid.Text("")
self.widgets = [
self.headtext,
urwid.Divider(),
self.midtext,
]
def focusable(self):
return False
def check(self):
pop_window()
def activate(self):
data = self.state['data']
action = data['action'].lower()
failed = []
for group in data['groups'] + [data['group']]:
try:
members.change_group_member(action, group, self.state['userid'])
except ldap.LDAPError:
failed.append(group)
if len(failed) == 0:
self.headtext.set_text("%s succeeded" % data['action'])
self.midtext.set_text("Congratulations, the group modification "
"has succeeded.")
else:
self.headtext.set_text("%s Results" % data['action'])
self.midtext.set_text("Failed to %s member to %s for the "
"following groups: %s. This may indicate an attempt to add a "
"duplicate group member or to delete a member that was not in "
"the group." % (data['action'].lower(), data['name'],
', '.join(failed)))

View File

@ -1,47 +0,0 @@
import urwid
from ceo.urwid.widgets import *
from ceo.urwid.window import *
from ceo import terms
class InfoPage(WizardPanel):
def init_widgets(self):
self.userid = urwid.Text("")
self.name = urwid.Text("")
self.terms = urwid.Text("")
self.nmterms = urwid.Text("")
self.program = urwid.Text("")
self.widgets = [
urwid.Text( "Member Details" ),
urwid.Divider(),
self.name,
self.userid,
self.program,
urwid.Divider(),
self.terms,
self.nmterms,
]
def focusable(self):
return False
def activate(self):
member = self.state.get('member', {})
name = member.get('cn', [''])[0]
userid = self.state['userid']
program = member.get('program', [''])[0]
shell = member.get('loginShell', [''])[0]
mterms = member.get('term', [])
nmterms = member.get('nonMemberTerm', [])
mterms.sort(terms.compare)
nmterms.sort(terms.compare)
self.name.set_text("Name: %s" % name)
self.userid.set_text("User: %s" % userid)
self.program.set_text("Program: %s" % program)
self.program.set_text("Shell: %s" % shell)
if terms:
self.terms.set_text("Terms: %s" % ", ".join(mterms))
if nmterms:
self.nmterms.set_text("Rep Terms: %s" % ", ".join(nmterms))
def check(self):
pop_window()

View File

@ -1,8 +0,0 @@
import os
from ceo.urwid.window import *
def library(data):
os.system("librarian")
ui.stop()
ui.start()

View File

@ -1,192 +0,0 @@
import os, grp, pwd, sys, random, urwid.curses_display
from ceo.urwid.widgets import *
from ceo.urwid.window import *
from ceo.urwid import newmember, renew, info, search, positions, groups, \
shell, library, databases
def program_name():
cwords = [ "CSC" ] * 20 + [ "Club" ] * 10 + [ "Campus" ] * 5 + \
[ "Communist", "Canadian", "Celestial", "Cryptographic", "Calum's",
"Canonical", "Capitalist", "Catastrophic", "Ceremonial", "Chaotic", "Civic",
"City", "County", "Caffeinated" ]
ewords = [ "Embellished", "Ergonomic", "Electric", "Eccentric", "European", "Economic",
"Evil", "Egotistical", "Elliptic", "Emasculating", "Embalming",
"Embryonic", "Emigrant", "Emissary's", "Emoting", "Employment", "Emulated",
"Enabling", "Enamoring", "Encapsulated", "Enchanted", "Encoded", "Encrypted",
"Encumbered", "Endemic", "Enhanced", "Enigmatic", "Enlightened", "Enormous",
"Enrollment", "Enshrouded", "Ephemeral", "Epidemic", "Episodic", "Epsilon",
"Equitable", "Equestrian", "Equilateral", "Erroneous", "Erratic",
"Espresso", "Essential", "Estate", "Esteemed", "Eternal", "Ethical", "Eucalyptus",
"Euphemistic", "Evangelist", "Evasive", "Everyday", "Evidence", "Eviction", "Evildoer's",
"Evolution", "Exacerbation", "Exalted", "Examiner's", "Excise", "Exciting", "Exclusion",
"Exec", "Executioner's", "Exile", "Existential", "Expedient", "Expert", "Expletive",
"Exploiter's", "Explosive", "Exponential", "Exposing", "Extortion", "Extraction",
"Extraneous", "Extravaganza", "Extreme", "Extraterrestrial", "Extremist", "Eerie" ]
owords = [ "Office" ] * 50 + [ "Outhouse", "Outpost" ]
cword = random.choice(cwords)
eword = random.choice(ewords)
oword = random.choice(owords)
return "%s %s %s" % (cword, eword, oword)
def new_member(*args, **kwargs):
push_wizard("New Member", [
newmember.IntroPage,
newmember.InfoPage,
newmember.NumberOfTermsPage,
newmember.SignPage,
newmember.PassPage,
newmember.EndPage,
], (60, 15))
def new_club(*args, **kwargs):
push_wizard("New Club Account", [
newmember.ClubIntroPage,
newmember.ClubInfoPage,
(newmember.EndPage, "club"),
], (60, 15))
def new_club_user(*args, **kwargs):
push_wizard("New Club Rep Account", [
newmember.ClubUserIntroPage,
newmember.ClubNoPayPage,
newmember.InfoPage,
newmember.NumberOfTermsPage,
newmember.SignPage,
newmember.PassPage,
(newmember.EndPage, "clubuser"),
], (60, 15))
def manage_group(*args, **kwargs):
push_wizard("Manage Club or Group Members", [
groups.IntroPage,
groups.InfoPage,
], (60, 15))
def renew_member(*args, **kwargs):
push_wizard("Renew Membership", [
renew.IntroPage,
renew.UserPage,
renew.EmailPage,
renew.EmailDonePage,
renew.TermPage,
renew.PayPage,
renew.EndPage,
], (60, 15))
def renew_club_user(*args, **kwargs):
push_wizard("Renew Club Rep Account", [
renew.ClubUserIntroPage,
newmember.ClubNoPayPage,
renew.UserPage,
renew.EmailPage,
renew.EmailDonePage,
(renew.TermPage, "clubuser"),
(renew.EndPage, "clubuser"),
], (60, 15))
def display_member(data):
push_wizard("Display Member", [
renew.UserPage,
info.InfoPage,
], (60, 15))
def search_members(data):
menu = make_menu([
("Members by term", search_term, None),
("Members by name", search_name, None),
("Members by group", search_group, None),
("Back", raise_back, None),
])
push_window(menu, "Search Members")
def search_name(data):
push_wizard("By Name", [ search.NamePage ])
def search_term(data):
push_wizard("By Term", [ search.TermPage ])
def search_group(data):
push_wizard("By Group", [ search.GroupPage ])
def manage_positions(data):
push_wizard("Manage Positions", [
positions.IntroPage,
positions.InfoPage,
positions.EndPage,
], (50, 15))
def change_shell(data):
push_wizard("Change Shell", [
shell.IntroPage,
shell.YouPage,
shell.ShellPage,
shell.EndPage
], (50, 20))
def create_mysql_db(data):
push_wizard("Create MySQL database", [
databases.IntroPage,
databases.UserPage,
databases.EndPage,
], (60, 15))
def check_group(group):
try:
me = pwd.getpwuid(os.getuid()).pw_name
return me in grp.getgrnam(group).gr_mem
except KeyError:
pass
def top_menu():
office_only = [
("New Member", new_member, None),
("New Club Rep", new_club_user, None),
("Renew Membership", renew_member, None),
("Renew Club Rep", renew_club_user, None),
("New Club", new_club, None),
("Library", library.library, None),
]
syscom_only = [
("Manage Club or Group Members", manage_group, None),
("Manage Positions", manage_positions, None),
]
unrestricted = [
("Display Member", display_member, None),
("Search Members", search_members, None),
("Change Shell", change_shell, None),
("Create MySQL database", create_mysql_db, None),
]
footer = [
("Exit", raise_abort, None),
]
menu = None
# reorder the menu for convenience
if not check_group('office') and not check_group('syscom'):
menu = labelled_menu([
('Unrestricted', unrestricted),
('Office Staff', office_only),
('Systems Committee', syscom_only),
(None, footer)
])
else:
menu = labelled_menu([
('Office Staff', office_only),
('Unrestricted', unrestricted),
('Systems Committee', syscom_only),
(None, footer)
])
return menu
def run():
push_window(top_menu(), program_name())
event_loop(ui)
def start():
ui.run_wrapper( run )
if __name__ == '__main__':
start()

View File

@ -1,267 +0,0 @@
import ldap, urwid #, re
from ceo import members, terms, remote, uwldap
from ceo.urwid.widgets import *
from ceo.urwid.window import *
class IntroPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Joining the Computer Science Club" ),
urwid.Divider(),
urwid.Text( "CSC membership is $2.00 per term. Please ensure "
"the fee is deposited into the cup before continuing." ),
]
def focusable(self):
return False
class ClubIntroPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Club Accounts" ),
urwid.Divider(),
urwid.Text( "We provide other UW clubs accounts for email and "
"web hosting, free of charge. Like members, clubs "
"get web hosting at %s. We can also arrange for "
"uwaterloo.ca subdomains; please instruct the club "
"representative to contact the systems committee "
"for more information. Club accounts do not have "
"passwords, and exist primarily to own club data. "
% "http://csclub.uwaterloo.ca/~clubid/" ),
]
def focusable(self):
return False
class ClubNoPayPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Club representative accounts are free. Please ensure "
"that no money was paid for this account." ),
]
def focusable(self):
return False
class ClubUserIntroPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Club Rep Account" ),
urwid.Divider(),
urwid.Text( "This is for people who need access to a club account, "
"but are not currently interested in full CSC membership. "
"Registering a user in this way grants one term of free "
"access to our machines, without any other membership "
"privileges (they can't vote, hold office, etc). If such "
"a user later decides to join, use the Renew Membership "
"option." ),
]
def focusable(self):
return False
class InfoPage(WizardPanel):
def init_widgets(self):
self.name = SingleEdit("Full name: ")
self.program = SingleEdit("Program of Study: ")
self.email = SingleEdit("Email: ")
self.userid = LdapFilterWordEdit(uwldap.uri(), uwldap.base(), 'uid',
{'cn':self.name, 'ou':self.program}, "Username: ")
self.widgets = [
urwid.Text( "Member Information" ),
urwid.Divider(),
self.userid,
self.name,
self.program,
self.email,
urwid.Divider(),
urwid.Text("Notes:"),
urwid.Text("- Make sure to check ID (watcard, drivers license)"),
urwid.Text("- Make sure to use UW userids for current students\n (we check uwldap to see if you are a full member)"),
]
def check(self):
self.state['userid'] = self.userid.get_edit_text()
self.state['name'] = self.name.get_edit_text()
self.state['program'] = self.program.get_edit_text()
self.state['email'] = self.email.get_edit_text()
if len( self.state['userid'] ) < 2:
self.focus_widget( self.userid )
set_status("Username is too short")
return True
elif len( self.state['name'] ) < 4:
self.focus_widget( self.name )
set_status("Name is too short")
return True
elif self.state['userid'] == self.state['name']:
self.focus_widget(self.name)
set_status("Name matches username")
return True
clear_status()
class ClubInfoPage(WizardPanel):
def init_widgets(self):
self.userid = WordEdit("Username: ")
self.name = SingleEdit("Club Name: ")
self.widgets = [
urwid.Text( "Club Information" ),
urwid.Divider(),
self.userid,
self.name,
]
def check(self):
self.state['userid'] = self.userid.get_edit_text()
self.state['name'] = self.name.get_edit_text()
if len( self.state['userid'] ) < 3:
self.focus_widget( self.userid )
set_status("Username is too short")
return True
elif len( self.state['name'] ) < 4:
self.focus_widget( self.name )
set_status("Name is too short")
return True
elif self.state['userid'] == self.state['name']:
self.focus_widget(self.name)
set_status("Name matches username")
return True
clear_status()
class NumberOfTermsPage(WizardPanel):
def init_widgets(self):
self.count = SingleIntEdit("Count: ")
self.widgets = [
urwid.Text("Number of Terms"),
urwid.Divider(),
urwid.Text("The member will be initially registered for this many "
"consecutive terms.\n"),
self.count
]
def activate(self):
self.count.set_edit_text("1")
self.focus_widget(self.count)
def check(self):
self.state['terms'] = terms.interval(terms.current(), self.count.value())
if len(self.state['terms']) == 0:
self.focus_widget(self.count)
set_status("Registering for zero terms?")
return True
clear_status()
class SignPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Machine Usage Policy" ),
urwid.Divider(),
urwid.Text( "Ensure the new member has signed the "
"Machine Usage Policy. Accounts of users who have not "
"signed will be suspended if discovered." ),
]
def focusable(self):
return False
class PassPage(WizardPanel):
def init_widgets(self):
self.password = PassEdit("Password: ")
self.pwcheck = PassEdit("Re-enter: ")
self.widgets = [
urwid.Text( "Member Password" ),
urwid.Divider(),
self.password,
self.pwcheck,
]
def focus_widget(self, widget):
self.box.set_focus( self.widgets.index( widget ) )
def clear_password(self):
self.focus_widget( self.password )
self.password.set_edit_text("")
self.pwcheck.set_edit_text("")
def check(self):
self.state['password'] = self.password.get_edit_text()
pwcheck = self.pwcheck.get_edit_text()
if self.state['password'] != pwcheck:
self.clear_password()
set_status("Passwords do not match")
return True
elif len(self.state['password']) < 5:
self.clear_password()
set_status("Password is too short")
return True
clear_status()
class EndPage(WizardPanel):
def __init__(self, state, utype='member'):
self.utype = utype
WizardPanel.__init__(self, state)
def init_widgets(self):
self.headtext = urwid.Text("")
self.midtext = urwid.Text("")
self.widgets = [
self.headtext,
urwid.Divider(),
self.midtext,
]
def focusable(self):
return False
def check(self):
pop_window()
def activate(self):
self.headtext.set_text("Adding %s" % self.state['userid'])
self.midtext.set_text("Please be patient while the user is added. "
"If more than a few seconds pass, check for a "
"phase variance and try inverting the polarity.")
set_status("Contacting the gibson...")
redraw()
problem = None
try:
if self.utype == 'member':
members.create_member(
self.state['userid'],
self.state['password'],
self.state['name'],
self.state['program'],
self.state['email'])
members.register(self.state['userid'], self.state['terms'])
mailman_result = members.subscribe_to_mailing_list(self.state['userid'])
if mailman_result != 'None':
problem = mailman_result
elif self.utype == 'clubuser':
members.create_member(
self.state['userid'],
self.state['password'],
self.state['name'],
self.state['program'],
self.state['email'],
club_rep=True)
members.register_nonmember(self.state['userid'], self.state['terms'])
elif self.utype == 'club':
members.create_club(self.state['userid'], self.state['name'])
else:
raise Exception("Internal Error")
except members.InvalidArgument, e:
problem = str(e)
except ldap.LDAPError, e:
problem = str(e)
except members.MemberException, e:
problem = str(e)
except remote.RemoteException, e:
problem = str(e)
clear_status()
if problem:
self.headtext.set_text("Failures Occured Adding User")
self.midtext.set_text("The error was:\n\n%s\n\nThe account may be partially added "
"and you may or may not be able to log in. Or perhaps you are not office staff. "
"If this was not expected please contact systems committee." % problem)
return
else:
set_status("Strombola Delivers")
self.headtext.set_text("User Added")
self.midtext.set_text("Congratulations, %s has been added "
"successfully. You should also rebuild the website in "
"order to update the memberlist."
% self.state['userid'])

View File

@ -1,97 +0,0 @@
import urwid
from ceo import members
from ceo.urwid.widgets import *
from ceo.urwid.window import *
position_data = [
('president', 'President'),
('vice-president', 'Vice-president'),
('treasurer', 'Treasurer'),
('secretary', 'Secretary'),
('sysadmin', 'System Administrator'),
('cro', 'Chief Returning Officer'),
('librarian', 'Librarian'),
('imapd', 'Imapd'),
('webmaster', 'Web Master'),
('offsck', 'Office Manager'),
]
class IntroPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Managing Positions" ),
urwid.Divider(),
urwid.Text( "Enter a username for each position. If a position is "
"held by multiple people, enter a comma-separated "
"list of usernames. If a position is held by nobody "
"leave the username blank." ),
]
def focusable(self):
return False
class InfoPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Positions" ),
urwid.Divider(),
]
positions = members.list_positions()
self.position_widgets = {}
for (position, text) in position_data:
widget = LdapWordEdit(csclub_uri, csclub_base, 'uid',
"%s: " % text)
if position in positions:
widget.set_edit_text(','.join(positions[position].keys()))
else:
widget.set_edit_text('')
self.position_widgets[position] = widget
self.widgets.append(widget)
def parse(self, entry):
if len(entry) == 0:
return []
return entry.split(',')
def check(self):
self.state['positions'] = {}
for (position, widget) in self.position_widgets.iteritems():
self.state['positions'][position] = \
self.parse(widget.get_edit_text())
for p in self.state['positions'][position]:
if members.get(p) == None:
self.focus_widget(widget)
set_status( "Invalid username: '%s'" % p )
return True
clear_status()
class EndPage(WizardPanel):
def init_widgets(self):
old = members.list_positions()
self.headtext = urwid.Text("")
self.midtext = urwid.Text("")
self.widgets = [
self.headtext,
urwid.Divider(),
self.midtext,
]
def focusable(self):
return False
def activate(self):
failed = []
for (position, info) in self.state['positions'].iteritems():
try:
members.set_position(position, info)
except ldap.LDAPError:
failed.append(position)
if len(failed) == 0:
self.headtext.set_text("Positions Updated")
self.midtext.set_text("Congratulations, positions have been "
"updated. You should rebuild the website in order to update "
"the Positions page.")
else:
self.headtext.set_text("Positions Results")
self.midtext.set_text("Failed to update the following positions: "
"%s." % join(failed))
def check(self):
pop_window()

View File

@ -1,240 +0,0 @@
import urwid, ldap
from ceo import members, terms, ldapi
from ceo.urwid.widgets import *
from ceo.urwid.window import *
class IntroPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Renewing Membership" ),
urwid.Divider(),
urwid.Text( "CSC membership is $2.00 per term. You may pre-register "
"for future terms if desired." )
]
def focusable(self):
return False
class ClubUserIntroPage(IntroPage):
def init_widgets(self):
self.widgets = [
urwid.Text( "Renewing Club User Account" ),
urwid.Divider(),
urwid.Text( "In order for clubs to maintain websites hosted by "
"the Computer Science Club, they need access to our "
"machines. We grant accounts to club users at no charge "
"in order to provide this access. Registering a user "
"in this way grants one term of free access to our "
"machines, without any other membership privileges "
"(they can't vote, hold office, etc). If such a user "
"decides to join, use the Renew Membership option." )
]
class UserPage(WizardPanel):
def init_widgets(self):
self.userid = LdapWordEdit(csclub_uri, csclub_base, 'uid',
"Username: ")
self.widgets = [
urwid.Text( "Member Information" ),
urwid.Divider(),
self.userid,
]
def check(self):
self.state['userid'] = self.userid.get_edit_text()
self.state['member'] = None
if self.state['userid']:
self.state['member'] = members.get(self.userid.get_edit_text())
if not self.state['member']:
set_status("Member not found")
self.focus_widget(self.userid)
return True
class EmailPage(WizardPanel):
def init_widgets(self):
self.email = SingleEdit("Email: ")
self.widgets = [
urwid.Text( "Mail Forwarding" ),
urwid.Divider(),
urwid.Text("Please ensure the forwarding address for your CSC "
"email is up to date. You may leave this blank if you do not "
"want your CSC email forwarded, and intend to log in "
"regularly to check it."),
urwid.Divider(),
urwid.Text("Warning: Changing this overwrites ~/.forward"),
urwid.Divider(),
self.email,
]
def activate(self):
cfwd = members.current_email(self.state['userid'])
if cfwd:
self.state['old_forward'] = cfwd
else:
self.state['old_forward'] = ''
self.email.set_edit_text(self.state['old_forward'])
def check(self):
fwd = self.email.get_edit_text().strip().lower()
if fwd:
msg = members.check_email(fwd)
if msg:
set_status(msg)
return True
if fwd == '%s@csclub.uwaterloo.ca' % self.state['userid']:
set_status('You cannot forward your address to itself. Leave it blank to disable forwarding.')
return True
self.state['new_forward'] = fwd
class EmailDonePage(WizardPanel):
def init_widgets(self):
self.status = urwid.Text("")
self.widgets = [
urwid.Text("Mail Forwarding"),
urwid.Divider(),
self.status,
]
def focusable(self):
return False
def activate(self):
if self.state['old_forward'] == self.state['new_forward']:
if self.state['old_forward']:
self.status.set_text(
'You have chosen to leave your forwarding address '
'as %s. Make sure to check this email for updates '
'from the CSC.' % self.state['old_forward'])
else:
self.status.set_text(
'You have chosen not to set a forwarding address. '
'Please check your CSC email regularly (via IMAP, POP, or locally) '
'for updates from the CSC.'
'\n\n'
'Note: If you do have a ~/.forward, we were not able to read it or '
'it was not a single email address. Do not worry, we have left it '
'as is.')
else:
try:
msg = members.change_email(self.state['userid'], self.state['new_forward'])
if msg:
self.status.set_text("Errors occured updating your forwarding address:"
"\n\n%s" % msg)
else:
if self.state['new_forward']:
self.status.set_text(
'Your email forwarding address has been successfully set '
'to %s. Test it out by emailing %s@csclub.uwaterloo.ca and '
'making sure you receive it at your forwarding address.'
% (self.state['new_forward'], self.state['userid']))
else:
self.status.set_text(
'Your email forwarding address has been successfully cleared. '
'Please check your CSC email regularly (via IMAP, POP, or locally) '
'for updates from the CSC.')
except Exception, e:
self.status.set_text(
'An exception occured updating your email:\n\n%s' % e)
class TermPage(WizardPanel):
def __init__(self, state, utype='member'):
self.utype = utype
WizardPanel.__init__(self, state)
def init_widgets(self):
self.start = SingleEdit("Start: ")
self.count = SingleIntEdit("Count: ")
self.widgets = [
urwid.Text( "Terms to Register" ),
urwid.Divider(),
self.start,
self.count,
]
def activate(self):
if not self.start.get_edit_text():
self.terms = self.state['member'].get('term', [])
self.nmterms = self.state['member'].get('nonMemberTerm', [])
if self.utype == 'member':
self.start.set_edit_text( terms.next_unregistered( self.terms ) )
else:
self.start.set_edit_text( terms.next_unregistered( self.terms + self.nmterms ) )
self.count.set_edit_text( "1" )
def check(self):
try:
self.state['terms'] = terms.interval( self.start.get_edit_text(), self.count.value() )
except Exception, e:
self.focus_widget( self.start )
set_status( "Invalid start term" )
return True
for term in self.state['terms']:
if self.utype == 'member':
already = term in self.terms
else:
already = term in self.terms or term in self.nmterms
if already:
self.focus_widget( self.start )
set_status( "Already registered for " + term )
return True
if len(self.state['terms']) == 0:
self.focus_widget(self.count)
set_status( "Registering for zero terms?" )
return True
class PayPage(WizardPanel):
def init_widgets(self):
self.midtext = urwid.Text("")
self.widgets = [
urwid.Text("Membership Fee"),
urwid.Divider(),
self.midtext,
]
def focusable(self):
return False
def activate(self):
regterms = self.state['terms']
plural = "term"
if len(self.state['terms']) > 1:
plural = "terms"
self.midtext.set_text("You are registering for %d %s, and owe the "
"Computer Science Club $%d.00 in membership fees. "
"Please deposit the money in the safe before "
"continuing. " % ( len(regterms), plural, len(regterms * 2)))
class EndPage(WizardPanel):
def __init__(self, state, utype='member'):
self.utype = utype
WizardPanel.__init__(self, state)
def init_widgets(self):
self.headtext = urwid.Text("")
self.midtext = urwid.Text("")
self.widgets = [
self.headtext,
urwid.Divider(),
self.midtext,
]
def focusable(self):
return False
def activate(self):
problem = None
try:
self.headtext.set_text("Registration Succeeded")
if self.utype == 'member':
members.register( self.state['userid'], self.state['terms'] )
self.midtext.set_text("The member has been registered for the following "
"terms: " + ", ".join(self.state['terms']) + ".")
else:
members.register_nonmember( self.state['userid'], self.state['terms'] )
self.midtext.set_text("The club user has been registered for the following "
"terms: " + ", ".join(self.state['terms']) + ".")
except ldap.LDAPError, e:
problem = ldapi.format_ldaperror(e)
except members.MemberException, e:
problem = str(e)
if problem:
self.headtext.set_text("Failed to Register")
self.midtext.set_text("You may refund any fees paid or retry. "
"The error was:\n\n%s" % problem)
def check(self):
pop_window()

View File

@ -1,83 +0,0 @@
import urwid
from ceo import members, terms
from ceo.urwid.widgets import *
from ceo.urwid.window import *
class TermPage(WizardPanel):
def init_widgets(self):
self.term = SingleEdit("Term: ")
self.widgets = [
urwid.Text( "Terms Members" ),
urwid.Divider(),
self.term,
]
def check(self):
try:
self.state['term'] = self.term.get_edit_text()
terms.parse( self.state['term'] )
except:
self.focus_widget( self.term )
set_status( "Invalid term" )
return True
mlist = members.list_term( self.state['term'] ).values()
pop_window()
member_list( mlist )
class NamePage(WizardPanel):
def init_widgets(self):
self.name = SingleEdit("Name: ")
self.widgets = [
urwid.Text( "Members by Name" ),
urwid.Divider(),
self.name,
]
def check(self):
self.state['name'] = self.name.get_edit_text()
if not self.state['name']:
self.focus_widget( self.name )
set_status( "Invalid name" )
return True
mlist = members.list_name( self.state['name'] ).values()
pop_window()
member_list( mlist )
class GroupPage(WizardPanel):
def init_widgets(self):
self.group = SingleEdit("Group: ")
self.widgets = [
urwid.Text( "Members by Group" ),
urwid.Divider(),
self.group,
]
def check(self):
self.state['group'] = self.group.get_edit_text()
if not self.state['group']:
self.focus_widget( self.group )
set_status( "Invalid group" )
return True
mlist = members.list_group( self.state['group'] ).values()
pop_window()
member_list( mlist )
def member_list(mlist):
mlist = list(mlist)
mlist.sort( lambda x, y: cmp(x['uid'], y['uid']) )
buf = ''
for member in mlist:
if 'uid' in member:
uid = member['uid'][0]
else:
uid = None
if 'program' in member:
program = member['program'][0]
else:
program = None
attrs = ( uid, member['cn'][0], program )
buf += "%10s %30s\n%41s\n\n" % attrs
set_status("Press escape to return to the menu")
push_window(urwid.ListBox([urwid.Text(buf)]))

View File

@ -1,95 +0,0 @@
import urwid, ldap, pwd, os
from ceo import members, terms, ldapi
from ceo.urwid.widgets import *
from ceo.urwid.window import *
class IntroPage(WizardPanel):
def init_widgets(self):
self.widgets = [
urwid.Text( "Changing Login Shell" ),
urwid.Divider(),
urwid.Text( "You can change your shell here. Request more shells "
"by emailing systems-committee." )
]
def focusable(self):
return False
class YouPage(WizardPanel):
def init_widgets(self):
you = pwd.getpwuid(os.getuid()).pw_name
self.userid = LdapWordEdit(csclub_uri, csclub_base, 'uid',
"Username: ", you)
self.widgets = [
urwid.Text( "Member Information" ),
urwid.Divider(),
self.userid,
]
def check(self):
self.state['userid'] = self.userid.get_edit_text()
self.state['member'] = None
if self.state['userid']:
self.state['member'] = members.get(self.userid.get_edit_text())
if not self.state['member']:
set_status("Member not found")
self.focus_widget(self.userid)
return True
class ShellPage(WizardPanel):
def init_widgets(self):
self.midtext = urwid.Text("")
self.widgets = [
urwid.Text("Choose a Shell"),
urwid.Divider(),
]
def set_shell(radio_button, new_state, shell):
if new_state:
self.state['shell'] = shell
radio_group = []
self.shells = members.get_shells()
self.shellw = [ urwid.RadioButton(radio_group, shell,
on_state_change=set_shell, user_data=shell)
for shell in self.shells ]
self.widgets.extend(self.shellw)
def set_shell(self, shell):
i = self.shells.index(shell)
self.shellw[i].set_state(True)
def focusable(self):
return True
def activate(self):
self.set_shell(self.state['member']['loginShell'][0])
class EndPage(WizardPanel):
def init_widgets(self):
self.headtext = urwid.Text("")
self.midtext = urwid.Text("")
self.widgets = [
self.headtext,
urwid.Divider(),
self.midtext,
]
def focusable(self):
return False
def activate(self):
problem = None
try:
user, shell = self.state['userid'], self.state['shell']
members.set_shell(user, shell)
self.headtext.set_text("Login Shell Changed")
self.midtext.set_text("The shell for %s has been changed to %s."
% (user, shell))
except ldap.LDAPError, e:
problem = ldapi.format_ldaperror(e)
except members.MemberException, e:
problem = str(e)
if problem:
self.headtext.set_text("Failed to Change Shell")
self.midtext.set_text("Perhaps you don't have permission to change %s's shell? "
"The error was:\n\n%s" % (user, problem))
def check(self):
pop_window()

View File

@ -1,247 +0,0 @@
import urwid, ldap, sys
from ceo.urwid.window import raise_back, push_window
import ceo.ldapi as ldapi
#Todo: kill ButtonText because no one uses it except one place and we can probably do that better anyway
csclub_uri = "ldap://ldap1.csclub.uwaterloo.ca/ ldap://ldap2.csclub.uwaterloo.ca"
csclub_base = "dc=csclub,dc=uwaterloo,dc=ca"
def make_menu(items):
items = [ urwid.AttrWrap( ButtonText( cb, data, txt ), 'menu', 'selected') for (txt, cb, data) in items ]
return ShortcutListBox(items)
def labelled_menu(itemses):
widgets = []
for label, items in itemses:
if label:
widgets.append(urwid.Text(label))
widgets += (urwid.AttrWrap(ButtonText(cb, data, txt), 'menu', 'selected') for (txt, cb, data) in items)
widgets.append(urwid.Divider())
widgets.pop()
return ShortcutListBox(widgets)
def push_wizard(name, pages, dimensions=(50, 10)):
state = {}
wiz = Wizard()
for page in pages:
if type(page) != tuple:
page = (page, )
wiz.add_panel( page[0](state, *page[1:]) )
push_window( urwid.Filler( urwid.Padding(
urwid.LineBox(wiz), 'center', dimensions[0]),
'middle', dimensions[1] ), name )
class ButtonText(urwid.Text):
def __init__(self, callback, data, *args, **kwargs):
self.callback = callback
self.data = data
urwid.Text.__init__(self, *args, **kwargs)
def selectable(self):
return True
def keypress(self, size, key):
if key == 'enter' and self.callback:
self.callback(self.data)
else:
return key
#DONTUSE
class CaptionedText(urwid.Text):
def __init__(self, caption, *args, **kwargs):
self.caption = caption
urwid.Text.__init__(self, *args, **kwargs)
def render(self, *args, **kwargs):
self.set_text(self.caption + self.get_text()[0])
urwid.Text.render(*args, **kwargs)
class SingleEdit(urwid.Edit):
def keypress(self, size, key):
key_mappings = {
'enter': 'down',
'tab': 'down',
'shift tab': 'up',
'ctrl a': 'home',
'ctrl e': 'end'
}
if key in key_mappings:
return urwid.Edit.keypress(self, size, key_mappings[key])
else:
return urwid.Edit.keypress(self, size, key)
class SingleIntEdit(urwid.IntEdit):
def keypress(self, size, key):
if key == 'enter':
return urwid.Edit.keypress(self, size, 'down')
else:
return urwid.Edit.keypress(self, size, key)
class WordEdit(SingleEdit):
def valid_char(self, ch):
return urwid.Edit.valid_char(self, ch) and ch != ' '
class LdapWordEdit(WordEdit):
ldap = None
index = None
def __init__(self, uri, base, attr, *args):
try:
self.ldap = ldap.initialize(uri)
self.ldap.simple_bind_s("", "")
except ldap.LDAPError:
return WordEdit.__init__(self, *args)
self.base = base
self.attr = ldapi.escape(attr)
return WordEdit.__init__(self, *args)
def keypress(self, size, key):
if (key == 'tab' or key == 'shift tab') and self.ldap != None:
if self.index != None:
if key == 'tab':
self.index = (self.index + 1) % len(self.choices)
elif key == 'shift tab':
self.index = (self.index - 1) % len(self.choices)
text = self.choices[self.index]
self.set_edit_text(text)
self.set_edit_pos(len(text))
else:
try:
text = self.get_edit_text()
search = ldapi.escape(text)
matches = self.ldap.search_s(self.base,
ldap.SCOPE_SUBTREE, '(%s=%s*)' % (self.attr, search))
self.choices = [ text ]
for match in matches:
(_, attrs) = match
self.choices += attrs['uid']
self.choices.sort()
self.index = 0
self.keypress(size, key)
except ldap.LDAPError, e:
pass
else:
self.index = None
return WordEdit.keypress(self, size, key)
class LdapFilterWordEdit(LdapWordEdit):
def __init__(self, uri, base, attr, map, *args):
LdapWordEdit.__init__(self, uri, base, attr, *args)
self.map = map
def keypress(self, size, key):
if self.ldap != None:
if key == 'enter' or key == 'down' or key == 'up':
search = ldapi.escape(self.get_edit_text())
try:
matches = self.ldap.search_s(self.base,
ldap.SCOPE_SUBTREE, '(%s=%s)' % (self.attr, search))
if len(matches) > 0:
(_, attrs) = matches[0]
for (k, v) in self.map.items():
if attrs.has_key(k) and len(attrs[k]) > 0:
v.set_edit_text(attrs[k][0])
except ldap.LDAPError:
pass
return LdapWordEdit.keypress(self, size, key)
class PassEdit(SingleEdit):
def get_text(self):
text = urwid.Edit.get_text(self)
return (self.caption + " " * len(self.get_edit_text()), text[1])
class EnhancedButton(urwid.Button):
def keypress(self, size, key):
if key == 'tab':
return urwid.Button.keypress(self, size, 'down')
elif key == 'shift tab':
return urwid.Button.keypress(self, size, 'up')
else:
return urwid.Button.keypress(self, size, key)
class DumbColumns(urwid.Columns):
"""Dumb columns widget
The normal one tries to focus the "nearest" widget to the cursor.
This makes the Back button default instead of the Next button.
"""
def move_cursor_to_coords(self, size, col, row):
pass
class Wizard(urwid.WidgetWrap):
def __init__(self):
self.selected = None
self.panels = []
self.panelwrap = urwid.WidgetWrap( urwid.SolidFill() )
self.back = EnhancedButton("Back", self.back)
self.next = EnhancedButton("Next", self.next)
self.buttons = DumbColumns( [ self.back, self.next ], dividechars=3, focus_column=1 )
pad = urwid.Padding( self.buttons, ('fixed right', 2), 19 )
self.pile = urwid.Pile( [self.panelwrap, ('flow', pad)], 0 )
urwid.WidgetWrap.__init__(self, self.pile)
def add_panel(self, panel):
self.panels.append( panel )
if len(self.panels) == 1:
self.select(0)
def select(self, panelno, set_focus=True):
if 0 <= panelno < len(self.panels):
self.selected = panelno
self.panelwrap._w = self.panels[panelno]
self.panelwrap._invalidate()
self.panels[panelno].activate()
if set_focus:
if self.panels[panelno].focusable():
self.pile.set_focus( 0 )
else:
self.pile.set_focus( 1 )
def next(self, *args, **kwargs):
if self.panels[self.selected].check():
self.select( self.selected )
return
self.select(self.selected + 1)
def back(self, *args, **kwargs):
if self.selected == 0:
raise_back()
self.select(self.selected - 1, False)
class WizardPanel(urwid.WidgetWrap):
def __init__(self, state):
self.state = state
self.init_widgets()
self.box = urwid.ListBox( urwid.SimpleListWalker( self.widgets ) )
urwid.WidgetWrap.__init__( self, self.box )
def init_widgets(self):
self.widgets = []
def focus_widget(self, widget):
self.box.set_focus( self.widgets.index( widget ) )
def focusable(self):
return True
def check(self):
return
def activate(self):
return
# assumes that a SimpleListWalker containing
# urwid.Text or subclass is used
class ShortcutListBox(urwid.ListBox):
def keypress(self, size, key):
# only process single letters; pass all else to super
if len(key) == 1 and key.isalpha():
next = self.get_focus()[1] + 1
shifted_contents = self.body.contents[next:] + self.body.contents[:next]
# find the next item matching the letter requested
try:
new_focus = (i for i,w in enumerate(shifted_contents)
if w.selectable() and w.text[0].upper() == key.upper()).next()
new_focus = (new_focus + next) % len(self.body.contents)
self.set_focus(new_focus)
except:
# ring the bell if it isn't found
sys.stdout.write('\a')
else:
urwid.ListBox.keypress(self, size, key)

View File

@ -1,80 +0,0 @@
import urwid
window_stack = []
window_names = []
header = urwid.Text( "" )
footer = urwid.Text( "" )
ui = urwid.curses_display.Screen()
ui.register_palette([
# name, foreground, background, mono
('banner', 'light gray', 'default', None),
('menu', 'light gray', 'default', 'bold'),
('selected', 'black', 'light gray', 'bold'),
])
top = urwid.Frame( urwid.SolidFill(), header, footer )
def push_window( frame, name=None ):
window_stack.append( frame )
window_names.append( name )
update_top()
def pop_window():
if len(window_stack) == 1:
return False
window_stack.pop()
window_names.pop()
update_top()
clear_status()
return True
def update_top():
names = [ n for n in window_names if n ]
header.set_text(" - ".join( names ) + "\n")
top.set_body( window_stack[-1] )
def set_status(message):
footer.set_text(message)
def clear_status():
footer.set_text("")
class Abort(Exception):
pass
class Back(Exception):
pass
def raise_abort(*args, **kwargs):
raise Abort()
def raise_back(*args, **kwarg):
raise Back()
def redraw():
cols, rows = ui.get_cols_rows()
canvas = top.render( (cols, rows), focus=True )
ui.draw_screen( (cols, rows), canvas )
return cols, rows
def event_loop(ui):
while True:
try:
cols, rows = redraw()
keys = ui.get_input()
for k in keys:
if k == "esc":
if not pop_window():
break
elif k == "window resize":
(cols, rows) = ui.get_cols_rows()
else:
top.keypress( (cols, rows), k )
except Back:
pop_window()
except (Abort, KeyboardInterrupt):
return

View File

@ -1,8 +0,0 @@
def uri():
return "ldap://uwldap.uwaterloo.ca/"
def base():
return "dc=uwaterloo,dc=ca"
def domain():
return 'uwaterloo.ca'

0
ceo_common/__init__.py Normal file
View File

View File

@ -0,0 +1,8 @@
from zope.interface import Interface
class IConfig(Interface):
"""Represents a config store."""
def get(key: str) -> str:
"""Get the config value for the given key."""

View File

@ -0,0 +1,39 @@
from typing import Dict, List
from zope.interface import Interface, Attribute
from .IUser import IUser
class IGroup(Interface):
"""Represents a Unix group."""
cn = Attribute('common name')
gid_number = Attribute('gid number')
unique_members = Attribute('DNs of group members')
dn = Attribute('distinguished name')
def add_to_ldap():
"""Add a new record to LDAP for this group."""
def add_member(username: str):
"""Add the member to this group in LDAP."""
def remove_member(username: str):
"""Remove the member from this group in LDAP."""
def get_members() -> List[IUser]:
"""Get a list of the members in this group."""
def serialize_for_modlist() -> Dict:
"""
Serialize this group into a dict to be passed to
ldap.modlist.addModlist().
"""
# static method
def deserialize_from_dict(data: Dict):
"""Deserialize this group from a dict returned by ldap.search_s().
:returns: IGroup
"""

View File

@ -0,0 +1,14 @@
from zope.interface import Interface
class IKerberosService(Interface):
"""A utility wrapper around kinit/kadmin."""
def kinit():
"""Acquire and cache a new TGT."""
def addprinc(principal: str, password: str):
"""Add a new principal with the specified password."""
def change_password(principal: str, password: str):
"""Set and expire the principal's password."""

View File

@ -0,0 +1,32 @@
from zope.interface import Interface
from .IUser import IUser
from .IGroup import IGroup
class ILDAPService(Interface):
"""An interface to the LDAP database."""
def get_user(username: str) -> IUser:
"""Retrieve the user with the given username."""
def save_user(user: IUser) -> IUser:
"""
Save the user in the database.
A new UID and GID will be generated and returned in the new user.
"""
def get_group(cn: str, is_club: bool = False) -> IGroup:
"""Retrieve the group with the given cn (Unix group name)."""
def save_group(group: IGroup) -> IGroup:
"""
Save the group in the database.
The GID will not be changed and must be valid.
"""
def modify_user(old_user: IUser, new_user: IUser):
"""Replace old_user with new_user."""
def modify_group(old_group: IGroup, new_group: IGroup):
"""Replace old_group with new_group."""

View File

@ -0,0 +1,65 @@
from typing import List, Dict
from zope.interface import Interface, Attribute
class IUser(Interface):
"""Represents a Unix user."""
# LDAP attributes
uid = Attribute('user identifier')
cn = Attribute('common name')
login_shell = Attribute('login shell')
uid_number = Attribute('uid number')
gid_number = Attribute('gid number')
home_directory = Attribute('home directory')
program = Attribute('academic program')
position = Attribute('executive position')
terms = Attribute('list of terms for which this person was a member')
non_member_terms = Attribute('list of terms for which this person was '
'a club rep')
mail_local_addresses = Attribute('email aliases')
dn = Attribute('distinguished name')
# Non-LDAP attributes
forwarding_addresses = Attribute('list of email forwarding addresses')
def is_club() -> bool:
"""
Returns True if this is the Unix user for a club.
Returns False if this is the Unix user for a member.
"""
def add_to_ldap():
"""Add a new record to LDAP for this user."""
def add_to_kerberos(password: str):
"""Add a new Kerberos principal for this user."""
def add_terms(terms: List[str]):
"""Add member terms for this user."""
def add_non_member_terms(terms: List[str]):
"""Add non-member terms for this user."""
def add_position(position: str):
"""Add a position to this user."""
def remove_position(position: str):
"""Remove a position from this user."""
def change_password(password: str):
"""Replace the user's password."""
def serialize_for_modlist() -> Dict:
"""
Serialize this user into a dict to be passed to
ldap.modlist.addModlist().
"""
# static method
def deserialize_from_dict(data: Dict):
"""Deserialize this user from a dict returned by ldap.search_s().
:returns: IUser
"""

View File

@ -0,0 +1,5 @@
from .IKerberosService import IKerberosService
from .IConfig import IConfig
from .IUser import IUser
from .ILDAPService import ILDAPService
from .IGroup import IGroup

View File

@ -0,0 +1,24 @@
from zope.interface import implementer
from ceo_common.interfaces import IConfig
@implementer(IConfig)
class Config:
# TODO: read from a config file
_domain = 'csclub.internal'
_ldap_base = ','.join(['dc=' + dc for dc in _domain.split('.')])
_config = {
'ldap_admin_principal': 'ceod/admin',
'ldap_server_url': 'ldap://ldap-master.' + _domain,
'ldap_users_base': 'ou=People,' + _ldap_base,
'ldap_groups_base': 'ou=Group,' + _ldap_base,
'ldap_sasl_realm': _domain.upper(),
'member_min_id': 20001,
'member_max_id': 29999,
'club_min_id': 30001,
'club_max_id': 39999,
}
def get(self, key: str) -> str:
return self._config[key]

View File

@ -0,0 +1 @@
from .Config import Config

0
ceod/__init__.py Normal file
View File

84
ceod/model/Group.py Normal file
View File

@ -0,0 +1,84 @@
import copy
from typing import List, Dict, Union
from zope import component
from zope.interface import implementer
from .utils import strings_to_bytes, bytes_to_strings, dn_to_uid
from ceo_common.interfaces import ILDAPService, IGroup, IConfig, IUser
@implementer(IGroup)
class Group:
def __init__(
self, cn: str, gid_number: int,
members: Union[List[str], None] = None,
):
self.cn = cn
self.gid_number = gid_number
# this is a list of the DNs of the members in this group
self.member_DNs = members or []
cfg = component.getUtility(IConfig)
self.dn = f'cn={cn},{cfg.get("ldap_groups_base")}'
self.ldap_users_base = cfg.get('ldap_users_base')
self.ldap_srv = component.getUtility(ILDAPService)
def __repr__(self) -> str:
lines = [
'dn: ' + self.dn,
'cn: ' + self.cn,
'gidNumber: ' + str(self.gid_number),
'objectClass: top',
'objectClass: group',
'objectClass: posixGroup',
]
for member_dn in self.member_DNs:
lines.append('uniqueMember: ' + member_dn)
return '\n'.join(lines)
def add_to_ldap(self):
self.ldap_srv.add_group(self)
def serialize_for_modlist(self) -> Dict:
data = {
'cn': [self.cn],
'gidNumber': [str(self.gid_number)],
'objectClass': [
'top',
'group',
'posixGroup',
],
}
if self.member_DNs:
data['uniqueMember'] = self.member_DNs
return strings_to_bytes(data)
@staticmethod
def deserialize_from_dict(data: Dict) -> IGroup:
data = bytes_to_strings(data)
return Group(
cn=data['cn'][0],
gid_number=int(data['gidNumber'][0]),
members=data.get('uniqueMember'),
)
def add_member(self, username: str):
new_group = copy.copy(self)
new_group.member_DNs = self.member_DNs.copy()
new_group.member_DNs.append(f'uid={username},{self.ldap_users_base}')
self.ldap_srv.modify_group(self, new_group)
self.member_DNs = new_group.member_DNs
def remove_member(self, username: str):
new_group = copy.copy(self)
new_group.member_DNs = self.member_DNs.copy()
new_group.member_DNs.remove(f'uid={username},{self.ldap_users_base}')
self.ldap_srv.modify_group(self, new_group)
self.member_DNs = new_group.member_DNs
def get_members(self) -> List[IUser]:
members = []
for dn in self.member_DNs:
members.append(self.ldap_srv.get_user(dn_to_uid(dn)))
return members

View File

@ -0,0 +1,44 @@
import os
import subprocess
from zope import component
from zope.interface import implementer
from ceo_common.interfaces import IKerberosService
from ceo_common.interfaces import IConfig
@implementer(IKerberosService)
class KerberosService:
def __init__(self):
cfg = component.getUtility(IConfig)
self.admin_principal = cfg.get('ldap_admin_principal')
cache_file = '/run/ceod/krb5_cache'
os.makedirs('/run/ceod', exist_ok=True)
os.putenv('KRB5CCNAME', 'FILE:' + cache_file)
self.kinit()
def kinit(self):
subprocess.run(['kinit', '-k', 'ceod/admin'], check=True)
def addprinc(self, principal: str, password: str):
subprocess.run([
'kadmin', '-k', '-p', self.admin_principal, 'addprinc',
'-pw', password,
'-policy', 'default',
'+needchange',
principal
], check=True)
def change_password(self, principal: str, password: str):
subprocess.run([
'kadmin', '-k', '-p', self.admin_principal, 'cpw',
'-pw', password,
principal
], check=True)
subprocess.run([
'kadmin', '-k', '-p', self.admin_principal, 'modprinc',
'+needchange',
principal
], check=True)

139
ceod/model/LDAPService.py Normal file
View File

@ -0,0 +1,139 @@
import copy
import grp
import pwd
import ldap
import ldap.modlist
from zope import component
from zope.interface import implementer
from ceo_common.interfaces import ILDAPService, IKerberosService, IConfig, IUser, IGroup
from .User import User
from .Group import Group
class UserNotFoundError:
pass
class GroupNotFoundError:
pass
@implementer(ILDAPService)
class LDAPService:
def __init__(self):
cfg = component.getUtility(IConfig)
self.ldap_admin_principal = cfg.get('ldap_admin_principal')
self.ldap_server_url = cfg.get('ldap_server_url')
self.ldap_users_base = cfg.get('ldap_users_base')
self.ldap_groups_base = cfg.get('ldap_groups_base')
self.member_min_id = cfg.get('member_min_id')
self.member_max_id = cfg.get('member_max_id')
self.club_min_id = cfg.get('club_min_id')
self.club_max_id = cfg.get('club_max_id')
def _get_ldap_conn(self, gssapi_bind: bool = True) -> ldap.ldapobject.LDAPObject:
conn = ldap.initialize(self.ldap_server_url)
if gssapi_bind:
self._gssapi_bind(conn)
return conn
def _gssapi_bind(self, conn: ldap.ldapobject.LDAPObject):
krb_srv = component.getUtility(IKerberosService)
for i in range(2):
try:
conn.sasl_gssapi_bind_s()
return
except ldap.LOCAL_ERROR as err:
if 'Ticket expired' in err.args[0]['info']:
krb_srv.kinit()
continue
raise err
raise Exception('could not perform GSSAPI bind')
def get_user(self, username: str) -> IUser:
conn = self._get_ldap_conn(False)
base = f'uid={username},{self.ldap_users_base}'
try:
_, result = conn.search_s(base, ldap.SCOPE_BASE)[0]
return User.deserialize_from_dict(result)
except ldap.NO_SUCH_OBJECT:
raise UserNotFoundError()
def get_group(self, cn: str) -> IGroup:
conn = self._get_ldap_conn(False)
base = f'cn={cn},{self.ldap_groups_base}'
try:
_, result = conn.search_s(base, ldap.SCOPE_BASE)[0]
return Group.deserialize_from_dict(result)
except ldap.NO_SUCH_OBJECT:
raise GroupNotFoundError()
def _get_next_uid(self, conn: ldap.ldapobject.LDAPObject, min_id: int, max_id: int) -> int:
"""Gets the next available UID number between min_id and max_id, inclusive."""
def uid_exists(uid: int) -> bool:
try:
pwd.getpwuid(uid)
return True
except KeyError:
return False
def gid_exists(gid: int) -> bool:
try:
grp.getgrgid(gid)
return True
except KeyError:
return False
def ldap_uid_or_gid_exists(uid: int) -> bool:
results = conn.search_s(
self.ldap_users_base, ldap.SCOPE_ONELEVEL,
f'(|(uidNumber={uid})(gidNumber={uid}))')
return len(results) > 0
# TODO: replace this with binary search
for uid in range(min_id, max_id + 1):
if uid_exists(uid) or gid_exists(uid) or ldap_uid_or_gid_exists(uid):
continue
return uid
raise Exception('no UIDs remaining')
def save_user(self, user: IUser) -> IUser:
if user.is_club():
min_id, max_id = self.club_min_id, self.club_max_id
else:
min_id, max_id = self.member_min_id, self.member_max_id
conn = self._get_ldap_conn()
uid_number = self._get_next_uid(conn, min_id, max_id)
new_user = copy.deepcopy(user)
new_user.uid_number = uid_number
new_user.gid_number = uid_number
modlist = ldap.modlist.addModlist(new_user.serialize_for_modlist())
conn.add_s(new_user.dn, modlist)
return new_user
def save_group(self, group: IGroup) -> IGroup:
conn = self._get_ldap_conn()
# make sure that the caller initialized the GID number
assert group.gid_number
modlist = ldap.modlist.addModlist(group.serialize_for_modlist())
conn.add_s(group.dn, modlist)
return group
def modify_user(self, old_user: IUser, new_user: IUser):
conn = self._get_ldap_conn()
modlist = ldap.modlist.modifyModlist(
old_user.serialize_for_modlist(),
new_user.serialize_for_modlist(),
)
conn.modify_s(old_user.dn, modlist)
def modify_group(self, old_group: IGroup, new_group: IGroup):
conn = self._get_ldap_conn()
modlist = ldap.modlist.modifyModlist(
old_group.serialize_for_modlist(),
new_group.serialize_for_modlist(),
)
conn.modify_s(old_group.dn, modlist)

169
ceod/model/User.py Normal file
View File

@ -0,0 +1,169 @@
import copy
import os
from typing import List, Dict, Union
from zope import component
from zope.interface import implementer
from .utils import strings_to_bytes, bytes_to_strings
from ceo_common.interfaces import ILDAPService, IKerberosService, IUser, IConfig
@implementer(IUser)
class User:
def __init__(
self, uid: str, cn: str,
program: Union[str, None] = None,
terms: Union[List[str], None] = None,
non_member_terms: Union[List[str], None] = None,
login_shell: str = '/bin/bash',
uid_number: Union[int, None] = None,
gid_number: Union[int, None] = None,
home_directory: Union[str, None] = None,
positions: Union[List[str], None] = None,
mail_local_addresses: Union[List[str], None] = None,
is_club: bool = False,
):
if not is_club and not terms and not non_member_terms:
raise Exception('terms and non_member_terms cannot both be empty')
self.uid = uid
self.cn = cn
self.program = program
self.terms = terms or []
self.non_member_terms = non_member_terms or []
self.login_shell = login_shell
self.uid_number = uid_number
self.gid_number = gid_number
self.home_directory = home_directory or os.path.join('/users', uid)
self.positions = positions or []
self.mail_local_addresses = mail_local_addresses or []
self._is_club = is_club
cfg = component.getUtility(IConfig)
self.ldap_sasl_realm = cfg.get('ldap_sasl_realm')
self.dn = f'uid={uid},{cfg.get("ldap_users_base")}'
self.ldap_srv = component.getUtility(ILDAPService)
self.krb_srv = component.getUtility(IKerberosService)
@property
def forwarding_addresses(self):
raise NotImplementedError()
def __repr__(self) -> str:
lines = [
'dn: ' + self.dn,
'cn: ' + self.cn,
'uid: ' + self.uid,
'objectClass: top',
'objectClass: account',
'objectClass: posixAccount',
'objectClass: shadowAccount',
'objectClass: ' + ('club' if self.is_club() else 'member'),
'uidNumber: ' + str(self.uid_number),
'gidNumber: ' + str(self.gid_number),
'loginShell: ' + self.login_shell,
'homeDirectory: ' + self.home_directory,
]
if self.program:
lines.append('program: ' + self.program)
for term in self.terms:
lines.append('term: ' + term)
for term in self.non_member_terms:
lines.append('nonMemberTerm: ' + term)
for position in self.positions:
lines.append('position: ' + position)
for address in self.mail_local_addresses:
lines.append('mailLocalAddress: ' + address)
return '\n'.join(lines)
def is_club(self) -> bool:
return self._is_club
def add_to_ldap(self):
new_member = self.ldap_srv.save_user(self)
self.uid_number = new_member.uid_number
self.gid_number = new_member.gid_number
def add_to_kerberos(self, password: str):
self.krb_srv.addprinc(self.uid, password)
def change_password(self, password: str):
self.krb_srv.change_password(self.uid, password)
def serialize_for_modlist(self) -> Dict:
data = {
'cn': [self.cn],
'loginShell': [self.login_shell],
'homeDirectory': [self.home_directory],
'uid': [self.uid],
'uidNumber': [str(self.uid_number)],
'gidNumber': [str(self.gid_number)],
'objectClass': [
'top',
'account',
'posixAccount',
'shadowAccount',
],
}
if self.is_club():
data['objectClass'].append('club')
else:
data['objectClass'].append('member')
data['userPassword'] = ['{SASL}%s@%s' % (self.uid, self.ldap_sasl_realm)]
if self.program:
data['program'] = [self.program]
if self.terms:
data['term'] = self.terms
if self.non_member_terms:
data['nonMemberTerm'] = self.non_member_terms
if self.positions:
data['position'] = self.positions
if self.mail_local_addresses:
data['mailLocalAddress'] = self.mail_local_addresses
data['objectClass'].append('inetLocalMailRecipient')
return strings_to_bytes(data)
@staticmethod
def deserialize_from_dict(data: Dict) -> IUser:
data = bytes_to_strings(data)
return User(
uid=data['uid'][0],
cn=data['cn'][0],
program=data.get('program', [None])[0],
terms=data.get('term'),
non_member_terms=data.get('nonUserTerm'),
login_shell=data['loginShell'][0],
uid_number=int(data['uidNumber'][0]),
gid_number=int(data['gidNumber'][0]),
home_directory=data['homeDirectory'][0],
positions=data.get('position'),
mail_local_addresses=data.get('mailLocalAddress', []),
is_club=('club' in data['objectClass']),
)
def add_terms(self, terms: List[str]):
new_user = copy.copy(self)
new_user.terms = self.terms.copy()
new_user.terms.extend(terms)
self.ldap_srv.modify_user(self, new_user)
self.terms = new_user.terms
def add_non_member_terms(self, terms: List[str]):
new_user = copy.copy(self)
new_user.non_member_terms = self.non_member_terms.copy()
new_user.non_member_terms.extend(terms)
self.ldap_srv.modify_user(self, new_user)
self.non_member_terms = new_user.non_member_terms
def add_position(self, position: str):
new_user = copy.copy(self)
new_user.positions = [*self.positions, position]
self.ldap_srv.modify_user(self, new_user)
self.positions = new_user.positions
def remove_position(self, position: str):
new_user = copy.copy(self)
new_user.positions = self.positions.copy()
new_user.positions.remove(position)
self.ldap_srv.modify_user(self, new_user)
self.positions = new_user.positions

4
ceod/model/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .KerberosService import KerberosService
from .LDAPService import LDAPService, UserNotFoundError, GroupNotFoundError
from .User import User
from .Group import Group

34
ceod/model/utils.py Normal file
View File

@ -0,0 +1,34 @@
import base64
import os
from typing import Dict, List
def bytes_to_strings(data: Dict[str, List[bytes]]) -> Dict[str, List[str]]:
"""Convert the attribute values from bytes to strings"""
return {
key: [b.decode() for b in val]
for key, val in data.items()
}
def strings_to_bytes(data: Dict[str, List[str]]) -> Dict[str, List[bytes]]:
"""Convert the attribute values from strings to bytes"""
return {
key: [b.encode() for b in val]
for key, val in data.items()
}
def dn_to_uid(dn: str) -> str:
"""Extract the UID from an LDAP DN.
Examples:
dn_to_uid('uid=ctdalek,ou=People,dc=csclub,dc=uwaterloo,dc=ca')
-> 'ctdalek'
"""
return dn.split(',', 1)[0].split('=')[1]
def gen_password() -> str:
# good enough
return base64.b64encode(os.urandom(18)).decode()