pyceo/ceod/model/VHostManager.py

98 lines
3.6 KiB
Python

import glob
import os
import re
import subprocess
from typing import List, Dict
import jinja2
from zope.interface import implementer
from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import IVHostManager
# Matches an indented `proxy_pass http://<address>;` line (at least one leading
# whitespace character is required) and captures the run of digits and dots
# after `http://` as the named group `ip_address`.
PROXY_PASS_IP_RE = re.compile(r'^\s+proxy_pass\s+http://(?P<ip_address>[\d.]+);$')
# Matches a vhost filename of the form `member_<username>_<domain>`, capturing
# the named groups `username` (digits, lowercase letters, hyphens) and `domain`
# (digits, lowercase letters, dots, hyphens).
VHOST_FILENAME_RE = re.compile(r'^member_(?P<username>[0-9a-z-]+)_(?P<domain>[0-9a-z.-]+)$')
# Module-level logger obtained from logger_factory using this module's name.
logger = logger_factory(__name__)
@implementer(IVHostManager)
class VHostManager:
    """Manage per-member nginx vhost config files kept in one directory.

    Each vhost is a single file named ``member_<username>_<domain>`` inside
    ``vhost_dir``, rendered from the ``nginx_cloud_vhost_config.j2`` template.
    nginx is reloaded (``systemctl reload nginx``) whenever a file is written
    or removed.
    """

    def __init__(
        self,
        vhost_dir: str,
        ssl_cert_path: str,
        ssl_key_path: str,
    ):
        """Store the configuration and set up the template environment.

        :param vhost_dir: directory in which the vhost files are stored
        :param ssl_cert_path: value passed to the template as ``ssl_cert_path``
        :param ssl_key_path: value passed to the template as ``ssl_key_path``
        """
        self.vhost_dir = vhost_dir
        self.ssl_cert_path = ssl_cert_path
        self.ssl_key_path = ssl_key_path
        self.jinja_env = jinja2.Environment(
            loader=jinja2.PackageLoader('ceod.model'),
        )

    @staticmethod
    def _vhost_filename(username: str, domain: str) -> str:
        """Generate a filename for the vhost record.

        :raises AssertionError: if ``username`` or ``domain`` contains ``..``
            or ``/``, which could make the resulting path escape the vhost
            directory
        """
        # Sanity check against path traversal. Raised explicitly instead of
        # via `assert` so the check is not stripped under `python -O`; the
        # exception type stays AssertionError so existing callers see the
        # same error as before.
        for component in (username, domain):
            if '..' in component or '/' in component:
                raise AssertionError(
                    f'invalid vhost filename component: {component!r}')
        return 'member' + '_' + username + '_' + domain

    def _vhost_filepath(self, username: str, domain: str) -> str:
        """Generate the path of the vhost record inside ``vhost_dir``."""
        return os.path.join(self.vhost_dir, self._vhost_filename(username, domain))

    def _vhost_files(self, username: str) -> List[str]:
        """Return a list of all vhost files for this user."""
        # Escape the literal prefix so that glob metacharacters in the
        # directory or username cannot widen the match; only the trailing
        # '*' acts as a wildcard.
        prefix = os.path.join(self.vhost_dir, 'member_' + username + '_')
        return glob.glob(glob.escape(prefix) + '*')

    def _reload_web_server(self):
        """Reload nginx through systemctl.

        :raises subprocess.CalledProcessError: if the reload command exits
            with a non-zero status
        """
        logger.debug('Reloading nginx')
        subprocess.run(['systemctl', 'reload', 'nginx'], check=True)

    def create_vhost(self, username: str, domain: str, ip_address: str):
        """Render a vhost config for ``domain``, write it, and reload nginx.

        An existing file for the same username and domain is overwritten.
        """
        template = self.jinja_env.get_template('nginx_cloud_vhost_config.j2')
        body = template.render(
            username=username, domain=domain, ip_address=ip_address,
            ssl_cert_path=self.ssl_cert_path, ssl_key_path=self.ssl_key_path)
        filepath = self._vhost_filepath(username, domain)
        logger.info(f'Writing a new vhost ({domain} -> {ip_address}) to {filepath}')
        with open(filepath, 'w') as fo:
            fo.write(body)
        self._reload_web_server()

    def delete_vhost(self, username: str, domain: str):
        """Remove the vhost file for ``username``/``domain`` and reload nginx.

        :raises FileNotFoundError: if the vhost file does not exist
        """
        filepath = self._vhost_filepath(username, domain)
        logger.info(f'Deleting {filepath}')
        os.unlink(filepath)
        self._reload_web_server()

    def get_num_vhosts(self, username: str) -> int:
        """Return the number of vhost files belonging to ``username``."""
        return len(self._vhost_files(username))

    def get_vhosts(self, username: str) -> List[Dict]:
        """Return the vhosts of ``username``.

        Each entry is a dict with the keys ``domain`` (parsed from the
        filename) and ``ip_address`` (parsed from the first ``proxy_pass``
        line of the file).

        :raises AssertionError: if a filename does not match the expected
            pattern or a file contains no matching ``proxy_pass`` line
        """
        vhosts = []
        for filepath in self._vhost_files(username):
            filename = os.path.basename(filepath)
            name_match = VHOST_FILENAME_RE.match(filename)
            # Explicit raises (not `assert`) so these checks survive
            # `python -O`; without them a None match would surface later as
            # an unrelated AttributeError.
            if name_match is None:
                raise AssertionError(f"'{filename}' does not match expected pattern")
            ip_address = None
            # The context manager closes the file even when we break out
            # early or an exception is raised.
            with open(filepath) as fi:
                for line in fi:
                    line_match = PROXY_PASS_IP_RE.match(line)
                    if line_match is not None:
                        ip_address = line_match.group('ip_address')
                        break
            if ip_address is None:
                raise AssertionError(f"Could not find IP address in {filename}")
            vhosts.append({
                'domain': name_match.group('domain'),
                'ip_address': ip_address,
            })
        return vhosts

    def delete_all_vhosts_for_user(self, username: str):
        """Remove every vhost file of ``username`` and reload nginx once.

        Does nothing (and does not reload nginx) if the user has no vhosts.
        """
        filepaths = self._vhost_files(username)
        if not filepaths:
            return
        for filepath in filepaths:
            logger.info(f'Deleting {filepath}')
            os.unlink(filepath)
        self._reload_web_server()