Python CSC Electronic Office
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

97 lines
3.6 KiB

import glob
import os
import re
import subprocess
from typing import List, Dict
import jinja2
from zope.interface import implementer
from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import IVHostManager
PROXY_PASS_IP_RE = re.compile(r'^\s+proxy_pass\s+http://(?P<ip_address>[\d.]+);$')
VHOST_FILENAME_RE = re.compile(r'^member_(?P<username>[0-9a-z-]+)_(?P<domain>[0-9a-z.-]+)$')
logger = logger_factory(__name__)
# NOTE(review): the copy of this file that was reviewed had lost its
# indentation and several statements were truncated. Every piece marked
# "reconstructed" below was inferred from the surviving fragments and should
# be confirmed against the upstream file.
# NOTE(review): this file imports `implementer` and `IVHostManager`, but no
# `@implementer(IVHostManager)` decorator is visible on the class. If it was
# dropped along with the other lines, restore it here.
class VHostManager:
    """Manage per-member nginx vhost files stored in a single directory.

    Each vhost is one file named ``member_<username>_<domain>`` inside
    ``vhost_dir``. Creating or deleting a vhost reloads nginx.
    """

    def __init__(
        self,
        vhost_dir: str,
        ssl_cert_path: str,
        ssl_key_path: str,
    ):
        """Store the target directory and TLS paths and set up templating.

        :param vhost_dir: directory in which vhost files are written.
        :param ssl_cert_path: passed to the vhost template as ``ssl_cert_path``.
        :param ssl_key_path: passed to the vhost template as ``ssl_key_path``.
        """
        self.vhost_dir = vhost_dir
        self.ssl_cert_path = ssl_cert_path
        self.ssl_key_path = ssl_key_path
        # NOTE(review): reconstructed. The arguments to jinja2.Environment()
        # were missing. create_vhost() calls get_template(), which needs a
        # loader; a `templates` directory next to this module is an
        # assumption -- confirm the real loader.
        self.jinja_env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(
                os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
            ),
        )

    # Declared static because it takes no `self` yet is invoked as
    # `self._vhost_filename(...)`; as a plain method that call would pass
    # three arguments and raise TypeError.
    @staticmethod
    def _vhost_filename(username: str, domain: str) -> str:
        """Generate a filename for the vhost record"""
        # sanity check: the domain becomes part of a path, so it must not be
        # able to escape vhost_dir.
        assert '..' not in domain and '/' not in domain
        return 'member' + '_' + username + '_' + domain

    def _vhost_filepath(self, username: str, domain: str) -> str:
        """Generate an absolute path for the vhost record"""
        return os.path.join(self.vhost_dir, self._vhost_filename(username, domain))

    def _vhost_files(self, username: str) -> List[str]:
        """Return a list of all vhost files for this user."""
        return glob.glob(os.path.join(self.vhost_dir, 'member_' + username + '_*'))

    def _reload_web_server(self):
        """Reload nginx via systemctl; raises CalledProcessError on failure."""
        logger.debug('Reloading nginx')
        # NOTE(review): reconstructed. Only the argument list and
        # `check=True` survived; `subprocess.run` is the assumed callee.
        subprocess.run(['systemctl', 'reload', 'nginx'], check=True)

    def create_vhost(self, username: str, domain: str, ip_address: str):
        """Render the vhost template for (username, domain) and write it out.

        :param username: owner of the vhost; part of the file name.
        :param domain: domain of the vhost; part of the file name.
        :param ip_address: passed to the template as ``ip_address``.
        """
        template = self.jinja_env.get_template('nginx_cloud_vhost_config.j2')
        body = template.render(
            username=username, domain=domain, ip_address=ip_address,
            ssl_cert_path=self.ssl_cert_path, ssl_key_path=self.ssl_key_path)
        filepath = self._vhost_filepath(username, domain)
        # NOTE(review): reconstructed. The logging call was stripped down to
        # its message; the `info` level is assumed.
        logger.info(f'Writing a new vhost ({domain} -> {ip_address}) to {filepath}')
        with open(filepath, 'w') as fo:
            # NOTE(review): reconstructed. The `with` body was missing.
            fo.write(body)
        # NOTE(review): reconstructed. Reloading after a change is assumed.
        self._reload_web_server()

    def delete_vhost(self, username: str, domain: str):
        """Remove the vhost file for (username, domain)."""
        filepath = self._vhost_filepath(username, domain)
        # NOTE(review): reconstructed. Only the log message survived; the
        # unlink and reload are assumed.
        logger.info(f'Deleting {filepath}')
        os.unlink(filepath)
        self._reload_web_server()

    def get_num_vhosts(self, username: str) -> int:
        """Return how many vhost files exist for this user."""
        return len(self._vhost_files(username))

    def get_vhosts(self, username: str) -> List[Dict]:
        """Return ``{'domain': ..., 'ip_address': ...}`` for each of the user's vhosts.

        The domain is parsed from the file name and the IP address from the
        ``proxy_pass`` line inside the file.

        :raises AssertionError: if a file name does not match the expected
            pattern or a file contains no ``proxy_pass`` line.
        """
        vhosts = []
        for filepath in self._vhost_files(username):
            filename = os.path.basename(filepath)
            match = VHOST_FILENAME_RE.match(filename)
            assert match is not None, f"'{filename}' does not match expected pattern"
            domain = match.group('domain')
            ip_address = None
            # Use a context manager so the file handle is closed promptly
            # instead of being left to the garbage collector.
            with open(filepath) as fi:
                for line in fi:
                    match = PROXY_PASS_IP_RE.match(line)
                    if match is None:
                        continue
                    ip_address = match.group('ip_address')
                    # NOTE(review): reconstructed. Stopping at the first
                    # proxy_pass line is assumed.
                    break
            assert ip_address is not None, f"Could not find IP address in {filename}"
            vhosts.append({'domain': domain, 'ip_address': ip_address})
        return vhosts

    def delete_all_vhosts_for_user(self, username: str):
        """Remove every vhost file belonging to this user."""
        filepaths = self._vhost_files(username)
        if not filepaths:
            # Nothing to delete, so there is no reason to reload nginx.
            return
        for filepath in filepaths:
            # NOTE(review): reconstructed. Only the log message survived;
            # the unlink and the final reload are assumed.
            logger.info(f'Deleting {filepath}')
            os.unlink(filepath)
        self._reload_web_server()