Allow offsck to add members to the office group (#126)
continuous-integration/drone/push Build is passing Details

Closes #62.

Reviewed-on: #126
This commit is contained in:
Max Erenberg 2024-02-17 19:31:03 -05:00
parent bf7f1c7724
commit 9c51ad3a01
7 changed files with 93 additions and 28 deletions

View File

@ -3,7 +3,8 @@ from flask.json import jsonify
from zope import component
from .utils import authz_restrict_to_syscom, is_truthy, \
create_streaming_response, development_only
create_streaming_response, development_only, requires_admin_creds, \
requires_authentication_no_realm, user_is_in_group
from ceo_common.interfaces import ILDAPService
from ceo_common.utils import fuzzy_result, fuzzy_match
from ceod.transactions.groups import (
@ -52,9 +53,26 @@ def search_group(query, count):
return jsonify(result)
def may_add_user_to_group(auth_username: str, group_name: str) -> bool:
# (is syscom) OR (group is office AND client is offsck)
if user_is_in_group(auth_username, 'syscom'):
return True
if group_name == 'office':
ldap_srv = component.getUtility(ILDAPService)
auth_user = ldap_srv.get_user(auth_username)
if 'offsck' in auth_user.positions:
return True
return False
@bp.route('/<group_name>/members/<username>', methods=['POST'])
@authz_restrict_to_syscom
def add_member_to_group(group_name, username):
@requires_admin_creds
@requires_authentication_no_realm
def add_member_to_group(auth_username: str, group_name: str, username: str):
# Admin creds are required because slapd does not support access control
# rules which use the client's attributes
if not may_add_user_to_group(auth_username, group_name):
return {'error': "not authorized to add user to group"}, 403
subscribe_to_lists = is_truthy(
request.args.get('subscribe_to_lists', 'true')
)
@ -67,8 +85,13 @@ def add_member_to_group(group_name, username):
@bp.route('/<group_name>/members/<username>', methods=['DELETE'])
@authz_restrict_to_syscom
def remove_member_from_group(group_name, username):
@requires_admin_creds
@requires_authentication_no_realm
def remove_member_from_group(auth_username: str, group_name: str, username: str):
# Admin creds are required because slapd does not support access control
# rules which use the client's attributes
if not may_add_user_to_group(auth_username, group_name):
return {'error': "not authorized to add user to group"}, 403
unsubscribe_from_lists = is_truthy(
request.args.get('unsubscribe_from_lists', 'true')
)

View File

@ -136,12 +136,13 @@ def test_members_renew(cli_setup, ldap_user, g_admin_ctx):
assert result.output == expected
def test_members_pwreset(cli_setup, ldap_user, krb_user):
def test_members_pwreset(cli_setup, ldap_and_krb_user):
uid = ldap_and_krb_user.uid
runner = CliRunner()
result = runner.invoke(
cli, ['members', 'pwreset', ldap_user.uid], input='y\n')
cli, ['members', 'pwreset', uid], input='y\n')
expected_pat = re.compile((
f"^Are you sure you want to reset {ldap_user.uid}'s password\\? \\[y/N\\]: y\n"
f"^Are you sure you want to reset {uid}'s password\\? \\[y/N\\]: y\n"
"New password: \\S+\n$"
), re.MULTILINE)
assert result.exit_code == 0

View File

@ -5,8 +5,8 @@ from mysql.connector import connect
from mysql.connector.errors import ProgrammingError
def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
uid = ldap_user.uid
def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_and_krb_user):
uid = ldap_and_krb_user.uid
with g_admin_ctx():
user = User(uid='someone_else', cn='Some Name', given_name='Some',
@ -72,8 +72,8 @@ def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
user.remove_from_ldap()
def test_api_passwd_reset_mysql(cfg, client, g_admin_ctx, ldap_user, krb_user):
uid = ldap_user.uid
def test_api_passwd_reset_mysql(cfg, client, g_admin_ctx, ldap_and_krb_user):
uid = ldap_and_krb_user.uid
with g_admin_ctx():
user = User(uid='someone_else', cn='Some Name', given_name='Some',

View File

@ -4,8 +4,8 @@ from ceod.model import User
from psycopg2 import connect, OperationalError, ProgrammingError
def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
uid = ldap_user.uid
def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_and_krb_user):
uid = ldap_and_krb_user.uid
with g_admin_ctx():
user = User(uid='someone_else', cn='Some Name', given_name='Some',
@ -74,8 +74,8 @@ def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
user.remove_from_ldap()
def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_user, krb_user):
uid = ldap_user.uid
def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_and_krb_user):
uid = ldap_and_krb_user.uid
with g_admin_ctx():
user = User(uid='someone_else', cn='Some Name', given_name='Some',

View File

@ -2,6 +2,7 @@ import ldap3
import pytest
from ceod.model import Group
from tests.conftest_ceod_api import expect_successful_txn
def test_api_group_not_found(client):
@ -9,20 +10,28 @@ def test_api_group_not_found(client):
assert status == 404
@pytest.fixture(scope='module')
def create_group_resp(client):
def create_group(client, cn: str, description: str):
status, data = client.post('/api/groups', json={
'cn': 'test_group1',
'description': 'Test Group One',
'cn': cn,
'description': description,
})
assert status == 200
assert data[-1]['status'] == 'completed'
yield status, data
status, data = client.delete('/api/groups/test_group1')
return status, data
def delete_group(client, cn: str):
status, data = client.delete(f'/api/groups/{cn}')
assert status == 200
assert data[-1]['status'] == 'completed'
@pytest.fixture(scope='module')
def create_group_resp(client):
yield create_group(client, 'test_group1', 'Test Group One')
delete_group(client, 'test_group1')
@pytest.fixture(scope='module')
def create_group_result(create_group_resp):
# convenience method
@ -126,12 +135,7 @@ def create_random_names():
def create_searchable_groups(client, create_random_names):
random_names = create_random_names
for name in random_names:
status, data = client.post('/api/groups', json={
'cn': name,
'description': 'Groups with distinct names for testing searching',
})
assert status == 200
assert data[-1]['status'] == 'completed'
create_group(client, name, 'Groups with distinct names for testing searching')
yield random_names
@ -210,3 +214,27 @@ def test_api_group_auxiliary(cfg, client, ldap_user, g_admin_ctx):
for group in groups:
if group.cn != 'syscom':
group.remove_from_ldap()
@pytest.mark.parametrize('desc,expect_success', [
('in_syscom', True),
('is_offsck', True),
(None, False),
])
def test_api_add_or_remove_member_from_office(desc, expect_success, cfg, client, ldap_and_krb_user, new_user, g_admin_ctx):
uid = ldap_and_krb_user.uid
if desc == 'in_syscom':
uid = 'ctdalek'
elif desc == 'is_offsck':
with g_admin_ctx():
ldap_and_krb_user.set_positions(['offsck'])
def handle_resp(status_and_data):
if expect_success:
expect_successful_txn(status_and_data)
else:
status = status_and_data[0]
assert status == 403
handle_resp(client.post(f'/api/groups/office/members/{new_user.uid}', principal=uid))
handle_resp(client.delete(f'/api/groups/office/members/{new_user.uid}', principal=uid))

View File

@ -538,6 +538,12 @@ def krb_user(simple_user):
simple_user.remove_from_kerberos()
@pytest.fixture
def ldap_and_krb_user(ldap_user, krb_user):
# Note that ldap_user and krb_user are both the same person (simple_user)
yield ldap_user
@pytest.fixture
def new_user_gen(
client, g_admin_ctx, ldap_srv_session, mocks_for_create_user, # noqa: F811

View File

@ -1,5 +1,6 @@
import json
import socket
from typing import Any, Dict, List, Tuple
import flask
from flask.testing import FlaskClient
@ -76,3 +77,9 @@ class CeodTestClient:
def put(self, path, principal=None, need_auth=True, delegate=True, **kwargs):
return self.request('PUT', path, principal, need_auth, delegate, **kwargs)
def expect_successful_txn(status_and_data: Tuple[int, List[Dict[str, Any]]]) -> None:
status, data = status_and_data
assert status == 200
assert data[-1]['status'] == 'completed'