pyceo/ceod/model/utils.py

101 lines
3.3 KiB
Python
Raw Normal View History

import functools
import json
import os
from typing import Dict, List, Union, Callable
from ceo_common.errors import RateLimitError
from ceo_common.model import Term
import ceo_common.utils
2021-07-19 01:47:39 -04:00
def bytes_to_strings(data: Dict[str, List[bytes]]) -> Dict[str, List[str]]:
"""Convert the attribute values from bytes to strings"""
return {
key: [b.decode() for b in val]
for key, val in data.items()
}
def strings_to_bytes(data: Dict[str, List[str]]) -> Dict[str, List[bytes]]:
"""Convert the attribute values from strings to bytes"""
return {
key: [b.encode() for b in val]
for key, val in data.items()
}
def dn_to_uid(dn: str) -> str:
"""Extract the UID from an LDAP DN.
Examples:
dn_to_uid('uid=ctdalek,ou=People,dc=csclub,dc=uwaterloo,dc=ca')
-> 'ctdalek'
"""
return dn.split(',', 1)[0].split('=')[1]
def should_be_club_rep(terms: Union[None, List[str]],
non_member_terms: Union[None, List[str]]) -> bool:
"""Returns True iff a user's most recent term was a non-member term."""
if not non_member_terms:
# no non-member terms => was only ever a member
return False
if not terms:
# no member terms => was only ever a club rep
return True
# decide using the most recent term (member or non-member)
return max(map(Term, non_member_terms)) > max(map(Term, terms))
def rate_limit(api_name: str, limit_secs: int):
"""
Returns a function which returns a decorator to rate limit
an API call. Since the rate limit is per-user, the first
argument to the function being rate limited must be a username.
"""
state_dir = '/run/ceod'
if not os.path.isdir(state_dir):
os.mkdir(state_dir)
rate_limit_file = os.path.join(state_dir, 'rate_limit.json')
def _check_rate_limit(username: str):
if not os.path.exists(rate_limit_file):
return
rate_limits_by_api = json.load(open(rate_limit_file))
if api_name not in rate_limits_by_api:
return
d = rate_limits_by_api[api_name]
if username not in d:
return
now = int(ceo_common.utils.get_current_datetime().timestamp())
time_passed = now - d[username]
if time_passed < limit_secs:
time_remaining = limit_secs - time_passed
raise RateLimitError(f'Please wait {time_remaining} seconds')
def _update_rate_limit_timestamp(username: str):
if os.path.exists(rate_limit_file):
rate_limits_by_api = json.load(open(rate_limit_file))
else:
rate_limits_by_api = {}
if api_name in rate_limits_by_api:
d = rate_limits_by_api[api_name]
else:
d = {}
rate_limits_by_api[api_name] = d
now = int(ceo_common.utils.get_current_datetime().timestamp())
d[username] = now
json.dump(rate_limits_by_api, open(rate_limit_file, 'w'))
def decorator_gen(func: Callable):
@functools.wraps(func)
def decorator(username: str, *args, **kwargs):
# sanity check
assert isinstance(username, str)
_check_rate_limit(username)
func(username, *args, **kwargs)
_update_rate_limit_timestamp(username)
return decorator
return decorator_gen