import requests import arkevars import json import logging from datetime import datetime import os import whois import OpenSSL import ssl import time import pdb logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p',level=logging.INFO,filename='arke.log') logger = logging.getLogger("arke") last_round_file_path = "/shared/state.json" this_round_file_path = "/shared/results.json" alert_file_path = "/shared/alerts.log" def monitor_HttpTargets(monitoringtargets): responseTable = {} for target in monitoringtargets: try: statuscode = requests.get(target).status_code logger.info(f"target: {target} statuscode: {statuscode}") responseTable[target] = statuscode except requests.ConnectionError: logger.warn(f"target: {target} ERROR: Failure to connect.") responseTable[target] = "Failed to connect." return responseTable def monitor_DomainExpiry(targets): responseTable = {} current_year = datetime.today().year for domain in targets: expire_year = whois.query(domain).expiration_date.year try: if expire_year - current_year <= 1: responseTable[domain] = "Domain expiring in less than 1 year, please rectify." else: responseTable[domain] = "Domain is healthy." except: responseTable[domain] = "Failed to query domain info" return responseTable def monitor_TlsExpiry(targets): responseTable = {} current_year = datetime.today().year for site in targets: cert = ssl.get_server_certificate((site, 443)) x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) expiration = datetime.strptime(x509.get_notAfter().decode(), '%Y%m%d%H%M%SZ') try: if expiration - current_year <= 1: responseTable[site] = "TLS expiring in less than 30 days, please rectify." else: responseTable[site] = "cert is healthy." except: responseTable[site] = "Failed to query cert info" return responseTable is_on = True while is_on: today = datetime.today() datastore = {} # make sure http targets are /up/ datastore['http'] = monitor_HttpTargets(arkevars.httpTargets) # get SSL certs on http targets datastore['certs'] = monitor_TlsExpiry(arkevars.tlsTargets) # get whois info on domain targets datastore['whois'] = monitor_DomainExpiry(arkevars.domains_to_check) # write new results to file with open(this_round_file_path, "a+", encoding="utf-8") as outfile: json.dump(datastore, outfile, ensure_ascii=False, sort_keys=True) # track state with open(this_round_file_path, "a+", encoding="utf-8") as new_file: if os.path.exists(last_round_file_path): with open(last_round_file_path, "r+", encoding="utf-8") as old_file: oldData = old_file.read() else: with open(last_round_file_path, "a+", encoding="utf-8") as old_file: oldData = old_file.read() newData = open(this_round_file_path, "r").read() if oldData != newData: stateChanged = True else: stateChanged = False # delete state.json so I can write to it cleanly os.remove(last_round_file_path) # queue up an alert if stateChanged = True results = [] with open(this_round_file_path, "r+", encoding="utf-8") as json_File: if stateChanged is True: json_data = json.load(json_File) for inner_key, inner_value in json_data.items(): for key, value in inner_value.items(): with open(alert_file_path, "a") as error_file: error_text = str(key) + " " + str(value) + "\n" error_file.write(error_text) # Copy current results to state.log file for next iteration with open(last_round_file_path, "a+") as json_File: json_datastore = json.dumps(datastore, ensure_ascii=False, sort_keys=True) json_File.write(json_datastore) os.remove(this_round_file_path) time.sleep(60)