forked from public/pyceo
1
0
Fork 0

Compare commits

...

4 Commits

Author SHA1 Message Date
Eric Li a5ea1627f3 Add custom domain name test and change delete_vhost test to delete all vhosts 2023-10-23 01:12:58 +01:00
Eric Li 78c1c9ed92 Make existing code work with current version of dnspython 2023-10-23 00:50:41 +01:00
Nathan Chung 1505b07600
87: add tests for custom vhosts
Signed-off-by: n4chung <n4chung@csclub.uwaterloo.ca>
2023-01-06 21:25:59 -05:00
Nathan Chung 3331e1e3ef
add fully automated vhost registrations (\#87)
Signed-off-by: n4chung <n4chung@csclub.uwaterloo.ca>
2023-01-06 16:36:17 -05:00
3 changed files with 72 additions and 5 deletions

View File

@ -4,9 +4,12 @@ import os
import re
import shutil
import subprocess
from hashlib import md5
from typing import List, Dict, Tuple
import dns.resolver
import jinja2
import tldextract
from zope import component
from zope.interface import implementer
@ -99,8 +102,33 @@ class VHostManager:
prefix = domain[:len(domain) - len(self.k8s_vhost_domain) - 1]
elif domain.endswith('.' + self.vhost_domain):
prefix = domain[:len(domain) - len(self.vhost_domain) - 1]
else:
return False
else: # user is requesting for a custom domain...
# generate checksum based on username
checksum = md5(username.encode('utf-8')).hexdigest()
record = f"csc-verification={checksum}"
# not implemented: check domain name exists?
# look up for TXT record `csc-verification` at root
rootname = tldextract.extract(domain).registered_domain
try:
records = dns.resolver.resolve(rootname, "TXT").rrset
for x in records:
if record in x.to_text():
print(f"found correct`csc-verification` record at {rootname}")
return True
except dns.resolver.NoAnswer as e:
if "The DNS response does not contain an answer to the question: . IN TXT" not in repr(e):
raise
# TODO: handle errors separately, return errors to user
print(f"{rootname} does not contain any TXT records.")
return False
print(f"cannot find a `csc-verification` record at {rootname}")
return False # deny by default
last_part = prefix.split('.')[-1]
if last_part == username:

View File

@ -1,5 +1,6 @@
click==8.1.3
cryptography==35.0.0
dnspython==2.4.2
Flask==2.1.2
gssapi==1.6.14
gunicorn==20.1.0
@ -10,6 +11,7 @@ psycopg2==2.9.1
python-augeas==1.1.0
requests==2.26.0
requests-gssapi==1.2.3
tldextract==5.0.1
urwid==2.1.2
Werkzeug==2.1.2
zope.component==5.0.1

View File

@ -1,8 +1,11 @@
import datetime
import os
import types
from unittest.mock import patch
import ldap3
import dns.resolver
from hashlib import md5
from ceo_common.model import Term
import ceo_common.utils as ceo_common_utils
@ -124,6 +127,22 @@ def test_cloud_vhosts(cfg, client, new_user, ldap_conn):
principal=uid)
assert status == 400
# invalid custom domain name (with TXT record)
custom_domain = "uwaterloo.ca"
ip2 = ip1
status, _ = client.put(
f'/api/cloud/vhosts/{custom_domain}', json={'ip_address': ip2},
principal=uid)
assert status == 400
# invalid custom domain name (without TXT record)
custom_domain2 = "google.com"
ip2 = ip1
status, _ = client.put(
f'/api/cloud/vhosts/{custom_domain2}', json={'ip_address': ip2},
principal=uid)
assert status == 400
# invalid IP address
domain3 = domain1
ip3 = '129.97.134.10'
@ -155,9 +174,27 @@ def test_cloud_vhosts(cfg, client, new_user, ldap_conn):
else:
assert status != 200
# delete a vhost
status, _ = client.delete(f'/api/cloud/vhosts/{domain1}', principal=uid)
assert status == 200
# delete all vhosts
for i in range(max_vhosts - 1):
domain = 'app' + str(i + 1) + '.' + uid + '.' + members_domain
status, _ = client.delete(f'/api/cloud/vhosts/{domain}', principal=uid)
assert status == 200
# TODO: valid custom domain name
now_mock.return_value += datetime.timedelta(seconds=rate_limit_secs)
custom_domain3 = "example.com"
ip2 = '172.19.134.32'
with patch.object(dns.resolver, 'resolve') as dns_mock:
dns_mock.return_value = types.SimpleNamespace()
dns_mock.return_value.rrset = [types.SimpleNamespace()]
dns_mock.return_value.rrset[0].to_text = lambda: f"csc-verification={md5(uid.encode('utf-8')).hexdigest()}"
status, _ = client.put(
f'/api/cloud/vhosts/{custom_domain3}', json={'ip_address': ip2},
principal=uid)
print(status)
assert status == 200
# TODO: invalid custom domain name (valid TXT record, invalid A/AAAA record)
# expired members may not create vhosts
expire_member(new_user, ldap_conn)