re-organize test fixture scopes
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Max Erenberg 2021-12-11 16:19:48 -05:00
parent e76731fb4b
commit cb3516cc9f
6 changed files with 141 additions and 95 deletions

View File

@ -16,17 +16,19 @@ class Term:
def __repr__(self):
return self.s_term
@staticmethod
def from_datetime(dt: datetime.datetime):
"""Get a Term object for the given date."""
idx = (dt.month - 1) // 4
c = Term.seasons[idx]
s_term = c + str(dt.year)
return Term(s_term)
@staticmethod
def current():
"""Get a Term object for the current date."""
dt = utils.get_current_datetime()
c = 'w'
if 5 <= dt.month <= 8:
c = 's'
elif 9 <= dt.month:
c = 'f'
s_term = c + str(dt.year)
return Term(s_term)
return Term.from_datetime(dt)
def start_month(self):
return self.seasons.index(self.s_term[0]) * 4 + 1

View File

@ -117,9 +117,7 @@ def delete_user(username: str):
@bp.route('/expire', methods=['POST'])
@authz_restrict_to_syscom
def expire_users():
dry_run = False
if is_truthy(request.args.get('dry_run', 'false')):
dry_run = True
dry_run = is_truthy(request.args.get('dry_run', 'false'))
ldap_srv = component.getUtility(ILDAPService)
members = ldap_srv.get_expiring_users()

View File

@ -1,7 +1,6 @@
import contextlib
import grp
import pwd
import datetime
from typing import Union, Dict, List
from flask import g
@ -15,6 +14,7 @@ from ceo_common.errors import UserNotFoundError, GroupNotFoundError, \
from ceo_common.interfaces import ILDAPService, IConfig, \
IUser, IGroup, IUWLDAPService
from ceo_common.model import Term
import ceo_common.utils as ceo_common_utils
from .User import User
from .Group import Group
@ -253,13 +253,13 @@ class LDAPService:
query.append(f'nonMemberTerm={term}')
# Include last term too if the new term just started
dt = datetime.datetime.now()
dt = ceo_common_utils.get_current_datetime()
if dt.month == term.start_month():
last_term = term - 1
query.append(f'term={last_term}')
query.append(f'nonMemberTerm={last_term}')
query = '(!(|(shadowExpire=*)(' + ')('.join(query) + ')))'
query = '(!(|(shadowExpire=1)(' + ')('.join(query) + ')))'
conn = self._get_ldap_conn()
conn.search(

View File

@ -2,7 +2,7 @@ from unittest.mock import patch
import ldap3
import pytest
from datetime import datetime
import datetime
import ceod.utils
import ceo_common.utils
@ -15,21 +15,21 @@ def test_api_user_not_found(client):
@pytest.fixture(scope='module')
def create_user_resp(client, mocks_for_create_user, mock_mail_server):
def create_user_resp(client, mocks_for_create_user_module, mock_mail_server):
mock_mail_server.messages.clear()
status, data = client.post('/api/members', json={
'uid': 'test_1',
'uid': 'test1',
'cn': 'Test One',
'given_name': 'Test',
'sn': 'One',
'program': 'Math',
'terms': ['s2021'],
'forwarding_addresses': ['test_1@uwaterloo.internal'],
'forwarding_addresses': ['test1@uwaterloo.internal'],
})
assert status == 200
assert data[-1]['status'] == 'completed'
yield status, data
status, data = client.delete('/api/members/test_1')
status, data = client.delete('/api/members/test1')
assert status == 200
assert data[-1]['status'] == 'completed'
@ -57,17 +57,17 @@ def test_api_create_user(cfg, create_user_resp, mock_mail_server):
"cn": "Test One",
"given_name": "Test",
"sn": "One",
"uid": "test_1",
"uid": "test1",
"uid_number": min_uid,
"gid_number": min_uid,
"login_shell": "/bin/bash",
"home_directory": "/tmp/test_users/test_1",
"home_directory": "/tmp/test_users/test1",
"is_club": False,
"is_club_rep": False,
"program": "Math",
"terms": ["s2021"],
"mail_local_addresses": ["test_1@csclub.internal"],
"forwarding_addresses": ['test_1@uwaterloo.internal'],
"mail_local_addresses": ["test1@csclub.internal"],
"forwarding_addresses": ['test1@uwaterloo.internal'],
"password": "krb5",
"shadowExpire": None,
}},
@ -76,7 +76,7 @@ def test_api_create_user(cfg, create_user_resp, mock_mail_server):
# Two messages should have been sent: a welcome message to the new member,
# and an announcement to the ceo mailing list
assert len(mock_mail_server.messages) == 2
assert mock_mail_server.messages[0]['to'] == 'test_1@csclub.internal'
assert mock_mail_server.messages[0]['to'] == 'test1@csclub.internal'
assert mock_mail_server.messages[1]['to'] == 'ceo@csclub.internal,ctdalek@csclub.internal'
mock_mail_server.messages.clear()
@ -216,7 +216,7 @@ def test_api_reset_password(client, create_user_result):
def test_authz_check(client, create_user_result):
# non-staff members may not create users
status, data = client.post('/api/members', json={
'uid': 'test_1', 'cn': 'Test One', 'given_name': 'Test',
'uid': 'test1', 'cn': 'Test One', 'given_name': 'Test',
'sn': 'One', 'terms': ['s2021'],
}, principal='regular1')
assert status == 403
@ -231,47 +231,56 @@ def test_authz_check(client, create_user_result):
# If we're syscom but we don't pass credentials, the request should fail
_, data = client.post('/api/members', json={
'uid': 'test_1', 'cn': 'Test One', 'given_name': 'Test',
'uid': 'test1', 'cn': 'Test One', 'given_name': 'Test',
'sn': 'One', 'terms': ['s2021'],
}, principal='ctdalek', delegate=False)
assert data[-1]['status'] == 'aborted'
@pytest.mark.parametrize('test_date, should_expire', [
('2021-05-15', False),
('2021-06-01', False),
('2021-09-15', False),
('1966-01-20', True),
('2021-10-15', True),
('2022-05-20', True),
('2050-04-01', True)])
@pytest.mark.parametrize('term_attr', ['terms', 'non_member_terms'])
def test_expire(client, new_user, term_attr, test_date, should_expire):
user = new_user.to_dict()
uid = user['uid']
term = repr(Term.current())
def test_expire(client, new_user_gen, term_attr):
start_of_current_term = Term.current().to_datetime()
# test_date, should_expire
test_cases = [
# same term, membership is still valid
(start_of_current_term + datetime.timedelta(days=90), False),
# first month of next term, grace period is activated
(start_of_current_term + datetime.timedelta(days=130), False),
# second month of next term, membership is now invalid
(start_of_current_term + datetime.timedelta(days=160), True),
# next next term, membership is definitely invalid
(start_of_current_term + datetime.timedelta(days=250), True),
]
with patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
datetime_mock.return_value = datetime.fromisoformat(test_date)
for test_date, should_expire in test_cases:
with new_user_gen() as user_obj, \
patch.object(ceo_common.utils, 'get_current_datetime') as datetime_mock:
user = user_obj.to_dict()
uid = user['uid']
datetime_mock.return_value = test_date
assert user['shadowExpire'] is None
assert user['shadowExpire'] is None
status, data = client.post('/api/members/expire?dry_run=yes')
assert status == 200
assert (data == [uid]) == should_expire
status, data = client.post('/api/members/expire?dry_run=yes')
assert status == 200
assert (data == [uid]) == should_expire
_, user = client.get(f'/api/members/{uid}')
assert user['shadowExpire'] is None
_, user = client.get(f'/api/members/{uid}')
assert user['shadowExpire'] is None
status, data = client.post('/api/members/expire')
assert status == 200
assert (data == [uid]) == should_expire
status, data = client.post('/api/members/expire')
assert status == 200
assert (data == [uid]) == should_expire
_, user = client.get(f'/api/members/{uid}')
assert (user['shadowExpire'] is not None) == should_expire
_, user = client.get(f'/api/members/{uid}')
assert (user['shadowExpire'] is not None) == should_expire
status, _ = client.post(f'/api/members/{uid}/renew', json={term_attr: [term]})
assert status == 200
if not should_expire:
continue
_, user = client.get(f'/api/members/{uid}')
assert user['shadowExpire'] is None
term = Term.from_datetime(test_date)
status, _ = client.post(f'/api/members/{uid}/renew', json={term_attr: [str(term)]})
assert status == 200
_, user = client.get(f'/api/members/{uid}')
assert user['shadowExpire'] is None

View File

@ -1,15 +1,13 @@
import contextlib
import grp
import importlib.resources
from multiprocessing import Process
import os
import pwd
import shutil
import subprocess
from subprocess import DEVNULL
import sys
import time
from unittest.mock import patch, Mock
from unittest.mock import Mock
import flask
import gssapi
@ -19,7 +17,12 @@ import requests
import socket
from zope import component
from .utils import gssapi_token_ctx, ccache_cleanup # noqa: F401
# noqa: F401
from .utils import ( # noqa: F401
gssapi_token_ctx,
ccache_cleanup,
mocks_for_create_user_ctx,
)
from ceo_common.interfaces import IConfig, IKerberosService, ILDAPService, \
IFileService, IMailmanService, IHTTPClient, IUWLDAPService, IMailService, \
IDatabaseService, ICloudService
@ -29,7 +32,6 @@ from ceod.db import MySQLService, PostgreSQLService
from ceod.model import KerberosService, LDAPService, FileService, User, \
MailmanService, Group, UWLDAPService, UWLDAPRecord, MailService, \
CloudService
import ceod.utils as utils
from .MockSMTPServer import MockSMTPServer
from .MockMailmanServer import MockMailmanServer
from .MockCloudStackServer import MockCloudStackServer
@ -301,17 +303,15 @@ def app(
return app
@pytest.fixture(scope='session')
@pytest.fixture
def mocks_for_create_user():
with patch.object(utils, 'gen_password') as gen_password_mock, \
patch.object(pwd, 'getpwuid') as getpwuid_mock, \
patch.object(grp, 'getgrgid') as getgrgid_mock:
gen_password_mock.return_value = 'krb5'
# Normally, if getpwuid or getgrgid do *not* raise a KeyError,
# then LDAPService will skip that UID. Therefore, by raising a
# KeyError, we are making sure that the UID will *not* be skipped.
getpwuid_mock.side_effect = KeyError()
getgrgid_mock.side_effect = KeyError()
with mocks_for_create_user_ctx():
yield
@pytest.fixture(scope='module')
def mocks_for_create_user_module():
with mocks_for_create_user_ctx():
yield
@ -356,32 +356,45 @@ def krb_user(simple_user):
simple_user.remove_from_kerberos()
_new_user_id_counter = 10001
@pytest.fixture # noqa: E302
def new_user(client, g_admin_ctx, ldap_srv_session): # noqa: F811
global _new_user_id_counter
uid = 'test' + str(_new_user_id_counter)
_new_user_id_counter += 1
status, data = client.post('/api/members', json={
'uid': uid,
'cn': 'John Doe',
'given_name': 'John',
'sn': 'Doe',
'program': 'Math',
'terms': [str(Term.current())],
})
assert status == 200
assert data[-1]['status'] == 'completed'
with g_admin_ctx():
user = ldap_srv_session.get_user(uid)
subprocess.run([
'kadmin', '-k', '-p', 'ceod/admin', 'cpw',
'-pw', 'krb5', uid,
], check=True)
yield user
status, data = client.delete(f'/api/members/{uid}')
assert status == 200
assert data[-1]['status'] == 'completed'
@pytest.fixture
def new_user_gen(
client, g_admin_ctx, ldap_srv_session, mocks_for_create_user, # noqa: F811
):
_new_user_id_counter = 11001
@contextlib.contextmanager
def wrapper():
nonlocal _new_user_id_counter
uid = 'test' + str(_new_user_id_counter)
_new_user_id_counter += 1
status, data = client.post('/api/members', json={
'uid': uid,
'cn': 'John Doe',
'given_name': 'John',
'sn': 'Doe',
'program': 'Math',
'terms': [str(Term.current())],
})
assert status == 200
assert data[-1]['status'] == 'completed'
subprocess.run([
'kadmin', '-k', '-p', 'ceod/admin',
'modprinc', '-needchange', uid,
], check=True)
with g_admin_ctx():
user = ldap_srv_session.get_user(uid)
yield user
status, data = client.delete(f'/api/members/{uid}')
assert status == 200
assert data[-1]['status'] == 'completed'
return wrapper
@pytest.fixture
def new_user(new_user_gen):
with new_user_gen() as user:
yield user
@pytest.fixture

View File

@ -1,8 +1,12 @@
import ceod.utils as ceod_utils
import contextlib
import os
import grp
import pwd
import subprocess
from subprocess import DEVNULL
import tempfile
from unittest.mock import patch
import gssapi
import pytest
@ -45,3 +49,23 @@ def ccache_cleanup():
"""Make sure the ccache files get deleted at the end of the tests."""
yield
_cache.clear()
@contextlib.contextmanager
def gen_password_mock_ctx():
with patch.object(ceod_utils, 'gen_password') as mock:
mock.return_value = 'krb5'
yield
@contextlib.contextmanager
def mocks_for_create_user_ctx():
with gen_password_mock_ctx(), \
patch.object(pwd, 'getpwuid') as getpwuid_mock, \
patch.object(grp, 'getgrgid') as getgrgid_mock:
# Normally, if getpwuid or getgrgid do *not* raise a KeyError,
# then LDAPService will skip that UID. Therefore, by raising a
# KeyError, we are making sure that the UID will *not* be skipped.
getpwuid_mock.side_effect = KeyError()
getgrgid_mock.side_effect = KeyError()
yield