diff --git a/ceod/api/groups.py b/ceod/api/groups.py index 802d7a7..1f9e3e5 100644 --- a/ceod/api/groups.py +++ b/ceod/api/groups.py @@ -3,7 +3,8 @@ from flask.json import jsonify from zope import component from .utils import authz_restrict_to_syscom, is_truthy, \ - create_streaming_response, development_only + create_streaming_response, development_only, requires_admin_creds, \ + requires_authentication_no_realm, user_is_in_group from ceo_common.interfaces import ILDAPService from ceo_common.utils import fuzzy_result, fuzzy_match from ceod.transactions.groups import ( @@ -52,9 +53,26 @@ def search_group(query, count): return jsonify(result) +def may_add_user_to_group(auth_username: str, group_name: str) -> bool: + # (is syscom) OR (group is office AND client is offsck) + if user_is_in_group(auth_username, 'syscom'): + return True + if group_name == 'office': + ldap_srv = component.getUtility(ILDAPService) + auth_user = ldap_srv.get_user(auth_username) + if 'offsck' in auth_user.positions: + return True + return False + + @bp.route('//members/', methods=['POST']) -@authz_restrict_to_syscom -def add_member_to_group(group_name, username): +@requires_admin_creds +@requires_authentication_no_realm +def add_member_to_group(auth_username: str, group_name: str, username: str): + # Admin creds are required because slapd does not support access control + # rules which use the client's attributes + if not may_add_user_to_group(auth_username, group_name): + return {'error': "not authorized to add user to group"}, 403 subscribe_to_lists = is_truthy( request.args.get('subscribe_to_lists', 'true') ) @@ -67,8 +85,13 @@ def add_member_to_group(group_name, username): @bp.route('//members/', methods=['DELETE']) -@authz_restrict_to_syscom -def remove_member_from_group(group_name, username): +@requires_admin_creds +@requires_authentication_no_realm +def remove_member_from_group(auth_username: str, group_name: str, username: str): + # Admin creds are required because slapd does not support access control + # rules which use the client's attributes + if not may_add_user_to_group(auth_username, group_name): + return {'error': "not authorized to add user to group"}, 403 unsubscribe_from_lists = is_truthy( request.args.get('unsubscribe_from_lists', 'true') ) diff --git a/tests/ceo/cli/test_members.py b/tests/ceo/cli/test_members.py index 6748f0c..a6c6461 100644 --- a/tests/ceo/cli/test_members.py +++ b/tests/ceo/cli/test_members.py @@ -136,12 +136,13 @@ def test_members_renew(cli_setup, ldap_user, g_admin_ctx): assert result.output == expected -def test_members_pwreset(cli_setup, ldap_user, krb_user): +def test_members_pwreset(cli_setup, ldap_and_krb_user): + uid = ldap_and_krb_user.uid runner = CliRunner() result = runner.invoke( - cli, ['members', 'pwreset', ldap_user.uid], input='y\n') + cli, ['members', 'pwreset', uid], input='y\n') expected_pat = re.compile(( - f"^Are you sure you want to reset {ldap_user.uid}'s password\\? \\[y/N\\]: y\n" + f"^Are you sure you want to reset {uid}'s password\\? \\[y/N\\]: y\n" "New password: \\S+\n$" ), re.MULTILINE) assert result.exit_code == 0 diff --git a/tests/ceod/api/test_db_mysql.py b/tests/ceod/api/test_db_mysql.py index 2ef9c08..7a178d5 100644 --- a/tests/ceod/api/test_db_mysql.py +++ b/tests/ceod/api/test_db_mysql.py @@ -5,8 +5,8 @@ from mysql.connector import connect from mysql.connector.errors import ProgrammingError -def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_user, krb_user): - uid = ldap_user.uid +def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_and_krb_user): + uid = ldap_and_krb_user.uid with g_admin_ctx(): user = User(uid='someone_else', cn='Some Name', given_name='Some', @@ -72,8 +72,8 @@ def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_user, krb_user): user.remove_from_ldap() -def test_api_passwd_reset_mysql(cfg, client, g_admin_ctx, ldap_user, krb_user): - uid = ldap_user.uid +def test_api_passwd_reset_mysql(cfg, client, g_admin_ctx, ldap_and_krb_user): + uid = ldap_and_krb_user.uid with g_admin_ctx(): user = User(uid='someone_else', cn='Some Name', given_name='Some', diff --git a/tests/ceod/api/test_db_psql.py b/tests/ceod/api/test_db_psql.py index 4c60e12..402508d 100644 --- a/tests/ceod/api/test_db_psql.py +++ b/tests/ceod/api/test_db_psql.py @@ -4,8 +4,8 @@ from ceod.model import User from psycopg2 import connect, OperationalError, ProgrammingError -def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user, krb_user): - uid = ldap_user.uid +def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_and_krb_user): + uid = ldap_and_krb_user.uid with g_admin_ctx(): user = User(uid='someone_else', cn='Some Name', given_name='Some', @@ -74,8 +74,8 @@ def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user, krb_user): user.remove_from_ldap() -def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_user, krb_user): - uid = ldap_user.uid +def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_and_krb_user): + uid = ldap_and_krb_user.uid with g_admin_ctx(): user = User(uid='someone_else', cn='Some Name', given_name='Some', diff --git a/tests/ceod/api/test_groups.py b/tests/ceod/api/test_groups.py index 00679af..ab90ca4 100644 --- a/tests/ceod/api/test_groups.py +++ b/tests/ceod/api/test_groups.py @@ -2,6 +2,7 @@ import ldap3 import pytest from ceod.model import Group +from tests.conftest_ceod_api import expect_successful_txn def test_api_group_not_found(client): @@ -9,20 +10,28 @@ def test_api_group_not_found(client): assert status == 404 -@pytest.fixture(scope='module') -def create_group_resp(client): +def create_group(client, cn: str, description: str): status, data = client.post('/api/groups', json={ - 'cn': 'test_group1', - 'description': 'Test Group One', + 'cn': cn, + 'description': description, }) assert status == 200 assert data[-1]['status'] == 'completed' - yield status, data - status, data = client.delete('/api/groups/test_group1') + return status, data + + +def delete_group(client, cn: str): + status, data = client.delete(f'/api/groups/{cn}') assert status == 200 assert data[-1]['status'] == 'completed' +@pytest.fixture(scope='module') +def create_group_resp(client): + yield create_group(client, 'test_group1', 'Test Group One') + delete_group(client, 'test_group1') + + @pytest.fixture(scope='module') def create_group_result(create_group_resp): # convenience method @@ -126,12 +135,7 @@ def create_random_names(): def create_searchable_groups(client, create_random_names): random_names = create_random_names for name in random_names: - status, data = client.post('/api/groups', json={ - 'cn': name, - 'description': 'Groups with distinct names for testing searching', - }) - assert status == 200 - assert data[-1]['status'] == 'completed' + create_group(client, name, 'Groups with distinct names for testing searching') yield random_names @@ -210,3 +214,27 @@ def test_api_group_auxiliary(cfg, client, ldap_user, g_admin_ctx): for group in groups: if group.cn != 'syscom': group.remove_from_ldap() + + +@pytest.mark.parametrize('desc,expect_success', [ + ('in_syscom', True), + ('is_offsck', True), + (None, False), +]) +def test_api_add_or_remove_member_from_office(desc, expect_success, cfg, client, ldap_and_krb_user, new_user, g_admin_ctx): + uid = ldap_and_krb_user.uid + if desc == 'in_syscom': + uid = 'ctdalek' + elif desc == 'is_offsck': + with g_admin_ctx(): + ldap_and_krb_user.set_positions(['offsck']) + + def handle_resp(status_and_data): + if expect_success: + expect_successful_txn(status_and_data) + else: + status = status_and_data[0] + assert status == 403 + + handle_resp(client.post(f'/api/groups/office/members/{new_user.uid}', principal=uid)) + handle_resp(client.delete(f'/api/groups/office/members/{new_user.uid}', principal=uid)) diff --git a/tests/conftest.py b/tests/conftest.py index 1374574..8fad37a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -538,6 +538,12 @@ def krb_user(simple_user): simple_user.remove_from_kerberos() +@pytest.fixture +def ldap_and_krb_user(ldap_user, krb_user): + # Note that ldap_user and krb_user are both the same person (simple_user) + yield ldap_user + + @pytest.fixture def new_user_gen( client, g_admin_ctx, ldap_srv_session, mocks_for_create_user, # noqa: F811 diff --git a/tests/conftest_ceod_api.py b/tests/conftest_ceod_api.py index 8d12f35..a6c8e46 100644 --- a/tests/conftest_ceod_api.py +++ b/tests/conftest_ceod_api.py @@ -1,5 +1,6 @@ import json import socket +from typing import Any, Dict, List, Tuple import flask from flask.testing import FlaskClient @@ -76,3 +77,9 @@ class CeodTestClient: def put(self, path, principal=None, need_auth=True, delegate=True, **kwargs): return self.request('PUT', path, principal, need_auth, delegate, **kwargs) + + +def expect_successful_txn(status_and_data: Tuple[int, List[Dict[str, Any]]]) -> None: + status, data = status_and_data + assert status == 200 + assert data[-1]['status'] == 'completed'