You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

125 lines
3.5 KiB

import requests
import arkevars
import json
import logging
import datetime
import os
import whois
import OpenSSL
import ssl
import time
# Root logging configuration: timestamped records at INFO and above are
# written to the file 'arke.log'.
logging.basicConfig(
    filename='arke.log',
    level=logging.INFO,
    format='%(asctime)s %(message)s',
    datefmt='%Y/%m/%d %I:%M:%S %p',
)
# Named logger used by the monitor functions in this module.
logger = logging.getLogger("arke")
def monitor_HttpTargets(monitoringtargets, timeout=10):
    """Probe each HTTP target with a GET request and record the outcome.

    Args:
        monitoringtargets: Iterable of URL strings handed to ``requests.get``.
        timeout: Seconds to wait on each request before giving up.

    Returns:
        A dict mapping each target to its integer HTTP status code, or to a
        short failure message when the request could not be completed.
    """
    responseTable = {}
    for target in monitoringtargets:
        try:
            # Without a timeout a single unresponsive host would block the
            # whole monitoring loop indefinitely.
            statuscode = requests.get(target, timeout=timeout).status_code
        except requests.ConnectionError:
            logger.warning(f"target: {target} ERROR: Failure to connect.")
            responseTable[target] = "Failed to connect."
        except requests.RequestException as exc:
            # Read timeouts, malformed URLs, redirect loops, etc. must be
            # reported for this target instead of aborting the other checks.
            logger.warning(f"target: {target} ERROR: {exc}")
            responseTable[target] = "Request failed."
        else:
            logger.info(f"target: {target} statuscode: {statuscode}")
            responseTable[target] = statuscode
    return responseTable
def monitor_DomainExpiry(targets):
    """Flag domains whose registration expires this year or next.

    Args:
        targets: Iterable of domain names passed to ``whois.query``.

    Returns:
        A dict holding an entry only for domains that need attention: either
        an expiry warning or a note that the WHOIS lookup failed. Domains
        with a comfortably distant expiry year are omitted.
    """
    responseTable = {}
    # The file does ``import datetime`` (the module), so the class must be
    # reached as ``datetime.datetime``.
    current_year = datetime.datetime.today().year
    for domain in targets:
        try:
            # The lookup is the step that can actually fail (network errors,
            # or no record / no expiration date, which surfaces as an
            # AttributeError), so it has to live inside the try.
            expire_year = whois.query(domain).expiration_date.year
        except Exception:
            # Per-domain boundary: record the failure and keep checking the
            # remaining domains instead of killing the monitor.
            logger.exception("domain: %s ERROR: WHOIS lookup failed.", domain)
            responseTable[domain] = "Failed to query domain info"
            continue
        if expire_year - current_year <= 1:
            responseTable[domain] = "Domain expiring in < 1 year, please rectify."
    return responseTable
def monitor_TlsExpiry(targets, warn_days=30):
    """Flag sites whose TLS certificate expires soon (or already has).

    Args:
        targets: Iterable of sites to check. Each entry may be a bare host
            name (checked on port 443) or a full URL, in which case the host
            and optional explicit port are taken from the URL.
        warn_days: Warn when the certificate expires within this many days.

    Returns:
        A dict holding an entry only for sites that need attention: either
        an expiry warning or a note that the certificate could not be read.
        Keys are the entries of ``targets`` exactly as given.
    """
    # Local import so this block does not depend on the file's import list.
    from urllib.parse import urlsplit

    responseTable = {}
    # notAfter is parsed below from a 'Z'-suffixed (UTC) timestamp, so compare
    # it against an aware UTC "now".
    now = datetime.datetime.now(datetime.timezone.utc)
    threshold = datetime.timedelta(days=warn_days)
    for site in targets:
        try:
            if "://" in site:
                # ssl.get_server_certificate needs a host name, not a URL.
                parsed = urlsplit(site)
                host, port = parsed.hostname, parsed.port or 443
            else:
                host, port = site, 443
            cert = ssl.get_server_certificate((host, port))
            x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
            expiration = datetime.datetime.strptime(
                x509.get_notAfter().decode(), '%Y%m%d%H%M%SZ'
            ).replace(tzinfo=datetime.timezone.utc)
        except Exception:
            # Per-site boundary: DNS/connect/TLS/parsing problems are reported
            # for this site without stopping the remaining checks.
            logger.exception("site: %s ERROR: could not read TLS certificate.", site)
            responseTable[site] = "Failed to query cert info"
            continue
        # A negative remaining lifetime (already expired) is flagged as well.
        if expiration - now <= threshold:
            responseTable[site] = f"TLS expiring in < {warn_days} days, please rectify."
    return responseTable
# Main monitoring loop: once a minute, run every check, compare the combined
# result with the previous cycle's and, when it differs, append one line per
# result entry to /shared/alerts.log.
is_on = True
while is_on:
    # make sure http targets are /up/
    datastore = monitor_HttpTargets(arkevars.httpTargets)
    json_string = json.dumps(datastore)
    # get SSL certs on http targets
    cert_info = monitor_TlsExpiry(arkevars.httpTargets)
    cert_json = json.dumps(cert_info)
    # get whois info on domain targets
    domain_info = monitor_DomainExpiry(arkevars.domains_to_check)
    domain_json = json.dumps(domain_info)

    # write new results to file, one JSON document per line
    with open("/shared/results.json", "a+") as results_file:
        results_file.write(json_string + "\n")
        results_file.write(cert_json + "\n")
        results_file.write(domain_json + "\n")

    # track state: the state file holds the three JSON documents concatenated
    # without separators, so the comparison must use the same concatenation
    # (comparing against json_string alone would always report a change).
    new_state = json_string + cert_json + domain_json
    try:
        with open("/shared/state.log", "r") as state_file:
            oldData = state_file.read()
    except FileNotFoundError:
        # First run, or the previous run died before writing state: there is
        # nothing to compare against, so treat it as a change.
        oldData = None
    stateChanged = oldData != new_state

    results = []
    with open("/shared/results.json", "r") as json_File:
        for line in json_File:
            results.append(json.loads(line))

    if stateChanged:
        alert_lines = [
            key + " returned with status " + str(value) + "\n"
            for item in results
            for key, value in item.items()
        ]
        # Only touch the alerts file when there is something to report.
        if alert_lines:
            with open("/shared/alerts.log", "a+") as alert_file:
                alert_file.writelines(alert_lines)

    # Replace the old state only after it has been read and compared; mode
    # "w" truncates, which stands in for the former remove-then-append.
    with open("/shared/state.log", "w") as state_file:
        state_file.write(new_state)

    os.remove("/shared/results.json")
    time.sleep(60)