158 lines
5.5 KiB
Plaintext
158 lines
5.5 KiB
Plaintext
|
#!/usr/bin/env python3
|
||
|
|
||
|
"""A wrapper for the lego commandline
|
||
|
Based on inflatable-wharf: https://github.com/mrled/inflatable-wharf
|
||
|
"""
|
||
|
|
||
|
import argparse
|
||
|
import datetime
|
||
|
import logging
|
||
|
import os
|
||
|
import subprocess
|
||
|
import sys
|
||
|
|
||
|
from cryptography import x509
|
||
|
from cryptography.hazmat.backends import default_backend
|
||
|
|
||
|
|
||
|
# Configure the root logger at import time: INFO by default, with
# tab-separated timestamp, level and message fields.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='[%(asctime)s]\t%(levelname)s:\t%(message)s',
    level=logging.INFO,
)

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
def abswalk(path):
    """Return a sorted list of every file and directory found beneath a path

    path    The directory to walk recursively

    Directory entries carry a trailing path separator so they can be told
    apart from files. Each entry is the walk root joined with the entry name,
    so entries are absolute only when `path` itself is absolute.
    """
    entries = []
    for parent, subdirs, filenames in os.walk(path):
        entries.extend(
            f"{os.path.join(parent, subdir)}{os.path.sep}" for subdir in subdirs)
        entries.extend(
            f"{os.path.join(parent, filename)}" for filename in filenames)
    return sorted(entries)
|
||
|
|
||
|
|
||
|
def subprocess_run_log(command, env=None):
    """Run a command, capture its output, and log the outcome at DEBUG level

    command     A list making up a command and its arguments
    env         An optional dictionary of environment variables; defaults to a
                copy of this process's environment taken when the call is made

    Returns the CompletedProcess object if the process exits with a zero return code
    Throws subprocess.CalledProcessError if the process exits with a nonzero return code
    """
    if env is None:
        # Snapshot the environment per call. A default of os.environ.copy() in
        # the signature is evaluated only once, when the function is defined,
        # so later changes to os.environ would never reach the child process.
        env = os.environ.copy()

    proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
    LOGGER.debug("{cmdname} exited with code {rc}\n{out}\n{err}".format(
        cmdname=command[0],
        rc=proc.returncode,
        out=f"STDOUT:\n{proc.stdout}" if proc.stdout else "STDOUT: NONE",
        err=f"STDERR:\n{proc.stderr}" if proc.stderr else "STDERR: NONE"))

    # Surface failures as exceptions, carrying the captured output along.
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(
            proc.returncode, command, output=proc.stdout, stderr=proc.stderr)
    return proc
|
||
|
|
||
|
|
||
|
def get_cert_expiration(certificate):
    """Return the end of the validity period of a PEM certificate file

    certificate     Path to a PEM-encoded X.509 certificate

    The value is the parsed certificate's not_valid_after attribute, returned
    unchanged. Opening a missing file raises FileNotFoundError.
    """
    with open(certificate, 'rb') as pem_file:
        pem_bytes = pem_file.read()
    parsed_cert = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    return parsed_cert.not_valid_after
|
||
|
|
||
|
|
||
|
def shouldrun(certificate_path, min_cert_validity=25):
    """Test whether certificates should be requested/renewed

    certificate_path    Path to a certificate file
    min_cert_validity   If the cert exists but expires within this many days,
                        renew it

    Returns True if the certificate file does not exist or expires within
    min_cert_validity days, and False otherwise.
    """
    try:
        expires = get_cert_expiration(certificate_path)
    except FileNotFoundError:
        LOGGER.info("Determined lego should be run because the cert does not exist locally")
        return True

    # The cryptography library documents not_valid_after as a naive datetime
    # expressed in UTC. Subtracting the naive *local* time from it skews the
    # result by the machine's UTC offset, so compare in UTC instead. An
    # already timezone-aware expiry is used as-is.
    if expires.tzinfo is None:
        expires = expires.replace(tzinfo=datetime.timezone.utc)
    days_left = (expires - datetime.datetime.now(datetime.timezone.utc)).days

    if days_left <= min_cert_validity:
        LOGGER.info(
            "Determined lego should be run because the cert expires in "
            f"{days_left} days")
        return True

    LOGGER.info(
        "Determined lego should NOT be run because the cert expires in "
        f"{days_left} days")
    return False
|
||
|
|
||
|
|
||
|
def certpath(legodir, domain, filetype):
    """Build the path of one of a domain's files in the certificates directory

    legodir     Directory that contains the "certificates" subdirectory
    domain      Domain name; used as the file's base name
    filetype    File extension without the dot, e.g. 'crt'

    The computed path is logged at DEBUG level and returned; the file is not
    checked for existence.
    """
    filename = f"{domain}.{filetype}"
    location = os.path.join(legodir, "certificates", filename)
    LOGGER.debug(f"certpath({legodir}, {domain}, {filetype}) => {location}")
    return location
|
||
|
|
||
|
|
||
|
def lego(lego_dir, email, domain, authenticator, key_type=None, production=True, whatif=False):
    """Assemble the lego command line for one domain and run it

    lego_dir        Passed to lego as --path; also searched for an existing cert
    email           Passed to lego as --email
    domain          Passed to lego as --domains
    authenticator   Passed to lego as --dns
    key_type        If set, passed to lego as --key-type
    production      If false, add --server pointing at the Let's Encrypt
                    staging directory URL
    whatif          Do not actually run lego, but show what would have run

    The "renew" subcommand is used when a .crt file for the domain already
    exists under lego_dir, and "run" otherwise. The contents of lego_dir are
    logged afterwards whether or not lego was actually run.
    """
    server_args = []
    if not production:
        server_args = ['--server', 'https://acme-staging-v02.api.letsencrypt.org/directory']
    key_type_args = ['--key-type', key_type] if key_type else []

    # An existing certificate means a renewal; otherwise request a new one.
    has_cert = os.path.exists(certpath(lego_dir, domain, 'crt'))
    subcommand = "renew" if has_cert else "run"

    command = [
        '/usr/local/bin/lego', '--accept-tos',
        '--path', lego_dir,
        '--email', email,
        '--domains', domain,
        '--dns', authenticator,
        '--dns-timeout', '60',  # helped w 'could not determine authoritative nameservers' err
        *server_args,
        *key_type_args,
        subcommand,
    ]

    mode = "OPERATIONAL"
    if whatif:
        mode = "WHATIF"
    LOGGER.info("Running lego in {mode} mode as [{cmd}]".format(
        mode=mode, cmd=' '.join(command)))

    if not whatif:
        subprocess_run_log(command)

    listing = '\n'.join(abswalk(lego_dir))
    LOGGER.info(f"Current contents of {lego_dir}:\n{listing}")
|
||
|
|
||
|
|
||
|
def main(*args, **kwargs):
    """Parse command-line options and request or renew a certificate if needed

    Positional and keyword arguments are accepted but not used; argparse reads
    the options from sys.argv.

    Options:
    --legodir           Directory handed to lego and searched for the cert (required)
    --email             Email address handed to lego (required)
    --domain            Domain to obtain a certificate for (required)
    --authenticator     lego DNS provider name; defaults to "route53"
    --whatif, -z        Log the lego command without running it
    --staging           Use the staging ACME server instead of production
    --renewdays         Renew when the cert expires within this many days; defaults to 25
    --key-type          Passed through to lego when given
    --debug/-d, --verbose/-v    Enable DEBUG logging

    Returns None. Exits with a usage error if a required option is missing.
    """
    parser = argparse.ArgumentParser()
    # These three values end up in filesystem paths and on the lego command
    # line, so a missing one is rejected here with a usage message instead of
    # surfacing later as a TypeError.
    parser.add_argument("--legodir", required=True)
    parser.add_argument("--email", required=True)
    parser.add_argument("--domain", required=True)
    parser.add_argument("--authenticator", default="route53")
    parser.add_argument("--whatif", "-z", action="store_true")
    parser.add_argument("--staging", action="store_true")
    parser.add_argument("--renewdays", type=int, default=25)
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--key-type")
    parser.add_argument("--verbose", "-v", action="store_true")
    parsed = parser.parse_args()

    if parsed.debug or parsed.verbose:
        LOGGER.setLevel(logging.DEBUG)

    # Only invoke lego when the cert is missing or close to expiry.
    if shouldrun(certpath(parsed.legodir, parsed.domain, 'crt'), min_cert_validity=parsed.renewdays):
        lego(
            parsed.legodir,
            parsed.email,
            parsed.domain,
            parsed.authenticator,
            key_type=parsed.key_type,
            production=not parsed.staging,
            whatif=parsed.whatif)
|
||
|
|
||
|
|
||
|
# Script entry point: run main() with the raw argument vector and use its
# return value as the process exit status.
if __name__ == '__main__':
    exit_status = main(*sys.argv)
    sys.exit(exit_status)
|