Allow offsck to add members to the office group #126
|
@ -3,7 +3,8 @@ from flask.json import jsonify
|
||||||
from zope import component
|
from zope import component
|
||||||
|
|
||||||
from .utils import authz_restrict_to_syscom, is_truthy, \
|
from .utils import authz_restrict_to_syscom, is_truthy, \
|
||||||
create_streaming_response, development_only
|
create_streaming_response, development_only, requires_admin_creds, \
|
||||||
|
requires_authentication_no_realm, user_is_in_group
|
||||||
from ceo_common.interfaces import ILDAPService
|
from ceo_common.interfaces import ILDAPService
|
||||||
from ceo_common.utils import fuzzy_result, fuzzy_match
|
from ceo_common.utils import fuzzy_result, fuzzy_match
|
||||||
from ceod.transactions.groups import (
|
from ceod.transactions.groups import (
|
||||||
|
@ -52,9 +53,26 @@ def search_group(query, count):
|
||||||
return jsonify(result)
|
return jsonify(result)
|
||||||
|
|
||||||
|
|
||||||
|
def may_add_user_to_group(auth_username: str, group_name: str) -> bool:
|
||||||
|
# (is syscom) OR (group is office AND client is offsck)
|
||||||
|
if user_is_in_group(auth_username, 'syscom'):
|
||||||
|
return True
|
||||||
|
if group_name == 'office':
|
||||||
|
ldap_srv = component.getUtility(ILDAPService)
|
||||||
|
auth_user = ldap_srv.get_user(auth_username)
|
||||||
|
if 'offsck' in auth_user.positions:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@bp.route('/<group_name>/members/<username>', methods=['POST'])
|
@bp.route('/<group_name>/members/<username>', methods=['POST'])
|
||||||
@authz_restrict_to_syscom
|
@requires_admin_creds
|
||||||
def add_member_to_group(group_name, username):
|
@requires_authentication_no_realm
|
||||||
|
def add_member_to_group(auth_username: str, group_name: str, username: str):
|
||||||
|
# Admin creds are required because slapd does not support access control
|
||||||
|
# rules which use the client's attributes
|
||||||
|
if not may_add_user_to_group(auth_username, group_name):
|
||||||
|
return {'error': "not authorized to add user to group"}, 403
|
||||||
subscribe_to_lists = is_truthy(
|
subscribe_to_lists = is_truthy(
|
||||||
request.args.get('subscribe_to_lists', 'true')
|
request.args.get('subscribe_to_lists', 'true')
|
||||||
)
|
)
|
||||||
|
@ -67,8 +85,13 @@ def add_member_to_group(group_name, username):
|
||||||
|
|
||||||
|
|
||||||
@bp.route('/<group_name>/members/<username>', methods=['DELETE'])
|
@bp.route('/<group_name>/members/<username>', methods=['DELETE'])
|
||||||
@authz_restrict_to_syscom
|
@requires_admin_creds
|
||||||
def remove_member_from_group(group_name, username):
|
@requires_authentication_no_realm
|
||||||
|
def remove_member_from_group(auth_username: str, group_name: str, username: str):
|
||||||
|
# Admin creds are required because slapd does not support access control
|
||||||
|
# rules which use the client's attributes
|
||||||
|
if not may_add_user_to_group(auth_username, group_name):
|
||||||
|
return {'error': "not authorized to add user to group"}, 403
|
||||||
unsubscribe_from_lists = is_truthy(
|
unsubscribe_from_lists = is_truthy(
|
||||||
request.args.get('unsubscribe_from_lists', 'true')
|
request.args.get('unsubscribe_from_lists', 'true')
|
||||||
)
|
)
|
||||||
|
|
|
@ -136,12 +136,13 @@ def test_members_renew(cli_setup, ldap_user, g_admin_ctx):
|
||||||
assert result.output == expected
|
assert result.output == expected
|
||||||
|
|
||||||
|
|
||||||
def test_members_pwreset(cli_setup, ldap_user, krb_user):
|
def test_members_pwreset(cli_setup, ldap_and_krb_user):
|
||||||
|
uid = ldap_and_krb_user.uid
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
result = runner.invoke(
|
result = runner.invoke(
|
||||||
cli, ['members', 'pwreset', ldap_user.uid], input='y\n')
|
cli, ['members', 'pwreset', uid], input='y\n')
|
||||||
expected_pat = re.compile((
|
expected_pat = re.compile((
|
||||||
f"^Are you sure you want to reset {ldap_user.uid}'s password\\? \\[y/N\\]: y\n"
|
f"^Are you sure you want to reset {uid}'s password\\? \\[y/N\\]: y\n"
|
||||||
"New password: \\S+\n$"
|
"New password: \\S+\n$"
|
||||||
), re.MULTILINE)
|
), re.MULTILINE)
|
||||||
assert result.exit_code == 0
|
assert result.exit_code == 0
|
||||||
|
|
|
@ -5,8 +5,8 @@ from mysql.connector import connect
|
||||||
from mysql.connector.errors import ProgrammingError
|
from mysql.connector.errors import ProgrammingError
|
||||||
|
|
||||||
|
|
||||||
def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
|
def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_and_krb_user):
|
||||||
uid = ldap_user.uid
|
uid = ldap_and_krb_user.uid
|
||||||
|
|
||||||
with g_admin_ctx():
|
with g_admin_ctx():
|
||||||
user = User(uid='someone_else', cn='Some Name', given_name='Some',
|
user = User(uid='someone_else', cn='Some Name', given_name='Some',
|
||||||
|
@ -72,8 +72,8 @@ def test_api_create_mysql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
|
||||||
user.remove_from_ldap()
|
user.remove_from_ldap()
|
||||||
|
|
||||||
|
|
||||||
def test_api_passwd_reset_mysql(cfg, client, g_admin_ctx, ldap_user, krb_user):
|
def test_api_passwd_reset_mysql(cfg, client, g_admin_ctx, ldap_and_krb_user):
|
||||||
uid = ldap_user.uid
|
uid = ldap_and_krb_user.uid
|
||||||
|
|
||||||
with g_admin_ctx():
|
with g_admin_ctx():
|
||||||
user = User(uid='someone_else', cn='Some Name', given_name='Some',
|
user = User(uid='someone_else', cn='Some Name', given_name='Some',
|
||||||
|
|
|
@ -4,8 +4,8 @@ from ceod.model import User
|
||||||
from psycopg2 import connect, OperationalError, ProgrammingError
|
from psycopg2 import connect, OperationalError, ProgrammingError
|
||||||
|
|
||||||
|
|
||||||
def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
|
def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_and_krb_user):
|
||||||
uid = ldap_user.uid
|
uid = ldap_and_krb_user.uid
|
||||||
|
|
||||||
with g_admin_ctx():
|
with g_admin_ctx():
|
||||||
user = User(uid='someone_else', cn='Some Name', given_name='Some',
|
user = User(uid='someone_else', cn='Some Name', given_name='Some',
|
||||||
|
@ -74,8 +74,8 @@ def test_api_create_psql_db(cfg, client, g_admin_ctx, ldap_user, krb_user):
|
||||||
user.remove_from_ldap()
|
user.remove_from_ldap()
|
||||||
|
|
||||||
|
|
||||||
def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_user, krb_user):
|
def test_api_passwd_reset_psql(cfg, client, g_admin_ctx, ldap_and_krb_user):
|
||||||
uid = ldap_user.uid
|
uid = ldap_and_krb_user.uid
|
||||||
|
|
||||||
with g_admin_ctx():
|
with g_admin_ctx():
|
||||||
user = User(uid='someone_else', cn='Some Name', given_name='Some',
|
user = User(uid='someone_else', cn='Some Name', given_name='Some',
|
||||||
|
|
|
@ -2,6 +2,7 @@ import ldap3
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from ceod.model import Group
|
from ceod.model import Group
|
||||||
|
from tests.conftest_ceod_api import expect_successful_txn
|
||||||
|
|
||||||
|
|
||||||
def test_api_group_not_found(client):
|
def test_api_group_not_found(client):
|
||||||
|
@ -9,20 +10,28 @@ def test_api_group_not_found(client):
|
||||||
assert status == 404
|
assert status == 404
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
def create_group(client, cn: str, description: str):
|
||||||
def create_group_resp(client):
|
|
||||||
status, data = client.post('/api/groups', json={
|
status, data = client.post('/api/groups', json={
|
||||||
'cn': 'test_group1',
|
'cn': cn,
|
||||||
'description': 'Test Group One',
|
'description': description,
|
||||||
})
|
})
|
||||||
assert status == 200
|
assert status == 200
|
||||||
assert data[-1]['status'] == 'completed'
|
assert data[-1]['status'] == 'completed'
|
||||||
yield status, data
|
return status, data
|
||||||
status, data = client.delete('/api/groups/test_group1')
|
|
||||||
|
|
||||||
|
def delete_group(client, cn: str):
|
||||||
|
status, data = client.delete(f'/api/groups/{cn}')
|
||||||
assert status == 200
|
assert status == 200
|
||||||
assert data[-1]['status'] == 'completed'
|
assert data[-1]['status'] == 'completed'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='module')
|
||||||
|
def create_group_resp(client):
|
||||||
|
yield create_group(client, 'test_group1', 'Test Group One')
|
||||||
|
delete_group(client, 'test_group1')
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
@pytest.fixture(scope='module')
|
||||||
def create_group_result(create_group_resp):
|
def create_group_result(create_group_resp):
|
||||||
# convenience method
|
# convenience method
|
||||||
|
@ -126,12 +135,7 @@ def create_random_names():
|
||||||
def create_searchable_groups(client, create_random_names):
|
def create_searchable_groups(client, create_random_names):
|
||||||
random_names = create_random_names
|
random_names = create_random_names
|
||||||
for name in random_names:
|
for name in random_names:
|
||||||
status, data = client.post('/api/groups', json={
|
create_group(client, name, 'Groups with distinct names for testing searching')
|
||||||
'cn': name,
|
|
||||||
'description': 'Groups with distinct names for testing searching',
|
|
||||||
})
|
|
||||||
assert status == 200
|
|
||||||
assert data[-1]['status'] == 'completed'
|
|
||||||
yield random_names
|
yield random_names
|
||||||
|
|
||||||
|
|
||||||
|
@ -210,3 +214,27 @@ def test_api_group_auxiliary(cfg, client, ldap_user, g_admin_ctx):
|
||||||
for group in groups:
|
for group in groups:
|
||||||
if group.cn != 'syscom':
|
if group.cn != 'syscom':
|
||||||
group.remove_from_ldap()
|
group.remove_from_ldap()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('desc,expect_success', [
|
||||||
|
('in_syscom', True),
|
||||||
|
('is_offsck', True),
|
||||||
|
(None, False),
|
||||||
|
])
|
||||||
|
def test_api_add_or_remove_member_from_office(desc, expect_success, cfg, client, ldap_and_krb_user, new_user, g_admin_ctx):
|
||||||
|
uid = ldap_and_krb_user.uid
|
||||||
|
if desc == 'in_syscom':
|
||||||
|
uid = 'ctdalek'
|
||||||
|
elif desc == 'is_offsck':
|
||||||
|
with g_admin_ctx():
|
||||||
|
ldap_and_krb_user.set_positions(['offsck'])
|
||||||
|
|
||||||
|
def handle_resp(status_and_data):
|
||||||
|
if expect_success:
|
||||||
|
expect_successful_txn(status_and_data)
|
||||||
|
else:
|
||||||
|
status = status_and_data[0]
|
||||||
|
assert status == 403
|
||||||
|
|
||||||
|
handle_resp(client.post(f'/api/groups/office/members/{new_user.uid}', principal=uid))
|
||||||
|
handle_resp(client.delete(f'/api/groups/office/members/{new_user.uid}', principal=uid))
|
||||||
|
|
|
@ -538,6 +538,12 @@ def krb_user(simple_user):
|
||||||
simple_user.remove_from_kerberos()
|
simple_user.remove_from_kerberos()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def ldap_and_krb_user(ldap_user, krb_user):
|
||||||
|
# Note that ldap_user and krb_user are both the same person (simple_user)
|
||||||
|
yield ldap_user
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def new_user_gen(
|
def new_user_gen(
|
||||||
client, g_admin_ctx, ldap_srv_session, mocks_for_create_user, # noqa: F811
|
client, g_admin_ctx, ldap_srv_session, mocks_for_create_user, # noqa: F811
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import json
|
import json
|
||||||
import socket
|
import socket
|
||||||
|
from typing import Any, Dict, List, Tuple
|
||||||
|
|
||||||
import flask
|
import flask
|
||||||
from flask.testing import FlaskClient
|
from flask.testing import FlaskClient
|
||||||
|
@ -76,3 +77,9 @@ class CeodTestClient:
|
||||||
|
|
||||||
def put(self, path, principal=None, need_auth=True, delegate=True, **kwargs):
|
def put(self, path, principal=None, need_auth=True, delegate=True, **kwargs):
|
||||||
return self.request('PUT', path, principal, need_auth, delegate, **kwargs)
|
return self.request('PUT', path, principal, need_auth, delegate, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def expect_successful_txn(status_and_data: Tuple[int, List[Dict[str, Any]]]) -> None:
|
||||||
|
status, data = status_and_data
|
||||||
|
assert status == 200
|
||||||
|
assert data[-1]['status'] == 'completed'
|
||||||
|
|
Loading…
Reference in New Issue