You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
adc/ansible/roles/acmedns_base/templates/wraplego.py.j2

158 lines
5.5 KiB

#!/usr/bin/env python3
"""A wrapper for the lego commandline
Based on inflatable-wharf: https://github.com/mrled/inflatable-wharf
"""
import argparse
import datetime
import logging
import os
import subprocess
import sys
from cryptography import x509
from cryptography.hazmat.backends import default_backend
# Configure the root logger once at import time: INFO by default, with a
# timestamped, tab-separated record layout.
logging.basicConfig(
    format="[%(asctime)s]\t%(levelname)s:\t%(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
LOGGER = logging.getLogger(__name__)
def abswalk(path):
    """Return a sorted list of every file and directory found beneath path

    path    Directory to walk recursively

    Each entry is the walked root joined with the entry name, so entries are
    absolute only when path itself is absolute. Directory entries carry a
    trailing path separator; file entries do not.
    """
    entries = []
    for parent, subdirs, filenames in os.walk(path):
        # Directories get a trailing separator so they stand out in listings.
        entries.extend(f"{os.path.join(parent, name)}{os.path.sep}" for name in subdirs)
        entries.extend(f"{os.path.join(parent, name)}" for name in filenames)
    return sorted(entries)
def subprocess_run_log(command, env=None):
    """Run and log a command

    command     A list making up a command and its arguments
    env         An optional dictionary of environment variables; when omitted
                (None), the child inherits this process's environment as it
                is at call time

    Returns the CompletedProcess object if the process exits with a zero return code
    Throws subprocess.CalledProcessError if the process exits with a nonzero return code
    """
    # The default used to be `os.environ.copy()`, which Python evaluates once
    # when the function is defined: a stale snapshot shared by every call.
    # Passing env=None lets subprocess.run inherit the current environment.
    proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)

    def describe(label, stream):
        """Render one captured stream (bytes) as readable text for the log."""
        if not stream:
            return f"{label}: NONE"
        # Decode for logging only; the bytes on `proc` are left untouched.
        return f"{label}:\n{stream.decode(errors='replace')}"

    LOGGER.debug("{cmdname} exited with code {rc}\n{out}\n{err}".format(
        cmdname=command[0],
        rc=proc.returncode,
        out=describe("STDOUT", proc.stdout),
        err=describe("STDERR", proc.stderr)))

    if proc.returncode != 0:
        raise subprocess.CalledProcessError(
            proc.returncode, command, output=proc.stdout, stderr=proc.stderr)
    return proc
def get_cert_expiration(certificate):
    """Get the expiration date of a PEM certificate file

    certificate     Path to a PEM-encoded X.509 certificate file

    Returns the parsed certificate's not_valid_after value.
    NOTE(review): the cryptography docs describe not_valid_after as a naive
    datetime expressed in UTC -- confirm against the installed version.
    Raises FileNotFoundError (from open) if the file does not exist.
    """
    with open(certificate, 'rb') as pem_file:
        pem_bytes = pem_file.read()
    parsed = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    return parsed.not_valid_after
def shouldrun(certificate_path, min_cert_validity=25):
    """Test whether certificates should be requested/renewed

    certificate_path    Path to a certificate file
    min_cert_validity   If the cert exists but expires within this many days, renew it

    Returns True if the cert file does not exist or expires within
    min_cert_validity days; False otherwise
    """
    # Only the file read can raise FileNotFoundError, so keep the try narrow.
    try:
        expires = get_cert_expiration(certificate_path)
    except FileNotFoundError:
        LOGGER.info("Determined lego should be run because the cert does not exist locally")
        return True

    # The certificate's not-after time is expressed in UTC, so "now" must be
    # UTC as well; comparing against local time skews the result by the
    # host's UTC offset. Match the awareness of `expires` so the subtraction
    # works for both naive and timezone-aware expiry values.
    now = datetime.datetime.now(datetime.timezone.utc)
    if expires.tzinfo is None:
        now = now.replace(tzinfo=None)
    expiresdelta = expires - now

    if expiresdelta.days <= min_cert_validity:
        LOGGER.info(
            "Determined lego should be run because the cert expires in "
            f"{expiresdelta.days} days")
        return True

    LOGGER.info(
        "Determined lego should NOT be run because the cert expires in "
        f"{expiresdelta.days} days")
    return False
def certpath(legodir, domain, filetype):
    """Build the path of a cert file inside the lego directory

    legodir     The lego data directory
    domain      The domain the file belongs to
    filetype    The file extension, without the dot (callers here pass 'crt')

    Returns <legodir>/certificates/<domain>.<filetype>
    """
    filename = f"{domain}.{filetype}"
    location = os.path.join(legodir, "certificates", filename)
    LOGGER.debug(f"certpath({legodir}, {domain}, {filetype}) => {location}")
    return location
def lego(lego_dir, email, domain, authenticator, key_type=None, production=True, whatif=False):
    """Build and run the lego command line, then log the lego directory contents.

    lego_dir        Passed to lego as --path; also where existing certs are looked up
    email           Passed to lego as --email
    domain          Passed to lego as --domains
    authenticator   Passed to lego as --dns
    key_type        If set, passed to lego as --key-type
    production      If False, point lego at the Let's Encrypt staging server
    whatif          Do not actually run lego, but show what would have run

    The subcommand is "renew" when a .crt for the domain already exists under
    lego_dir, otherwise "run".
    Throws subprocess.CalledProcessError (via subprocess_run_log) if lego fails
    """
    server_flags = (
        [] if production
        else ['--server', 'https://acme-staging-v02.api.letsencrypt.org/directory'])
    key_flags = ['--key-type', key_type] if key_type else []
    subcommand = "renew" if os.path.exists(certpath(lego_dir, domain, 'crt')) else "run"

    command = [
        '/usr/local/bin/lego', '--accept-tos',
        '--path', lego_dir,
        '--email', email,
        '--domains', domain,
        '--dns', authenticator,
        '--dns-timeout', '60',  # helped w 'could not determine authoritative nameservers' err
        *server_flags,
        *key_flags,
        subcommand,
    ]

    mode = "WHATIF" if whatif else "OPERATIONAL"
    LOGGER.info("Running lego in {mode} mode as [{cmd}]".format(
        mode=mode,
        cmd=' '.join(command)))

    if not whatif:
        subprocess_run_log(command)

    # Log what is on disk afterwards, whether or not lego actually ran.
    listing = '\n'.join(abswalk(lego_dir))
    LOGGER.info(f"Current contents of {lego_dir}:\n{listing}")
def main(*args, **kwargs):
    """Parse command-line options and request/renew a certificate if needed.

    args, kwargs    Accepted so the script entry point can call main(*sys.argv);
                    they are not read -- argparse parses sys.argv itself

    Exits with a usage error (via argparse) when --legodir or --domain is
    missing, or when lego has to run and --email was not given.
    """
    parser = argparse.ArgumentParser()
    # legodir and domain are needed to build the cert path on every run;
    # without them os.path.join() would fail with an opaque TypeError.
    parser.add_argument("--legodir", required=True, help="lego data directory (lego --path)")
    parser.add_argument("--email", help="account email (lego --email); needed when lego runs")
    parser.add_argument("--domain", required=True, help="domain to request (lego --domains)")
    parser.add_argument("--authenticator", default="route53", help="DNS provider (lego --dns)")
    parser.add_argument(
        "--whatif", "-z", action="store_true",
        help="do not actually run lego, but show what would have run")
    parser.add_argument(
        "--staging", action="store_true",
        help="use the Let's Encrypt staging server")
    parser.add_argument(
        "--renewdays", type=int, default=25,
        help="renew when the cert expires within this many days")
    parser.add_argument("--debug", "-d", action="store_true", help="enable debug logging")
    parser.add_argument("--key-type", help="key type (lego --key-type)")
    parser.add_argument("--verbose", "-v", action="store_true", help="enable debug logging")
    parsed = parser.parse_args()

    if parsed.debug or parsed.verbose:
        LOGGER.setLevel(logging.DEBUG)

    certificate = certpath(parsed.legodir, parsed.domain, 'crt')
    if not shouldrun(certificate, min_cert_validity=parsed.renewdays):
        return

    # Email is only needed once lego will run; report its absence clearly
    # instead of crashing while the command line is being joined for logging.
    if parsed.email is None:
        parser.error("--email is required when a certificate must be requested or renewed")

    lego(
        parsed.legodir,
        parsed.email,
        parsed.domain,
        parsed.authenticator,
        key_type=parsed.key_type,
        production=not parsed.staging,
        whatif=parsed.whatif)
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    exit_status = main(*sys.argv)
    sys.exit(exit_status)