forked from public/pyceo
Compare commits
4 Commits
b507c56136
...
a5ea1627f3
Author | SHA1 | Date |
---|---|---|
Eric Li | a5ea1627f3 | |
Eric Li | 78c1c9ed92 | |
Nathan Chung | 1505b07600 | |
Nathan Chung | 3331e1e3ef |
|
@ -4,9 +4,12 @@ import os
|
|||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from hashlib import md5
|
||||
from typing import List, Dict, Tuple
|
||||
|
||||
import dns.resolver
|
||||
import jinja2
|
||||
import tldextract
|
||||
from zope import component
|
||||
from zope.interface import implementer
|
||||
|
||||
|
@ -99,8 +102,33 @@ class VHostManager:
|
|||
prefix = domain[:len(domain) - len(self.k8s_vhost_domain) - 1]
|
||||
elif domain.endswith('.' + self.vhost_domain):
|
||||
prefix = domain[:len(domain) - len(self.vhost_domain) - 1]
|
||||
else:
|
||||
return False
|
||||
else: # user is requesting for a custom domain...
|
||||
# generate checksum based on username
|
||||
checksum = md5(username.encode('utf-8')).hexdigest()
|
||||
record = f"csc-verification={checksum}"
|
||||
|
||||
# not implemented: check domain name exists?
|
||||
# look up for TXT record `csc-verification` at root
|
||||
rootname = tldextract.extract(domain).registered_domain
|
||||
try:
|
||||
records = dns.resolver.resolve(rootname, "TXT").rrset
|
||||
for x in records:
|
||||
if record in x.to_text():
|
||||
print(f"found correct`csc-verification` record at {rootname}")
|
||||
return True
|
||||
|
||||
except dns.resolver.NoAnswer as e:
|
||||
if "The DNS response does not contain an answer to the question: . IN TXT" not in repr(e):
|
||||
raise
|
||||
|
||||
# TODO: handle errors separately, return errors to user
|
||||
print(f"{rootname} does not contain any TXT records.")
|
||||
return False
|
||||
|
||||
print(f"cannot find a `csc-verification` record at {rootname}")
|
||||
|
||||
return False # deny by default
|
||||
|
||||
last_part = prefix.split('.')[-1]
|
||||
|
||||
if last_part == username:
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
click==8.1.3
|
||||
cryptography==35.0.0
|
||||
dnspython==2.4.2
|
||||
Flask==2.1.2
|
||||
gssapi==1.6.14
|
||||
gunicorn==20.1.0
|
||||
|
@ -10,6 +11,7 @@ psycopg2==2.9.1
|
|||
python-augeas==1.1.0
|
||||
requests==2.26.0
|
||||
requests-gssapi==1.2.3
|
||||
tldextract==5.0.1
|
||||
urwid==2.1.2
|
||||
Werkzeug==2.1.2
|
||||
zope.component==5.0.1
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
import datetime
|
||||
import os
|
||||
import types
|
||||
from unittest.mock import patch
|
||||
|
||||
import ldap3
|
||||
import dns.resolver
|
||||
from hashlib import md5
|
||||
|
||||
from ceo_common.model import Term
|
||||
import ceo_common.utils as ceo_common_utils
|
||||
|
@ -124,6 +127,22 @@ def test_cloud_vhosts(cfg, client, new_user, ldap_conn):
|
|||
principal=uid)
|
||||
assert status == 400
|
||||
|
||||
# invalid custom domain name (with TXT record)
|
||||
custom_domain = "uwaterloo.ca"
|
||||
ip2 = ip1
|
||||
status, _ = client.put(
|
||||
f'/api/cloud/vhosts/{custom_domain}', json={'ip_address': ip2},
|
||||
principal=uid)
|
||||
assert status == 400
|
||||
|
||||
# invalid custom domain name (without TXT record)
|
||||
custom_domain2 = "google.com"
|
||||
ip2 = ip1
|
||||
status, _ = client.put(
|
||||
f'/api/cloud/vhosts/{custom_domain2}', json={'ip_address': ip2},
|
||||
principal=uid)
|
||||
assert status == 400
|
||||
|
||||
# invalid IP address
|
||||
domain3 = domain1
|
||||
ip3 = '129.97.134.10'
|
||||
|
@ -155,9 +174,27 @@ def test_cloud_vhosts(cfg, client, new_user, ldap_conn):
|
|||
else:
|
||||
assert status != 200
|
||||
|
||||
# delete a vhost
|
||||
status, _ = client.delete(f'/api/cloud/vhosts/{domain1}', principal=uid)
|
||||
assert status == 200
|
||||
# delete all vhosts
|
||||
for i in range(max_vhosts - 1):
|
||||
domain = 'app' + str(i + 1) + '.' + uid + '.' + members_domain
|
||||
status, _ = client.delete(f'/api/cloud/vhosts/{domain}', principal=uid)
|
||||
assert status == 200
|
||||
|
||||
# TODO: valid custom domain name
|
||||
now_mock.return_value += datetime.timedelta(seconds=rate_limit_secs)
|
||||
custom_domain3 = "example.com"
|
||||
ip2 = '172.19.134.32'
|
||||
with patch.object(dns.resolver, 'resolve') as dns_mock:
|
||||
dns_mock.return_value = types.SimpleNamespace()
|
||||
dns_mock.return_value.rrset = [types.SimpleNamespace()]
|
||||
dns_mock.return_value.rrset[0].to_text = lambda: f"csc-verification={md5(uid.encode('utf-8')).hexdigest()}"
|
||||
status, _ = client.put(
|
||||
f'/api/cloud/vhosts/{custom_domain3}', json={'ip_address': ip2},
|
||||
principal=uid)
|
||||
print(status)
|
||||
assert status == 200
|
||||
|
||||
# TODO: invalid custom domain name (valid TXT record, invalid A/AAAA record)
|
||||
|
||||
# expired members may not create vhosts
|
||||
expire_member(new_user, ldap_conn)
|
||||
|
|
Loading…
Reference in New Issue