josiah
7b7284c32f
all of this is required for the synology LE role to work. this is still a massive WIP commit. synology LE works, but synology webdav using that LE cert does not yet work. there appears to be some cipher mismatch issue by default.
158 lines
5.5 KiB
Django/Jinja
158 lines
5.5 KiB
Django/Jinja
#!/usr/bin/env python3
|
|
|
|
"""A wrapper for the lego commandline
|
|
Based on inflatable-wharf: https://github.com/mrled/inflatable-wharf
|
|
"""
|
|
|
|
import argparse
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='[%(asctime)s]\t%(levelname)s:\t%(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S')
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def abswalk(path):
|
|
"""Return a list of absolute paths to files and directories
|
|
"""
|
|
result = []
|
|
for root, dirs, files in os.walk(path):
|
|
for dirname in dirs:
|
|
result.append(f"{os.path.join(root, dirname)}{os.path.sep}")
|
|
for filename in files:
|
|
result.append(f"{os.path.join(root, filename)}")
|
|
result.sort()
|
|
return result
|
|
|
|
|
|
def subprocess_run_log(command, env=os.environ.copy()):
|
|
"""Run and log a command
|
|
command A list making up a command and its arguments
|
|
env An optional dictionary of environment variables; defaults to this process's env
|
|
Returns the CompletedProcess object if the process exits with a zer return code
|
|
Throws subprocess.CalledProcessError if the process exites with a nonzero return code
|
|
"""
|
|
proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
|
|
LOGGER.debug("{cmdname} exited with code {rc}\n{out}\n{err}".format(
|
|
cmdname=command[0],
|
|
rc=proc.returncode,
|
|
out=f"STDOUT:\n{proc.stdout}" if proc.stdout else "STDOUT: NONE",
|
|
err=f"STDERR:\n{proc.stderr}" if proc.stderr else "STDERR: NONE"))
|
|
if proc.returncode != 0:
|
|
raise subprocess.CalledProcessError(
|
|
proc.returncode, command, output=proc.stdout, stderr=proc.stderr)
|
|
return proc
|
|
|
|
|
|
def get_cert_expiration(certificate):
|
|
"""Get the certificate expiration date as a DateTime object
|
|
"""
|
|
with open(certificate, 'rb') as certfile:
|
|
cert_contents = certfile.read()
|
|
cert = x509.load_pem_x509_certificate(cert_contents, default_backend())
|
|
return cert.not_valid_after
|
|
|
|
|
|
def shouldrun(certificate_path, min_cert_validity=25):
|
|
"""Test whether certificates should be requested/renewed
|
|
certificate Path to a certificate file
|
|
min_cert_validity If cert exists but is invalid this many days into the future, renew it
|
|
"""
|
|
try:
|
|
expires = get_cert_expiration(certificate_path)
|
|
expiresdelta = expires - datetime.datetime.now()
|
|
if expiresdelta.days <= min_cert_validity:
|
|
LOGGER.info(
|
|
"Determined lego should be run because the cert expires in "
|
|
f"{expiresdelta.days} days")
|
|
return True
|
|
else:
|
|
LOGGER.info(
|
|
"Determined lego should NOT be run because the cert expires in "
|
|
f"{expiresdelta.days} days")
|
|
return False
|
|
except FileNotFoundError:
|
|
LOGGER.info("Determined lego should be run because the cert does not exist locally")
|
|
return True
|
|
|
|
|
|
def certpath(legodir, domain, filetype):
|
|
"""Get the path of a cert file based on its characteristics
|
|
"""
|
|
result = os.path.join(legodir, "certificates", f"{domain}.{filetype}")
|
|
LOGGER.debug(f"certpath({legodir}, {domain}, {filetype}) => {result}")
|
|
return result
|
|
|
|
|
|
def lego(lego_dir, email, domain, authenticator, key_type=None, production=True, whatif=False):
|
|
"""Run the lego command.
|
|
whatif Do not actually run lego, but show what would have run
|
|
"""
|
|
|
|
command = [
|
|
'/usr/local/bin/lego', '--accept-tos',
|
|
'--path', lego_dir,
|
|
'--email', email,
|
|
'--domains', domain,
|
|
'--dns', authenticator,
|
|
'--dns-timeout', '60', # helped w 'could not determine authoritative nameservers' err
|
|
]
|
|
if not production:
|
|
command += ['--server', 'https://acme-staging-v02.api.letsencrypt.org/directory']
|
|
if key_type:
|
|
command += ['--key-type', key_type]
|
|
if os.path.exists(certpath(lego_dir, domain, 'crt')):
|
|
command += ["renew"]
|
|
else:
|
|
command += ["run"]
|
|
|
|
LOGGER.info("Running lego in {mode} mode as [{cmd}]".format(
|
|
mode="WHATIF" if whatif else "OPERATIONAL",
|
|
cmd=' '.join(command)))
|
|
|
|
if not whatif:
|
|
subprocess_run_log(command)
|
|
|
|
acme_dir_contents = '\n'.join(abswalk(lego_dir))
|
|
LOGGER.info(f"Current contents of {lego_dir}:\n{acme_dir_contents}")
|
|
|
|
|
|
def main(*args, **kwargs):
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--legodir")
|
|
parser.add_argument("--email")
|
|
parser.add_argument("--domain")
|
|
parser.add_argument("--authenticator", default="route53")
|
|
parser.add_argument("--whatif", "-z", action="store_true")
|
|
parser.add_argument("--staging", action="store_true")
|
|
parser.add_argument("--renewdays", type=int, default=25)
|
|
parser.add_argument("--debug", "-d", action="store_true")
|
|
parser.add_argument("--key-type")
|
|
parser.add_argument("--verbose", "-v", action="store_true")
|
|
parsed = parser.parse_args()
|
|
if parsed.debug or parsed.verbose:
|
|
LOGGER.setLevel(logging.DEBUG)
|
|
if shouldrun(certpath(parsed.legodir, parsed.domain, 'crt'), min_cert_validity=parsed.renewdays):
|
|
lego(
|
|
parsed.legodir,
|
|
parsed.email,
|
|
parsed.domain,
|
|
parsed.authenticator,
|
|
key_type=parsed.key_type,
|
|
production=not parsed.staging,
|
|
whatif=parsed.whatif)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main(*sys.argv))
|