import requests
import arkevars
import json
import logging
from datetime import datetime
import os
import whois
import OpenSSL
import ssl
import time

logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p', level=logging.INFO, filename='arke.log')
logger = logging.getLogger("arke")

last_round_file_path = "/shared/state.json"
this_round_file_path = "/shared/results.json"
alert_file_path = "/shared/alerts.log"
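
# NOTE: `arkevars` is not shown here; it is assumed to be a small sibling module
# that just holds the target lists referenced below, roughly along these lines
# (the values are placeholders, not the real config):
#
#   httpTargets = ["https://example.com", "https://example.org"]
#   tlsTargets = ["example.com", "example.org"]
#   domains_to_check = ["example.com", "example.org"]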


def monitor_HttpTargets(monitoringtargets):
    """Hit each HTTP target and record its status code (or the failure)."""
    responseTable = {}
    for target in monitoringtargets:
        try:
            # timeout keeps one dead target from hanging the whole monitoring loop
            statuscode = requests.get(target, timeout=10).status_code
            logger.info(f"target: {target} statuscode: {statuscode}")
            responseTable[target] = statuscode
        except requests.RequestException:
            logger.warning(f"target: {target} ERROR: Failure to connect.")
            responseTable[target] = "Failed to connect."
    return responseTable


def monitor_DomainExpiry(targets):
    """Flag domains whose registration expires within roughly a year.

    Assumes the `whois` package, where query() returns an object with a
    datetime `expiration_date` attribute.
    """
    responseTable = {}
    for domain in targets:
        try:
            expiration = whois.query(domain).expiration_date
            if (expiration - datetime.today()).days <= 365:
                responseTable[domain] = "Domain expiring in less than 1 year, please rectify."
            else:
                responseTable[domain] = "Domain is healthy."
        except Exception:
            responseTable[domain] = "Failed to query domain info"
    return responseTable


def monitor_TlsExpiry(targets):
    """Flag TLS certificates that expire within 30 days."""
    responseTable = {}
    for site in targets:
        try:
            cert = ssl.get_server_certificate((site, 443))
            x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
            expiration = datetime.strptime(x509.get_notAfter().decode(), '%Y%m%d%H%M%SZ')
            if (expiration - datetime.today()).days <= 30:
                responseTable[site] = "TLS expiring in less than 30 days, please rectify."
            else:
                responseTable[site] = "cert is healthy."
        except Exception:
            responseTable[site] = "Failed to query cert info"
    return responseTable
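

# Main loop: each round writes fresh results to results.json, compares its raw
# contents with state.json from the previous round, and on any difference
# appends every key/value pair from this round to alerts.log. results.json is
# then promoted to state.json for the next comparison.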
is_on = True
while is_on:
    today = datetime.today()
    datastore = {}
    # make sure http targets are /up/
    datastore['http'] = monitor_HttpTargets(arkevars.httpTargets)
    # get SSL certs on http targets
    datastore['certs'] = monitor_TlsExpiry(arkevars.tlsTargets)
    # get whois info on domain targets
    datastore['whois'] = monitor_DomainExpiry(arkevars.domains_to_check)
    # write new results to file
    with open(this_round_file_path, "w", encoding="utf-8") as outfile:
        json.dump(datastore, outfile, ensure_ascii=False, sort_keys=True)
    # track state: compare this round's raw results against last round's
    if os.path.exists(last_round_file_path):
        with open(last_round_file_path, "r", encoding="utf-8") as old_file:
            oldData = old_file.read()
    else:
        oldData = ""
    with open(this_round_file_path, "r", encoding="utf-8") as new_file:
        newData = new_file.read()
    stateChanged = oldData != newData
    # queue up an alert if stateChanged is True
    if stateChanged:
        with open(this_round_file_path, "r", encoding="utf-8") as json_File:
            json_data = json.load(json_File)
        with open(alert_file_path, "a", encoding="utf-8") as error_file:
            for inner_key, inner_value in json_data.items():
                for key, value in inner_value.items():
                    error_file.write(f"{key} {value}\n")
    # copy current results to state.json for the next iteration
    with open(last_round_file_path, "w", encoding="utf-8") as json_File:
        json.dump(datastore, json_File, ensure_ascii=False, sort_keys=True)
    os.remove(this_round_file_path)
    time.sleep(60)