"""Arke: poll HTTP endpoints, TLS certificates and domain registrations.

Every polling round collects three result tables (HTTP status, TLS expiry,
domain expiry), compares their serialized form with the previous round's
state file and, when anything differs, appends one line per target to the
alerts log.
"""
import json
import logging
import os
import ssl
import time
from datetime import datetime, timedelta, timezone

import OpenSSL
import requests
import whois

import arkevars

logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p',level=logging.INFO,filename='arke.log')
logger = logging.getLogger("arke")

last_round_file = "/shared/state.log"
this_round_file = "/shared/results.json"
alerts_file = "/shared/alerts.log"

# requests has no default timeout; without one a single unresponsive target
# would block the polling loop indefinitely.
HTTP_TIMEOUT_SECONDS = 10
# Warning windows matching the wording of the status messages below.
DOMAIN_WARNING_WINDOW = timedelta(days=365)
TLS_WARNING_WINDOW = timedelta(days=30)
POLL_INTERVAL_SECONDS = 60


def _time_until(expiry):
    """Return the timedelta from now until *expiry* (naive or tz-aware)."""
    if expiry.tzinfo is not None:
        now = datetime.now(expiry.tzinfo)
    else:
        # Naive timestamps are compared against naive local time.
        now = datetime.today()
    return expiry - now


def _read_text_if_exists(path):
    """Return the contents of *path*, or an empty string if it is missing."""
    try:
        with open(path, "r") as handle:
            return handle.read()
    except FileNotFoundError:
        return ""


def monitor_HttpTargets(monitoringtargets):
    """Issue a GET to every target and record the outcome.

    Args:
        monitoringtargets: iterable of URL strings.

    Returns:
        dict mapping each target to its HTTP status code, or to the string
        "Failed to connect." when the request could not be completed.
    """
    responseTable = {}
    for target in monitoringtargets:
        try:
            statuscode = requests.get(target, timeout=HTTP_TIMEOUT_SECONDS).status_code
        except requests.RequestException:
            # RequestException also covers timeouts and malformed URLs, which
            # would otherwise terminate the whole monitor.
            logger.warning("target: %s ERROR: Failure to connect.", target)
            responseTable[target] = "Failed to connect."
        else:
            logger.info("target: %s statuscode: %s", target, statuscode)
            responseTable[target] = statuscode
    return responseTable


def monitor_DomainExpiry(targets):
    """Check how close each domain registration is to expiring.

    Args:
        targets: iterable of domain names.

    Returns:
        dict mapping each domain to a human-readable status string.
    """
    responseTable = {}
    for domain in targets:
        try:
            # The lookup itself is inside the try so that a failed or empty
            # whois answer is reported instead of crashing the monitor.
            expiration = whois.query(domain).expiration_date
            remaining = _time_until(expiration)
        except Exception as exc:
            logger.warning("target: %s ERROR: %s", domain, exc)
            responseTable[domain] = "Failed to query domain info"
            continue
        if remaining < DOMAIN_WARNING_WINDOW:
            responseTable[domain] = "Domain expiring in < 1 year, please rectify."
        else:
            responseTable[domain] = "Domain is healthy."
    return responseTable


def monitor_TlsExpiry(targets):
    """Check how close each site's TLS certificate (port 443) is to expiring.

    Args:
        targets: iterable of host names.

    Returns:
        dict mapping each site to a human-readable status string.
    """
    responseTable = {}
    for site in targets:
        try:
            cert = ssl.get_server_certificate((site, 443))
            x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
            # notAfter is an ASN.1 time in UTC ("...Z"), so mark it as such
            # before comparing it with the current time.
            expiration = datetime.strptime(
                x509.get_notAfter().decode(), '%Y%m%d%H%M%SZ'
            ).replace(tzinfo=timezone.utc)
            remaining = _time_until(expiration)
        except Exception as exc:
            logger.warning("target: %s ERROR: %s", site, exc)
            responseTable[site] = "Failed to query cert info"
            continue
        if remaining < TLS_WARNING_WINDOW:
            responseTable[site] = "TLS expiring in < 30 days, please rectify."
        else:
            responseTable[site] = "cert is healthy."
    return responseTable


def _run_round():
    """Run one polling round: collect results, alert on change, save state."""
    # make sure http targets are /up/, then get SSL certs and whois info
    results = [
        monitor_HttpTargets(arkevars.httpTargets),
        monitor_TlsExpiry(arkevars.tlsTargets),
        monitor_DomainExpiry(arkevars.domains_to_check),
    ]
    # One JSON document (a list of tables); concatenating three separate
    # objects produces text that json.load rejects with "Extra data".
    snapshot = json.dumps(results)

    # write new results to file ("w" so leftovers from an aborted round are
    # not appended to)
    with open(this_round_file, "w") as results_handle:
        results_handle.write(snapshot)

    # track state: a missing state file counts as changed, so the first
    # round always emits alerts
    state_changed = _read_text_if_exists(last_round_file) != snapshot

    # queue up an alert if the state changed
    if state_changed:
        with open(alerts_file, "a") as alerts_handle:
            for table in results:
                for key, value in table.items():
                    alerts_handle.write(f"{key} returned with status {value}\n")

    # Copy current results to state.log file for next iteration
    with open(last_round_file, "w") as state_handle:
        state_handle.write(snapshot)

    os.remove(this_round_file)


# The polling loop intentionally stays at module level so that running (or
# importing) this file behaves as before.
is_on = True
while is_on:
    _run_round()
    time.sleep(POLL_INTERVAL_SECONDS)