# Get a Kerberos TGT first
curl --negotiate -u : --service-name ceod http://mail:9987/api/mailman/csc-general/ctdalek
Not all endpoints are SPNEGO-protected, e.g. to get info for a member:
curl http://phosphoric-acid:9987/api/members/ctdalek
curl --negotiate -u : --service-name ceod -X POST http://mail:9987/api/mailman/csc-general/ctdalek

def change_password(password: str):
"""Replace this user's password."""
def replace_login_shell(login_shell: str):
"""Replace this user's login shell."""
def create_home_dir():
"""Create a new home directory for this user."""
def delete_home_dir():
"""Delete this user's home dir."""
def subscribe_to_mailing_list(mailing_list: str):
"""Subscribe this user to the mailing list."""
def unsubscribe_from_mailing_list(mailing_list: str):
"""Unsubscribe this user from the mailing list."""
def serialize_for_ldap() -> Dict[str, List[bytes]]:
Serialize this user into a dict to be passed to
:returns: IUser
def to_dict() -> Dict:
"""Serialize this user into a dict."""
def to_dict(get_forwarding_addresses: bool = False) -> Dict:
Serialize this user into a dict.
If get_forwarding_addresses is True, the forwarding addresses
for the user will also be returned, if present.

from flask import Blueprint, request
from zope import component
from .utils import authz_restrict_to_staff, create_streaming_response
from .utils import authz_restrict_to_staff, user_is_in_group, \
requires_authentication_no_realm, create_streaming_response
from ceo_common.errors import UserNotFoundError
from ceo_common.interfaces import ILDAPService
from ceod.transactions.members import AddMemberTransaction
from ceod.transactions.members import (
bp = Blueprint('members', __name__)
def get_user(username: str):
def get_user(auth_user: str, username: str):
get_forwarding_addresses = False
if auth_user == username or user_is_in_group(auth_user, 'syscom'):
get_forwarding_addresses = True
ldap_srv = component.getUtility(ILDAPService)
return ldap_srv.get_user(username).to_dict(get_forwarding_addresses)
except UserNotFoundError:
return {
'error': 'user not found'
}, 404
@bp.route('/<username>', methods=['PATCH'])
def patch_user(auth_user: str, username: str):
if not (auth_user == username or user_is_in_group(auth_user, 'syscom')):
return {
'error': "You are not authorized to modify other users' attributes"
}, 403
body = request.get_json(force=True)
txn = ModifyMemberTransaction(
return create_streaming_response(txn)

import functools
import grp
import json
import os
import pwd
import socket
import traceback
from typing import Callable, List
from flask import current_app
return error_decorator
def requires_authentication_no_realm(f: Callable) -> Callable:
Like requires_authentication, but strips the realm out of the principal string.
e.g. user1@CSCLUB.UWATERLOO.CA -> user1
def wrapper(principal: str, *args, **kwargs):
user = principal[:principal.index('@')]
logger.debug(f'received request from {user}')
return f(user, *args, **kwargs)
return wrapper
def user_is_in_group(user: str, group: str) -> bool:
"""Returns True if `user` is in `group`, False otherwise."""
return user in grp.getgrnam(group).gr_mem
def authz_restrict_to_groups(f: Callable, allowed_groups: List[str]) -> Callable:
Restrict an endpoint to users who belong to one or more of the
specified groups.
# TODO: cache group members, but place a time limit on the cache validity
allowed_group_ids = [grp.getgrnam(g).gr_gid for g in allowed_groups]
def wrapper(user: str, *args, **kwargs):
:param user: a Kerberos principal (e.g. 'user1@CSCLUB.UWATERLOO.CA')
logger.debug(f'received request from {user}')
username = user[:user.index('@')]
def wrapper(_username: str, *args, **kwargs):
# we need to call the argument _username to avoid name clashes with
# the arguments of f
username = _username
if username.startswith('ceod/'):
# ceod services are always allowed to make internal calls
return f(*args, **kwargs)
for group in allowed_groups:
for group_member in grp.getgrnam(group).gr_mem:
if username == group_member:
return f(*args, **kwargs)
for gid in os.getgrouplist(username, pwd.getpwnam(username).pw_gid):
if gid in allowed_group_ids:
return f(*args, **kwargs)
f"User '{username}' denied since they are not in one of {allowed_groups}"
'result': txn.result,
}) + '\n'
except Exception as err:
logger.warning('Transaction failed:\n' + traceback.format_exc())
yield json.dumps({
'status': 'aborted',

from zope.interface import implementer
from .utils import strings_to_bytes, bytes_to_strings
from .validators import is_valid_shell
from ceo_common.interfaces import ILDAPService, IKerberosService, IFileService, \
IUser, IConfig
IUser, IConfig, IMailmanService
@ -56,7 +57,7 @@ class User:
self.krb_srv = component.getUtility(IKerberosService)
self.file_srv = component.getUtility(IFileService)
def to_dict(self, get_forwarding_addresses: bool = False) -> Dict:
data = {
'dn': self.dn,
@ -77,6 +78,8 @@ class User:
data['positions'] = self.positions
if self.mail_local_addresses:
data['mail_local_addresses'] = self.mail_local_addresses
if get_forwarding_addresses:
data['forwarding_addresses'] = self.get_forwarding_addresses()
return data
def __repr__(self) -> str:
@ -110,6 +113,12 @@ class User:
def delete_home_dir(self):
def subscribe_to_mailing_list(self, mailing_list: str):
component.getUtility(IMailmanService).subscribe(self.uid, mailing_list)
def unsubscribe_from_mailing_list(self, mailing_list: str):
component.getUtility(IMailmanService).unsubscribe(self.uid, mailing_list)
def serialize_for_ldap(self) -> Dict:
data = {
'cn': [],
@ -161,6 +170,13 @@ class User:
is_club=('club' in data['objectClass']),
def replace_login_shell(self, login_shell: str):
new_user = copy.copy(self)
if not is_valid_shell(login_shell):
raise Exception('%s is not a valid shell' % login_shell)
new_user.login_shell = login_shell
self.ldap_srv.modify_user(self, new_user)
def add_terms(self, terms: List[str]):
new_user = copy.copy(self)
new_user.terms = self.terms.copy()

def is_valid_forwarding_address(address: str) -> bool:
"""Returns True if the address is a valid forwarding address."""
return valid_forwarding_address_re.match(address) is not None
def is_valid_shell(shell: str) -> bool:
"""Returns True if this shell is in /etc/shells, otherwise False."""
return shell in [
line.strip() for line in open('/etc/shells')
if line != '' and not line.isspace()

self.result = None
def finish(self, result):
Store a result in this object so that it can be returned to the client.
self.result = result

import base64
import os
import traceback
from typing import Union, List
from zope import component
from ..AbstractTransaction import AbstractTransaction
from ceo_common.interfaces import IConfig, IMailService, IMailmanService
from ceo_common.interfaces import IConfig, IMailService
from ceo_common.logger_factory import logger_factory
from ceod.model import User, Group
logger = logger_factory(__name__)
def gen_password() -> str:
"""Generate a temporary password."""
@ -48,7 +52,6 @@ class AddMemberTransaction(AbstractTransaction): = None
self.new_member_list = cfg.get('new_member_list')
self.mail_srv = component.getUtility(IMailService)
self.mailman_srv = component.getUtility(IMailmanService)
def child_execute_iter(self):
member = User(
@ -84,14 +87,21 @@ class AddMemberTransaction(AbstractTransaction):
# The following operations can't/shouldn't be rolled back because the
# user has already seen the email
yield 'send_welcome_message'
# This will be done on mail (remote)
self.mailman_srv.subscribe(member.uid, self.new_member_list)
yield 'subscribe_to_mailing_list'
user_json = member.to_dict()
yield 'send_welcome_message'
except Exception as err:
logger.warning('send_welcome_message failed:\n' + traceback.format_exc())
yield 'failed_to_send_welcome_message\n' + str(err)
yield 'subscribe_to_mailing_list'
except Exception as err:
logger.warning('subscribe_to_mailing_list failed:\n' + traceback.format_exc())
yield 'failed_to_subscribe_to_mailing_list\n' + str(err)
user_json = member.to_dict(True)
# insert the password into the JSON so that the client can see it
user_json['password'] = password

from typing import Union, List
from zope import component
from ..AbstractTransaction import AbstractTransaction
from ceo_common.interfaces import ILDAPService
class ModifyMemberTransaction(AbstractTransaction):
Transaction to modify a member's attributes.
These attributes should be modifiable by the user themselves.
operations = [
def __init__(
username: str,
login_shell: Union[str, None],
forwarding_addresses: Union[List[str], None],
self.username = username
self.login_shell = login_shell
self.forwarding_addresses = forwarding_addresses
self.ldap_srv = component.getUtility(ILDAPService)
# For the rollback
self.user = None
self.old_login_shell = None
self.old_forwarding_addresses = None
def child_execute_iter(self):
user = self.ldap_srv.get_user(self.username)
self.user = user
if self.login_shell:
self.old_login_shell = user.login_shell
yield 'replace_login_shell'
if self.forwarding_addresses:
self.old_forwarding_addresses = user.get_forwarding_addresses()
yield 'replace_forwarding_addresses'
def rollback(self):
if 'replace_login_shell' in self.finished_operations:
if 'replace_forwarding_addresses' in self.finished_operations:

from .AddMemberTransaction import AddMemberTransaction
from .ModifyMemberTransaction import ModifyMemberTransaction