Python CSC Electronic Office
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
pyceo/ceod/model/VHostManager.py

145 lines
5.6 KiB

import glob
import os
import re
import shutil
import subprocess
from typing import List, Dict, Tuple
import jinja2
from zope import component
from zope.interface import implementer
from ceo_common.logger_factory import logger_factory
from ceo_common.interfaces import IVHostManager, IConfig
# Matches an indented ``proxy_pass http://<address>;`` line and captures the
# dotted numeric address in the ``ip_address`` group.
PROXY_PASS_IP_RE = re.compile(
    r'^\s+proxy_pass\s+http://(?P<ip_address>[\d.]+);$'
)

# Matches a ``<username>_<domain>`` file name and captures both parts in the
# ``username`` and ``domain`` groups.
VHOST_FILENAME_RE = re.compile(
    r'^(?P<username>[0-9a-z-]+)_(?P<domain>[0-9a-z.-]+)$'
)
# Module-level logger, created by the project's logger factory from this
# module's name.
logger = logger_factory(__name__)
@implementer(IVHostManager)
class VHostManager:
    """Manage per-user NGINX vhost files and their TLS certificates.

    Each vhost is one file named ``<username>_<domain>`` in ``vhost_dir``.
    Certificates and keys live in ``ssl_dir`` and are issued, installed and
    removed by running the ``acme.sh`` script.
    """

    def __init__(self):
        """Load the vhost settings from IConfig and create missing directories."""
        cfg = component.getUtility(IConfig)
        self.vhost_dir = cfg.get('cloud vhosts_vhost_dir')
        self.ssl_dir = cfg.get('cloud vhosts_ssl_dir')
        # exist_ok=True avoids the race between a separate exists() check and
        # the directory creation.
        os.makedirs(self.vhost_dir, exist_ok=True)
        os.makedirs(self.ssl_dir, exist_ok=True)
        self.default_ssl_cert = cfg.get('cloud vhosts_default_ssl_cert')
        self.default_ssl_key = cfg.get('cloud vhosts_default_ssl_key')
        self.acme_challenge_dir = cfg.get('cloud vhosts_acme_challenge_dir')
        self.acme_dir = '/root/.acme.sh'
        self.acme_sh = os.path.join(self.acme_dir, 'acme.sh')
        self.jinja_env = jinja2.Environment(
            loader=jinja2.PackageLoader('ceod.model'),
        )

    @staticmethod
    def _vhost_filename(username: str, domain: str) -> str:
        """Generate a filename for the vhost record.

        Raises:
            AssertionError: if ``domain`` contains ``..`` or ``/``.
        """
        # Sanity check against path traversal. Raised explicitly (instead of
        # using an ``assert`` statement) so it is not stripped by ``python -O``;
        # the exception type is kept for backward compatibility.
        if '..' in domain or '/' in domain:
            raise AssertionError(f"invalid domain for a vhost filename: '{domain}'")
        return username + '_' + domain

    def _vhost_filepath(self, username: str, domain: str) -> str:
        """Generate an absolute path for the vhost record.

        Raises:
            AssertionError: if ``domain`` contains ``..`` or ``/``.
        """
        return os.path.join(self.vhost_dir, self._vhost_filename(username, domain))

    def _vhost_files(self, username: str) -> List[str]:
        """Return a list of all vhost files for this user."""
        # Escape the username so that glob metacharacters in it are matched
        # literally; only the trailing '*' acts as a wildcard.
        pattern = os.path.join(self.vhost_dir, glob.escape(username) + '_*')
        return glob.glob(pattern)

    def _run(self, args: List[str]):
        """Run a command and raise CalledProcessError on a non-zero exit."""
        subprocess.run(args, check=True)

    def _reload_web_server(self):
        """Ask systemd to reload NGINX so vhost changes take effect."""
        logger.debug('Reloading NGINX')
        self._run(['systemctl', 'reload', 'nginx'])

    def _get_cert_and_key_path(self, domain: str) -> Tuple[str, str]:
        """Return the (certificate chain path, key path) pair for a domain."""
        cert_path = f'{self.ssl_dir}/{domain}.chain'
        key_path = f'{self.ssl_dir}/{domain}.key'
        return cert_path, key_path

    def _acquire_new_cert(self, domain: str, cert_path: str, key_path: str):
        """Issue a certificate for ``domain`` with acme.sh and install it.

        The full chain is installed to ``cert_path`` and the key to
        ``key_path``. Raises CalledProcessError if either acme.sh call fails.
        """
        logger.info(f'issuing new certificate for {domain}')
        self._run([
            self.acme_sh, '--issue', '-d', domain,
            '-w', self.acme_challenge_dir,
        ])
        logger.info(f'installing new certificate for {domain}')
        self._run([
            self.acme_sh, '--install-cert', '-d', domain,
            '--key-file', key_path,
            '--fullchain-file', cert_path,
            '--reloadcmd', 'systemctl reload nginx',
        ])

    def _delete_cert(self, domain: str, cert_path: str, key_path: str):
        """Remove the certificate for ``domain`` from acme.sh and from disk.

        Raises CalledProcessError if acme.sh fails, or OSError if either the
        certificate or the key file cannot be unlinked.
        """
        logger.info(f'removing certificate for {domain}')
        self._run([self.acme_sh, '--remove', '-d', domain])
        # Also drop the per-domain directory under the acme.sh home, if any.
        acme_domain_dir = os.path.join(self.acme_dir, domain)
        if os.path.exists(acme_domain_dir):
            shutil.rmtree(acme_domain_dir)
        os.unlink(cert_path)
        os.unlink(key_path)

    def create_vhost(self, username: str, domain: str, ip_address: str):
        """Write a vhost file for ``domain`` proxying to ``ip_address``.

        A certificate is acquired first when the certificate or key file for
        the domain is missing. NGINX is reloaded after the file is written.

        Raises:
            AssertionError: if ``domain`` contains ``..`` or ``/``.
            subprocess.CalledProcessError: if acme.sh or the reload fails.
        """
        # Compute the path first: this validates the domain before any
        # certificate is requested for it.
        filepath = self._vhost_filepath(username, domain)
        cert_path, key_path = self._get_cert_and_key_path(domain)
        if not (os.path.exists(cert_path) and os.path.exists(key_path)):
            self._acquire_new_cert(domain, cert_path, key_path)
        template = self.jinja_env.get_template('nginx_cloud_vhost_config.j2')
        body = template.render(
            username=username, domain=domain, ip_address=ip_address,
            ssl_cert_path=cert_path, ssl_key_path=key_path)
        logger.info(f'Writing a new vhost ({domain} -> {ip_address}) to {filepath}')
        with open(filepath, 'w') as fo:
            fo.write(body)
        self._reload_web_server()

    def delete_vhost(self, username: str, domain: str):
        """Delete the vhost file for ``domain`` and its certificate, if any.

        Raises:
            AssertionError: if ``domain`` contains ``..`` or ``/``.
            FileNotFoundError: if this user has no vhost file for ``domain``.
            subprocess.CalledProcessError: if acme.sh or the reload fails.
        """
        import errno

        # Validate the domain and make sure the vhost record exists before
        # touching the certificate, so that a failing call does not remove
        # the certificate of a domain this user has no vhost for.
        filepath = self._vhost_filepath(username, domain)
        if not os.path.exists(filepath):
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), filepath)
        cert_path, key_path = self._get_cert_and_key_path(domain)
        if os.path.exists(cert_path) and os.path.exists(key_path):
            self._delete_cert(domain, cert_path, key_path)
        logger.info(f'Deleting {filepath}')
        os.unlink(filepath)
        self._reload_web_server()

    def get_num_vhosts(self, username: str) -> int:
        """Return the number of vhost files belonging to this user."""
        return len(self._vhost_files(username))

    def get_vhosts(self, username: str) -> List[Dict]:
        """Return ``{'domain': ..., 'ip_address': ...}`` for each user vhost.

        The domain is parsed from the file name and the IP address from the
        first ``proxy_pass`` line found in the file.

        Raises:
            AssertionError: if a file name does not match the expected
                pattern or a file contains no ``proxy_pass`` line.
        """
        vhosts = []
        for filepath in self._vhost_files(username):
            filename = os.path.basename(filepath)
            name_match = VHOST_FILENAME_RE.match(filename)
            # Explicit raises (not ``assert``) so that the checks survive
            # ``python -O``; AssertionError is kept for compatibility.
            if name_match is None:
                raise AssertionError(f"'{filename}' does not match expected pattern")
            domain = name_match.group('domain')
            ip_address = None
            # The context manager closes the file even when we break early.
            with open(filepath) as fi:
                for line in fi:
                    ip_match = PROXY_PASS_IP_RE.match(line)
                    if ip_match is not None:
                        ip_address = ip_match.group('ip_address')
                        break
            if ip_address is None:
                raise AssertionError(f"Could not find IP address in {filename}")
            vhosts.append({'domain': domain, 'ip_address': ip_address})
        return vhosts

    def delete_all_vhosts_for_user(self, username: str):
        """Delete every vhost file of this user and reload NGINX once.

        Does nothing (and does not reload NGINX) when the user has no vhosts.
        """
        # NOTE(review): unlike delete_vhost, this does not remove the
        # certificates of the deleted domains -- confirm this is intended.
        filepaths = self._vhost_files(username)
        if not filepaths:
            return
        for filepath in filepaths:
            logger.info(f'Deleting {filepath}')
            os.unlink(filepath)
        self._reload_web_server()