don't expire syscom members
parent
1e94132e97
commit
6616d01935
|
@ -236,8 +236,9 @@ class LDAPService:
|
||||||
raise GroupAlreadyExistsError()
|
raise GroupAlreadyExistsError()
|
||||||
|
|
||||||
def get_expiring_users(self) -> List[IUser]:
|
def get_expiring_users(self) -> List[IUser]:
|
||||||
clauses = []
|
syscom_members = self.get_group('syscom').members
|
||||||
|
|
||||||
|
clauses = []
|
||||||
term = Term.current()
|
term = Term.current()
|
||||||
clauses.append(f'term={term}')
|
clauses.append(f'term={term}')
|
||||||
clauses.append(f'nonMemberTerm={term}')
|
clauses.append(f'nonMemberTerm={term}')
|
||||||
|
@ -258,7 +259,11 @@ class LDAPService:
|
||||||
query,
|
query,
|
||||||
attributes=ldap3.ALL_ATTRIBUTES,
|
attributes=ldap3.ALL_ATTRIBUTES,
|
||||||
search_scope=ldap3.LEVEL)
|
search_scope=ldap3.LEVEL)
|
||||||
return [User.deserialize_from_ldap(entry) for entry in conn.entries]
|
return [
|
||||||
|
User.deserialize_from_ldap(entry)
|
||||||
|
for entry in conn.entries
|
||||||
|
if entry.uid.value not in syscom_members
|
||||||
|
]
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def entry_ctx_for_group(self, group: IGroup):
|
def entry_ctx_for_group(self, group: IGroup):
|
||||||
|
|
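Review bot: `get_expiring_users` now calls `self.get_group('syscom')` before it builds the query, so the whole expiry listing depends on that lookup succeeding. The tests in this commit add a `syscom_group` fixture to every expire test, which suggests a missing group raises rather than yielding an empty exemption list. Decide which way this should fail and record it next to the lookup, e.g. `# syscom members are exempt from expiry; a missing syscom group aborts the run instead of expiring committee accounts`. The literal `'syscom'` would be easier to audit as a named constant or config value, and because the list is only used for membership tests, `syscom_members = set(self.get_group('syscom').members)` avoids a linear scan per entry. The middle of the method lies between the two hunks and is not visible here.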
|
@ -147,7 +147,7 @@ def test_members_pwreset(cli_setup, ldap_user, krb_user):
|
||||||
assert expected_pat.match(result.output) is not None
|
assert expected_pat.match(result.output) is not None
|
||||||
|
|
||||||
|
|
||||||
def test_members_expire(cli_setup, ldap_user):
|
def test_members_expire(cli_setup, ldap_user, syscom_group):
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
|
|
||||||
with patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
|
with patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
|
||||||
|
|
|
@ -238,8 +238,26 @@ def test_authz_check(client, create_user_result):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('term_attr', ['terms', 'non_member_terms'])
|
@pytest.mark.parametrize('term_attr', ['terms', 'non_member_terms'])
|
||||||
def test_expire(client, new_user_gen, term_attr):
|
def test_expire(client, new_user, term_attr, syscom_group, ldap_conn):
|
||||||
start_of_current_term = Term.current().to_datetime()
|
assert new_user.shadowExpire is None
|
||||||
|
current_term = Term.current()
|
||||||
|
start_of_current_term = current_term.to_datetime()
|
||||||
|
|
||||||
|
def reset_terms():
|
||||||
|
if term_attr == 'terms':
|
||||||
|
attr = 'term'
|
||||||
|
else:
|
||||||
|
attr = 'nonMemberTerm'
|
||||||
|
changes = {
|
||||||
|
attr: [(ldap3.MODIFY_REPLACE, [str(current_term)])]
|
||||||
|
}
|
||||||
|
dn = new_user.ldap_srv.uid_to_dn(new_user.uid)
|
||||||
|
ldap_conn.modify(dn, changes)
|
||||||
|
if term_attr == 'terms':
|
||||||
|
new_user.terms = [str(current_term)]
|
||||||
|
else:
|
||||||
|
new_user.non_member_terms = [str(current_term)]
|
||||||
|
|
||||||
# test_date, should_expire
|
# test_date, should_expire
|
||||||
test_cases = [
|
test_cases = [
|
||||||
# same term, membership is still valid
|
# same term, membership is still valid
|
||||||
|
@ -251,16 +269,13 @@ def test_expire(client, new_user_gen, term_attr):
|
||||||
# next next term, membership is definitely invalid
|
# next next term, membership is definitely invalid
|
||||||
(start_of_current_term + datetime.timedelta(days=250), True),
|
(start_of_current_term + datetime.timedelta(days=250), True),
|
||||||
]
|
]
|
||||||
|
uid = new_user.uid
|
||||||
|
|
||||||
for test_date, should_expire in test_cases:
|
for test_date, should_expire in test_cases:
|
||||||
with new_user_gen() as user_obj, \
|
with patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
|
||||||
patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
|
user = new_user.to_dict()
|
||||||
user = user_obj.to_dict()
|
|
||||||
uid = user['uid']
|
|
||||||
datetime_mock.return_value = test_date
|
datetime_mock.return_value = test_date
|
||||||
|
|
||||||
assert user['shadowExpire'] is None
|
|
||||||
|
|
||||||
status, data = client.post('/api/members/expire?dry_run=yes')
|
status, data = client.post('/api/members/expire?dry_run=yes')
|
||||||
assert status == 200
|
assert status == 200
|
||||||
assert (data == [uid]) == should_expire
|
assert (data == [uid]) == should_expire
|
||||||
|
@ -284,3 +299,25 @@ def test_expire(client, new_user_gen, term_attr):
|
||||||
|
|
||||||
_, user = client.get(f'/api/members/{uid}')
|
_, user = client.get(f'/api/members/{uid}')
|
||||||
assert user['shadowExpire'] is None
|
assert user['shadowExpire'] is None
|
||||||
|
reset_terms()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('in_syscom', [True, False])
|
||||||
|
def test_expire_syscom_member(client, new_user, syscom_group, g_admin_ctx, ldap_conn, in_syscom):
|
||||||
|
uid = new_user.uid
|
||||||
|
start_of_current_term = Term.current().to_datetime()
|
||||||
|
if in_syscom:
|
||||||
|
group_dn = new_user.ldap_srv.group_cn_to_dn('syscom')
|
||||||
|
user_dn = new_user.ldap_srv.uid_to_dn(uid)
|
||||||
|
changes = {
|
||||||
|
'uniqueMember': [(ldap3.MODIFY_ADD, [user_dn])]
|
||||||
|
}
|
||||||
|
ldap_conn.modify(group_dn, changes)
|
||||||
|
with patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
|
||||||
|
datetime_mock.return_value = start_of_current_term + datetime.timedelta(days=160)
|
||||||
|
status, data = client.post('/api/members/expire')
|
||||||
|
assert status == 200
|
||||||
|
if in_syscom:
|
||||||
|
assert data == []
|
||||||
|
else:
|
||||||
|
assert data == [uid]
|
||||||
|
|
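Review bot: `test_expire_syscom_member` only checks the list returned by `/api/members/expire`; in the `in_syscom` case it never confirms that the account itself was left untouched. This POST is made without `dry_run`, so re-read the user afterwards and assert the exemption held, e.g. `_, user = client.get(f'/api/members/{uid}')` followed by `assert (user['shadowExpire'] is None) == in_syscom`. The `g_admin_ctx` fixture is requested but not used in the visible body. The results of the `ldap_conn.modify(...)` calls here and in `reset_terms` are ignored; depending on how the connection is configured a failed modify may not raise, so asserting on the return value would keep a broken setup from passing silently.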
|
@ -458,6 +458,20 @@ def ldap_group(simple_group, g_admin_ctx):
|
||||||
simple_group.remove_from_ldap()
|
simple_group.remove_from_ldap()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def syscom_group(g_admin_ctx):
|
||||||
|
group = Group(
|
||||||
|
cn='syscom',
|
||||||
|
gid_number=10001,
|
||||||
|
user_cn='Systems Committee'
|
||||||
|
)
|
||||||
|
with g_admin_ctx():
|
||||||
|
group.add_to_ldap()
|
||||||
|
yield group
|
||||||
|
with g_admin_ctx():
|
||||||
|
group.remove_from_ldap()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def uwldap_user(cfg, uwldap_srv, ldap_conn):
|
def uwldap_user(cfg, uwldap_srv, ldap_conn):
|
||||||
conn = ldap_conn
|
conn = ldap_conn
|
||||||
|
|
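Review bot: `syscom_group` has no docstring, and tests such as `test_members_expire` and `test_expire` request it without ever touching the group, which will look like a leftover to the next reader. A one-line docstring would explain it: `"""Create the 'syscom' group that the expire code path looks up; removed again on teardown."""`. The hard-coded `gid_number=10001` may also deserve a short comment saying it does not clash with the gids used by the other group fixtures, if that is the intent.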