You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

125 lines
3.7 KiB

import json
import logging
import os
import ssl
import time
from datetime import datetime, timedelta, timezone

import OpenSSL
import requests
import whois

import arkevars
# Route timestamped records at INFO level and above into arke.log.
logging.basicConfig(
    filename='arke.log',
    level=logging.INFO,
    format='%(asctime)s %(message)s',
    datefmt='%Y/%m/%d %I:%M:%S %p',
)
# Named logger shared by the monitor functions in this file.
logger = logging.getLogger("arke")
def monitor_HttpTargets(monitoringtargets, timeout=10):
    """Request every HTTP target once and record the outcome.

    Args:
        monitoringtargets: Iterable of URLs handed to ``requests.get``.
        timeout: Seconds to wait on each request before treating the
            target as unreachable.

    Returns:
        dict mapping each target to its integer HTTP status code, or to
        the string "Failed to connect." when the request raised.
    """
    responseTable = {}
    for target in monitoringtargets:
        try:
            # Without a timeout a single unresponsive host would block
            # the whole sweep indefinitely.
            statuscode = requests.get(target, timeout=timeout).status_code
        except requests.RequestException:
            # RequestException is the base class of ConnectionError, so
            # the original case is still covered; timeouts and malformed
            # URLs are now reported instead of crashing the monitor.
            logger.warning("target: %s ERROR: Failure to connect.", target)
            responseTable[target] = "Failed to connect."
        else:
            logger.info("target: %s statuscode: %s", target, statuscode)
            responseTable[target] = statuscode
    return responseTable
def monitor_DomainExpiry(targets):
    """Report whether each domain's registration is close to expiring.

    Args:
        targets: Iterable of domain names passed to ``whois.query``.

    Returns:
        dict mapping each domain to one of three status strings:
        "Domain expiring in < 1 year, please rectify.",
        "Domain is healthy." or "Failed to query domain info".
    """
    responseTable = {}
    current_year = datetime.today().year
    for domain in targets:
        try:
            # The lookup is the step that can fail (the query raising, or
            # the result lacking a usable expiration date), so it must be
            # inside the try for the failure status to be reachable.
            expire_year = whois.query(domain).expiration_date.year
        except Exception as err:
            logger.warning("domain: %s ERROR: whois lookup failed: %s", domain, err)
            responseTable[domain] = "Failed to query domain info"
            continue
        # NOTE(review): only calendar years are compared, so a domain is
        # flagged whenever it expires in the current or the next year.
        if expire_year - current_year <= 1:
            responseTable[domain] = "Domain expiring in < 1 year, please rectify."
        else:
            responseTable[domain] = "Domain is healthy."
    return responseTable
def monitor_TlsExpiry(targets):
    """Report whether each site's TLS certificate expires within 30 days.

    Args:
        targets: Iterable of host names; each is contacted on port 443.

    Returns:
        dict mapping each site to one of three status strings:
        "TLS expiring in < 30 days, please rectify.", "cert is healthy."
        or "Failed to query cert info".
    """
    responseTable = {}
    warning_window = timedelta(days=30)
    now = datetime.now(timezone.utc)
    for site in targets:
        try:
            # Fetching and parsing are the steps that can fail, so they
            # belong inside the try alongside the comparison.
            cert = ssl.get_server_certificate((site, 443))
            x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
            # The trailing "Z" in the notAfter format marks UTC, so attach
            # that zone explicitly before comparing with the current time.
            expiration = datetime.strptime(
                x509.get_notAfter().decode(), '%Y%m%d%H%M%SZ'
            ).replace(tzinfo=timezone.utc)
        except Exception as err:
            logger.warning("site: %s ERROR: certificate lookup failed: %s", site, err)
            responseTable[site] = "Failed to query cert info"
            continue
        # Compare time remaining against the window; subtracting a year
        # number from a datetime (as before) always raised TypeError.
        if expiration - now < warning_window:
            responseTable[site] = "TLS expiring in < 30 days, please rectify."
        else:
            responseTable[site] = "cert is healthy."
    return responseTable
# Main polling loop: run every monitor once a minute, compare the
# serialized results with the previous sweep, and append one alert line
# per monitored item whenever anything changed.
is_on = True
while is_on:
    # make sure http targets are /up/
    datastore = monitor_HttpTargets(arkevars.httpTargets)
    json_string = json.dumps(datastore)
    # get SSL certs on http targets
    cert_info = monitor_TlsExpiry(arkevars.tlsTargets)
    cert_json = json.dumps(cert_info)
    # get whois info on domain targets
    domain_info = monitor_DomainExpiry(arkevars.domains_to_check)
    domain_json = json.dumps(domain_info)
    # Same byte layout the state file has always used: the three JSON
    # documents written back to back.
    snapshot = json_string + cert_json + domain_json

    # write new results to file; overwrite so leftovers from an
    # interrupted run cannot leak into this sweep
    with open("/shared/results.json", "w") as results_file:
        results_file.write(snapshot)

    # track state; a missing state file (first run) counts as "no
    # previous results" instead of crashing
    try:
        with open("/shared/state.log", "r") as state_file:
            oldData = state_file.read()
    except FileNotFoundError:
        oldData = ""
    stateChanged = oldData != snapshot

    # queue up an alert if stateChanged = True
    # The in-memory results are used directly: the results file holds
    # three concatenated documents on one line, which json.loads rejects.
    if stateChanged:
        with open("/shared/alerts.log", "a") as alert_file:
            for item in (datastore, cert_info, domain_info):
                for key, value in item.items():
                    alert_file.write(key + " returned with status " + str(value) + "\n")

    # Copy current results to state.log file for next iteration
    with open("/shared/state.log", "w") as state_file:
        state_file.write(snapshot)

    os.remove("/shared/results.json")
    time.sleep(60)