#!/usr/bin/env python
#
#-------------------------------------------------------------------------------
# Pi.Alert
# Open Source Network Guard / WIFI & LAN intrusion detector and Web service monitor
#
# pialert.py - Back module. Network scanner, Web service monitor
#-------------------------------------------------------------------------------
# Puche 2021 GNU GPLv3
# leiweibau 2024 GNU GPLv3
# piapiacz, hspindel
#-------------------------------------------------------------------------------
#===============================================================================
# IMPORTS
#===============================================================================
from __future__ import print_function
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from mac_vendor_lookup import MacLookup
from time import sleep, time, strftime
from base64 import b64encode
from urllib.parse import urlparse
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from pathlib import Path
from datetime import datetime
import sys, subprocess, os, re, datetime, sqlite3, socket, io, smtplib, csv, requests, time, pwd, glob, ipaddress, ssl, json
#===============================================================================
# CONFIG CONSTANTS
#===============================================================================
# Directory containing this script; every other path is derived from it.
PIALERT_BACK_PATH = os.path.dirname(os.path.abspath(__file__))
PIALERT_PATH = PIALERT_BACK_PATH + "/.."
PIALERT_WEBSERVICES_LOG = PIALERT_PATH + "/log/pialert.webservices.log"
# Presence of this file pauses Pi.Alert (see check_pialert_countdown)
STOPPIALERT = PIALERT_PATH + "/config/setting_stoppialert"
PIALERT_DB_FILE = PIALERT_PATH + "/db/pialert.db"
PIALERT_DB_PATH = PIALERT_PATH + "/db"
REPORTPATH_WEBGUI = PIALERT_PATH + "/front/reports/"
# Marker files signalling a running scan / a running backup
STATUS_FILE_SCAN = PIALERT_BACK_PATH + "/.scanning"
STATUS_FILE_BACKUP = PIALERT_BACK_PATH + "/.backup"

# Load version and user configuration into this module's namespace.
# SECURITY NOTE: both files are executed as Python code, so they must only be
# writable by trusted users.
# The former Python 2 `execfile` branch was dropped: this file uses f-strings
# and therefore cannot even be parsed by Python 2.
for _conf_name in ("version.conf", "pialert.conf"):
    # `with` closes the handle; the previous exec(open(...).read()) leaked it
    with open(PIALERT_PATH + "/config/" + _conf_name) as _conf_file:
        exec(_conf_file.read())
del _conf_name, _conf_file
#===============================================================================
# MAIN
#===============================================================================
def main():
    """Run the task selected by the single command-line argument.

    Returns 0 when the task and the follow-up steps completed, the task's own
    result when that result is anything other than 0 (this includes None),
    and None when the number of arguments is wrong.
    """
    global startTime, cycle, log_timestamp, sql_connection, sql

    # Header
    print('\nPi.Alert v'+ VERSION_DATE)
    print('---------------------------------------------------------')
    print("Executing user: {}\n".format(get_username()))

    # Initialize global variables
    log_timestamp = datetime.datetime.now()

    # DB handles start out closed
    sql_connection = None
    sql = None

    # Timestamp of this run, truncated to the full minute
    startTime = datetime.datetime.now().replace(second=0, microsecond=0)

    # Exactly one parameter (the cycle / task name) is expected
    if len(sys.argv) != 2:
        print('usage pialert [scan_cycle] | internet_IP | update_vendors | cleanup' )
        return None
    cycle = str(sys.argv[1])

    if os.path.exists(STOPPIALERT):
        # Stop-timer file present: only the countdown is handled
        res = check_pialert_countdown()
    else:
        # Any cycle name that is not listed here is treated as a scan cycle
        named_tasks = {
            'internet_IP': check_internet_IP,
            'cleanup': cleanup_database,
            'update_vendors': update_devices_MAC_vendors,
            'update_vendors_silent': lambda: update_devices_MAC_vendors('-s'),
        }
        selected_task = named_tasks.get(cycle, scan_network)
        res = selected_task()

    # Anything but 0 counts as an error: close the DB and hand the result back
    if res != 0:
        closeDB()
        return res

    # Reporting
    wants_reporting = cycle not in ['internet_IP', 'cleanup']
    if wants_reporting:
        email_reporting()

    # Close SQL
    closeDB()

    # Remove scan status file created in scan_network()
    non_scan_cycles = ['internet_IP', 'cleanup', 'update_vendors', 'update_vendors_silent']
    if cycle not in non_scan_cycles and os.path.exists(STATUS_FILE_SCAN):
        os.remove(STATUS_FILE_SCAN)

    # Final message
    print('\nDONE!!!\n\n')
    return 0
#===============================================================================
# Set Env (Userpermissions DB-file)
#===============================================================================
def get_username():
    """Return the login name belonging to the real user id of this process."""
    passwd_entry = pwd.getpwuid(os.getuid())
    return passwd_entry.pw_name
# ------------------------------------------------------------------------------
def set_db_file_permissions():
    """Force owner/group/mode of the Pi.Alert DB file and print the result.

    Runs `sudo chown <current user>:www-data` and `sudo chmod 775` on the
    database file; the exit status of both commands is ignored.
    """
    print("\nPrepare Scan...")
    print(" Force file permissions on Pi.Alert db...")

    # Set permissions
    owner_and_group = get_username() + ":www-data "
    os.system("sudo chown " + owner_and_group + PIALERT_DB_FILE)
    os.system("sudo chmod 775 " + PIALERT_DB_FILE)

    # Read the permissions back and show them
    db_file = Path(PIALERT_DB_FILE)
    permission_mask = oct(db_file.stat().st_mode)[-3:]
    print(" DB permission mask: {}".format(permission_mask))
    print(" DB Owner and Group: {}:{}".format(db_file.owner(), db_file.group()))
# ------------------------------------------------------------------------------
def set_reports_file_permissions():
    """Recursively set owner (<current user>:www-data) and mode 775 on the
    web GUI report directory; the exit status of both commands is ignored."""
    chown_command = "sudo chown -R " + get_username() + ":www-data " + REPORTPATH_WEBGUI
    chmod_command = "sudo chmod -R 775 " + REPORTPATH_WEBGUI
    os.system(chown_command)
    os.system(chmod_command)
#===============================================================================
# Countdown
#===============================================================================
def check_pialert_countdown():
    """Handle the stop-timer file while Pi.Alert is paused.

    The file holds the pause duration in minutes and its ctime marks the
    start of the timer. Once the timer is within its last minute the file is
    removed, the reporting test script is invoked with `reporting_stoptimer`
    and a journal entry is written; otherwise the timer state is printed.

    Returns None (there is no explicit return value).
    """
    openDB()
    if os.path.exists(STOPPIALERT):
        # get timer (minutes) from file
        with open(STOPPIALERT, 'r') as timer_file:
            timer_minutes = int(timer_file.read().rstrip())

        timer_started = int(os.path.getctime(STOPPIALERT))
        now = int(time.time())
        # Delete the stop file 1 min before the countdown ends
        delete_after = timer_started + (timer_minutes * 60) - 60

        if now <= delete_after:
            print("Timer Start: {}".format(time.ctime(timer_started)))
            print("Timer Ende : {}".format(time.ctime(delete_after + 60)))
            print("----------------------------------------")
            print("Timer still running")
        else:
            print("The file \"setting_stoppialert\" will be deleted")
            os.remove(STOPPIALERT)
            os.system('/usr/bin/python3 ' + PIALERT_BACK_PATH + '/pialert_reporting_test.py reporting_stoptimer')
            sql.execute ("""INSERT INTO pialert_journal (Journal_DateTime, LogClass, Trigger, LogString, Hash, Additional_Info)
                            VALUES (?, 'c_002', 'cronjob', 'LogStr_0513', '', '') """, (startTime,))
            sql_connection.commit()
    closeDB()
#===============================================================================
# INTERNET IP CHANGE
#===============================================================================
def check_internet_IP():
    """Check the public IP and run the tasks attached to this cycle.

    Steps, in order: fetch the current Internet IP and store it when it
    differs from the saved one, optionally update Dynamic DNS, then run the
    cron-gated speedtest, update check and DB backup tasks.

    Returns 1 when the Internet IP or the Dynamic DNS IP cannot be
    retrieved, 0 otherwise.
    """
    # Header
    print('Check Internet IP')
    print(' Timestamp:', startTime )

    # Current IP
    print('\nRetrieving Internet IP...')
    internet_IP = get_internet_IP()
    if internet_IP == "":
        print(' Error retrieving Internet IP')
        print(' Exiting...\n')
        return 1
    print(' ', internet_IP)

    # Previously stored IP
    print('\nRetrieving previous IP...')
    openDB()
    previous_IP = get_previous_internet_IP()
    print(' ', previous_IP)

    # Store the IP only when it changed
    if internet_IP == previous_IP:
        print(' No changes to perform')
    else:
        print(' Saving new IP')
        save_new_internet_IP (internet_IP)
        print(' IP updated')
    closeDB()

    # Dynamic DNS
    if not DDNS_ACTIVE:
        print('\nSkipping Dynamic DNS update...')
    else:
        print('\nRetrieving Dynamic DNS IP...')
        dns_IP = get_dynamic_DNS_IP()
        if dns_IP == "":
            print(' Error retrieving Dynamic DNS IP')
            print(' Exiting...\n')
            return 1
        print(' ', dns_IP)

        # Update the DNS record only when it points elsewhere
        if dns_IP == internet_IP:
            print(' No changes to perform')
        else:
            print(' Updating Dynamic DNS IP...')
            message = set_dynamic_DNS_IP()
            print(' ', message)

    # Automated Speedtest (needs the task enabled and the binary installed)
    print("\nAuto Speedtest...")
    if not SPEEDTEST_TASK_ACTIVE:
        print(' Skipping Speedtest... Not activated!')
    elif os.path.exists(PIALERT_BACK_PATH + '/speedtest/speedtest'):
        print(" Crontab: {}".format(SPEEDTEST_TASK_CRON))
        run_speedtest_task(startTime, SPEEDTEST_TASK_CRON)
    else:
        print(' Skipping Speedtest... Not installed!')

    # Automated update check
    print("\nAuto Update-Check...")
    if not AUTO_UPDATE_CHECK:
        NewVersion_FrontendNotification(False,"")
        print(" Skipping Auto Update-Check... Not activated!")
    else:
        print(" Crontab: {}".format(AUTO_UPDATE_CHECK_CRON))
        checkNewVersion(startTime, AUTO_UPDATE_CHECK_CRON)

    # Automated backup (skipped while another backup's status file exists)
    print("\nAuto Backup...")
    if not AUTO_DB_BACKUP:
        print(" Skipping Auto Backup... Not activated!")
    else:
        print(" Crontab: {}".format(AUTO_DB_BACKUP_CRON))
        if os.path.exists(STATUS_FILE_BACKUP):
            print(" Backup function pending.")
        else:
            create_autobackup(startTime, AUTO_DB_BACKUP_CRON)
    return 0
# ------------------------------------------------------------------------------
def create_autobackup(start_time, crontab_string):
    """Back up the database and the config file when the cron string is due.

    Args:
        start_time: datetime of the current run; matched against the cron
            fields and used to build the backup file names.
        crontab_string: five whitespace-separated cron fields
            (minute hour day-of-month month day-of-week).

    A status file marks the backup as running. It is now removed in a
    `finally` clause: previously any exception (e.g. a failing `sqlite3` or
    `zip` call) left the file behind, and check_internet_IP then reported
    "Backup function pending." on every later run.

    The backup waits for a running scan (scan status file) and gives up
    silently once 5 minutes have passed since `start_time`.
    """
    # create status file
    with open(STATUS_FILE_BACKUP, "w") as f:
        f.write("")

    try:
        # convert cron string (last argument is the first invalid value)
        crontab_parts = crontab_string.split()
        minute = parse_cron_part(crontab_parts[0], start_time.minute, 0, 60)
        hour = parse_cron_part(crontab_parts[1], start_time.hour, 0, 24)
        day_of_month = parse_cron_part(crontab_parts[2], start_time.day, 1, 32)
        month = parse_cron_part(crontab_parts[3], start_time.month, 1, 13)
        # NOTE(review): datetime.weekday() counts Monday as 0 whereas cron
        # counts Sunday as 0 - confirm which convention users are told to use.
        day_of_week = parse_cron_part(crontab_parts[4], start_time.weekday(), 0, 7)

        # Compare cron
        backup_due = (start_time.minute in minute
                      and start_time.hour in hour
                      and start_time.day in day_of_month
                      and start_time.month in month
                      and start_time.weekday() in day_of_week)
        if not backup_due:
            print(" Backup function was NOT executed.")
            return

        # Wait until a running scan has finished (max. 5 minutes)
        while os.path.exists(STATUS_FILE_SCAN):
            if time.time() - start_time.timestamp() >= 300:
                return
            time.sleep(1)

        print(" Backup is started...")
        backup_stamp = str(start_time).replace("-", "").replace(" ", "_").replace(":", "")
        BACKUP_FILE = PIALERT_DB_PATH + "/pialertdb_" + backup_stamp + ".zip"
        temp_db_copy = PIALERT_DB_PATH + '/temp/pialert.db'
        time.sleep(20)  # wait 20s to finish the reporting

        # Backup DB (no further checks)
        sqlite_command = ['sqlite3', PIALERT_DB_PATH + '/pialert.db', '.backup ' + temp_db_copy]
        subprocess.check_output(sqlite_command, universal_newlines=True)
        subprocess.check_output(['zip', '-j', '-qq', BACKUP_FILE, temp_db_copy], universal_newlines=True)
        time.sleep(4)
        os.remove(temp_db_copy)
        # Set Permissions for www-data (testing)
        os.system("sudo chown www-data:www-data " + BACKUP_FILE)
        os.system("sudo chmod 644 " + BACKUP_FILE)
        # Cleanup
        _remove_surplus_backups(PIALERT_DB_PATH, "pialertdb_20*.zip")
        print(" Cleanup DB Backups")

        # Backup config file (argument list instead of a shell string)
        BACKUP_CONF_FILE = PIALERT_PATH + "/config/pialert-" + backup_stamp + ".bak"
        subprocess.check_output(['cp', PIALERT_PATH + '/config/pialert.conf', BACKUP_CONF_FILE])
        # Set Permissions for www-data (testing)
        os.system("sudo chown www-data:www-data " + BACKUP_CONF_FILE)
        os.system("sudo chmod 644 " + BACKUP_CONF_FILE)
        # Cleanup
        _remove_surplus_backups(PIALERT_PATH + "/config", "pialert-20*.bak")
        print(" Cleanup Config Backups")

        # Journal entries (timestamped with the global run time, as before)
        openDB()
        sql.execute ("""INSERT INTO pialert_journal (Journal_DateTime, LogClass, Trigger, LogString, Hash, Additional_Info)
                        VALUES (?, 'c_010', 'cronjob', 'LogStr_0011', '', '') """, (startTime,))
        sql_connection.commit()
        sql.execute ("""INSERT INTO pialert_journal (Journal_DateTime, LogClass, Trigger, LogString, Hash, Additional_Info)
                        VALUES (?, 'c_000', 'cronjob', 'LogStr_0007', '', '') """, (startTime,))
        sql_connection.commit()
        closeDB()
    finally:
        # remove status file on every exit path
        if os.path.exists(STATUS_FILE_BACKUP):
            os.remove(STATUS_FILE_BACKUP)


def _remove_surplus_backups(directory, pattern):
    """Delete all but the AUTO_DB_BACKUP_KEEP newest files matching pattern."""
    backup_files = glob.glob(os.path.join(directory, pattern))
    backup_files.sort(key=os.path.getmtime, reverse=True)
    for surplus_file in backup_files[AUTO_DB_BACKUP_KEEP:]:
        os.remove(surplus_file)
# ------------------------------------------------------------------------------
def parse_cron_part(cron_part, current_value, cron_min_value, cron_max_value):
    """Expand one cron field into the set of integers it matches.

    Supported forms, which may be combined in a comma-separated list:
    `*`, `N`, `A-B`, `*/S`, `A-B/S` and `N/S` (N up to the field maximum).

    Previously a list containing a range (`1-5,10`) raised ValueError and a
    step applied to a range or single value ignored that range or value.
    Results for `*`, `N`, `A-B`, `*/S` and plain number lists are unchanged.

    Args:
        cron_part: text of the cron field.
        current_value: unused; kept so existing callers keep working.
        cron_min_value: smallest valid value of the field.
        cron_max_value: first INVALID value of the field (exclusive bound).

    Returns:
        set of int.

    Raises:
        ValueError: when a component is not numeric or the step is 0.
    """
    matching_values = set()
    for item in cron_part.split(','):
        base, _, step_text = item.partition('/')
        step = int(step_text) if step_text else 1
        if base == '*':
            first, last = cron_min_value, cron_max_value - 1
        elif '-' in base:
            first, last = map(int, base.split('-'))
        else:
            first = int(base)
            # "N/S" runs from N to the end of the field, a bare "N" is just N
            last = cron_max_value - 1 if step_text else first
        matching_values.update(range(first, last + 1, step))
    return matching_values
# ------------------------------------------------------------------------------
def NewVersion_FrontendNotification(newVersion,update_notes):
    """Create, update or remove the frontend update-notification file.

    When `newVersion` equals True the file is (re)written with
    `update_notes`; otherwise an existing file is deleted.
    """
    info_file = PIALERT_PATH + "/front/auto_Update.info"

    if not (newVersion == True):
        # No new version: drop a stale notification, if there is one
        if os.path.exists(info_file):
            os.remove(info_file)
            print(" Remove Frontend Notification.")
        return

    if os.path.exists(info_file):
        print(" Update Frontend Notification.")
    else:
        print(" Create Frontend Notification.")
    with open(info_file, 'w') as notification:
        notification.write(update_notes)
# ------------------------------------------------------------------------------
def checkNewVersion(start_time, crontab_string):
    """Query GitHub for a newer release when the cron string is due.

    Args:
        start_time: datetime of the current run, matched against the cron
            fields.
        crontab_string: five whitespace-separated cron fields.

    The date of the latest commit touching the release tarball is compared
    (as a YYYY-MM-DD string) with VERSION_DATE. The frontend notification is
    updated and one journal entry is written: LogStr_0061 (new version),
    LogStr_0067 (up to date) or LogStr_0066 (check failed).

    Changes: the HTTP request now has a timeout and every `requests` error
    (not only ConnectionError) is treated as a failed check, so a stalled
    connection can no longer hang the run; the DB is closed on error too.
    """
    # convert cron string (last argument is the first invalid value)
    crontab_parts = crontab_string.split()
    minute = parse_cron_part(crontab_parts[0], start_time.minute, 0, 60)
    hour = parse_cron_part(crontab_parts[1], start_time.hour, 0, 24)
    day_of_month = parse_cron_part(crontab_parts[2], start_time.day, 1, 32)
    month = parse_cron_part(crontab_parts[3], start_time.month, 1, 13)
    # NOTE(review): datetime.weekday() counts Monday as 0 whereas cron counts
    # Sunday as 0 - confirm which convention users are told to use.
    day_of_week = parse_cron_part(crontab_parts[4], start_time.weekday(), 0, 7)

    # Compare cron
    check_due = (start_time.minute in minute
                 and start_time.hour in hour
                 and start_time.day in day_of_month
                 and start_time.month in month
                 and start_time.weekday() in day_of_week)
    if not check_due:
        print(" Version Check function was NOT executed.")
        return

    newVersion = False
    currentversion = VERSION_DATE
    print(f" Current Version: {currentversion}")
    UPDATE_CHECK_URL = "https://api.github.com/repos/leiweibau/Pi.Alert/commits?path=tar%2Fpialert_latest.tar&page=1&per_page=1"
    update_notes = ""

    try:
        response = requests.get(UPDATE_CHECK_URL, timeout=30)
        data = json.loads(response.text)
    except (requests.exceptions.RequestException, ValueError):
        # ValueError covers json.JSONDecodeError
        print(" ERROR: Couldn't check for new release.")
        data = ""

    openDB()
    try:
        # A usable answer is a non-empty list whose first item has a "commit"
        if isinstance(data, list) and len(data) > 0 and "commit" in data[0]:
            dateTimeStr = data[0]['commit']['author']['date']
            update_notes = data[0]['commit']['message']
            date_obj = datetime.datetime.strptime(dateTimeStr, '%Y-%m-%dT%H:%M:%SZ')
            latestversion = date_obj.strftime('%Y-%m-%d')
            if latestversion > currentversion:
                print(f" New version {latestversion} is available!")
                newVersion = True
                log_string = 'LogStr_0061'
            else:
                print(" Running the latest version.")
                log_string = 'LogStr_0067'
        else:
            log_string = 'LogStr_0066'

        NewVersion_FrontendNotification(newVersion,update_notes)
        sql.execute ("""INSERT INTO pialert_journal (Journal_DateTime, LogClass, Trigger, LogString, Hash, Additional_Info)
                        VALUES (?, 'c_060', 'cronjob', ?, '', '') """, (startTime, log_string))
    finally:
        closeDB()
#-------------------------------------------------------------------------------
def run_speedtest_task(start_time, crontab_string):
    """Run the Ookla speedtest binary when the cron string is due.

    Args:
        start_time: datetime of the current run, matched against the cron
            fields.
        crontab_string: five whitespace-separated cron fields.

    The result is printed, stored in Tools_Speedtest_History and logged in
    the journal. Bandwidth is divided by 125000 and reported as Mbps.

    Fixes: the newline replacement string was broken across two source lines
    (an unterminated literal) and is restored as "<br>"; an unexpected result
    structure (missing key) is reported instead of raising; the DB is closed
    on every path.

    Returns:
        0 in all cases.
    """
    # convert cron string (last argument is the first invalid value)
    crontab_parts = crontab_string.split()
    minute = parse_cron_part(crontab_parts[0], start_time.minute, 0, 60)
    hour = parse_cron_part(crontab_parts[1], start_time.hour, 0, 24)
    day_of_month = parse_cron_part(crontab_parts[2], start_time.day, 1, 32)
    month = parse_cron_part(crontab_parts[3], start_time.month, 1, 13)
    # NOTE(review): datetime.weekday() counts Monday as 0 whereas cron counts
    # Sunday as 0 - confirm which convention users are told to use.
    day_of_week = parse_cron_part(crontab_parts[4], start_time.weekday(), 0, 7)

    # Compare cron
    speedtest_due = (start_time.minute in minute
                     and start_time.hour in hour
                     and start_time.day in day_of_month
                     and start_time.month in month
                     and start_time.weekday() in day_of_week)
    if not speedtest_due:
        print(" Speedtest function was NOT executed.")
        return 0

    # Define the command and arguments
    command = ["sudo", PIALERT_BACK_PATH + "/speedtest/speedtest", "--accept-license", "--accept-gdpr", "-p", "no", "-f", "json"]

    openDB()
    try:
        output = subprocess.check_output(command, text=True)
        # Parse the JSON output
        result = json.loads(output)
        # Access the speed test results
        speedtest_isp = result['isp']
        speedtest_server = result['server']['name'] + ' (' + result['server']['location'] + ') (' + result['server']['host'] + ')'
        speedtest_ping = result['ping']['latency']
        speedtest_down = round(result['download']['bandwidth'] / 125000, 2)
        speedtest_up = round(result['upload']['bandwidth'] / 125000, 2)
        # Build output
        speedtest_output = ""
        speedtest_output += f" ISP: {speedtest_isp}\n"
        speedtest_output += f" Server: {speedtest_server}\n\n"
        speedtest_output += f" Ping: {speedtest_ping} ms\n"
        speedtest_output += f" Download Speed: {speedtest_down} Mbps\n"
        speedtest_output += f" Upload Speed: {speedtest_up} Mbps\n"
        print(speedtest_output)
        # Prepare db string: HTML line breaks for the journal entry
        speedtest_db_output = speedtest_output.replace("\n", "<br>")
        # Insert in db
        sql.execute ("""INSERT INTO Tools_Speedtest_History (speed_date, speed_isp, speed_server, speed_ping, speed_down, speed_up)
                        VALUES (?, ?, ?, ?, ?, ?) """, (startTime, speedtest_isp, speedtest_server, speedtest_ping, speedtest_down, speedtest_up))
        # Logging
        sql.execute ("""INSERT INTO pialert_journal (Journal_DateTime, LogClass, Trigger, LogString, Hash, Additional_Info)
                        VALUES (?, 'c_002', 'cronjob', 'LogStr_0255', '', ?) """, (startTime, speedtest_db_output))
        sql_connection.commit()
    except subprocess.CalledProcessError as e:
        print(f"Error running 'speedtest': {e}")
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON output: {e}")
    except KeyError as e:
        print(f"Error reading speedtest result, missing field: {e}")
    finally:
        closeDB()
    return 0
#-------------------------------------------------------------------------------
def get_internet_IP():
    """Return the public IPv4 address reported by QUERY_MYIP_SERVER.

    Returns:
        The address as a string, or "" when it cannot be determined. The
        caller already treats "" as "error retrieving IP"; a failing or
        missing `curl` used to raise instead and abort the whole run.
    """
    # Alternative (not used):
    # dig +short -4 myip.opendns.com @resolver1.opendns.com
    curl_args = ['curl', '-s', QUERY_MYIP_SERVER]
    try:
        cmd_output = subprocess.check_output (curl_args, universal_newlines=True)
    except (subprocess.CalledProcessError, OSError):
        # non-zero curl exit status, or curl could not be started
        return ""
    return check_IP_format (cmd_output)
#-------------------------------------------------------------------------------
def get_dynamic_DNS_IP():
    """Return the IPv4 address DDNS_DOMAIN currently resolves to.

    Uses the system's default resolver via `dig +short`.

    Returns:
        The address as a string, or "" when it cannot be determined. The
        caller already treats "" as an error; a failing or missing `dig`
        used to raise instead and abort the whole run.
    """
    # Alternative (not used): append '@resolver1.opendns.com' to query OpenDNS
    dig_args = ['dig', '+short', DDNS_DOMAIN]
    try:
        dig_output = subprocess.check_output (dig_args, universal_newlines=True)
    except (subprocess.CalledProcessError, OSError):
        # non-zero dig exit status, or dig could not be started
        return ""
    return check_IP_format (dig_output)
#-------------------------------------------------------------------------------
def set_dynamic_DNS_IP():
    """Call the Dynamic DNS update URL with curl and return curl's output.

    The user, password and hostname are appended to DDNS_UPDATE_URL as query
    parameters exactly as configured (no URL escaping is applied).
    """
    update_url = (DDNS_UPDATE_URL
                  + 'username=' + DDNS_USER
                  + '&password=' + DDNS_PASSWORD
                  + '&hostname=' + DDNS_DOMAIN)
    curl_command = ['curl', '-s', update_url]
    return subprocess.check_output (curl_command, universal_newlines=True)
#-------------------------------------------------------------------------------
def get_previous_internet_IP():
    """Return dev_LastIP of the device whose dev_MAC is 'Internet'.

    Uses the module-level cursor `sql`, so the DB must already be open.
    """
    # get previous internet IP stored in DB
    sql.execute ("SELECT dev_LastIP FROM Devices WHERE dev_MAC = 'Internet' ")
    internet_row = sql.fetchone()
    return internet_row[0]
#-------------------------------------------------------------------------------
def save_new_internet_IP(pNewIP):
    """Record a changed Internet IP.

    Appends the change to IP_changes.log, inserts an 'Internet IP Changed'
    event that mentions the previous IP, stores the new IP on the 'Internet'
    device and commits.
    """
    # Log new IP into logfile
    log_line = str(startTime) +'\t'+ pNewIP +'\n'
    append_line_to_file (LOG_PATH + '/IP_changes.log', log_line)

    # Save event (the previous IP is read before the device row is updated)
    event_values = (pNewIP, startTime, get_previous_internet_IP())
    sql.execute ("""INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime,
                        eve_EventType, eve_AdditionalInfo,
                        eve_PendingAlertEmail)
                    VALUES ('Internet', ?, ?, 'Internet IP Changed',
                        'Previous Internet IP: '|| ?, 1) """,
                 event_values)

    # Save new IP
    sql.execute ("""UPDATE Devices SET dev_LastIP = ?
                    WHERE dev_MAC = 'Internet' """,
                 (pNewIP,) )
    sql_connection.commit()
#-------------------------------------------------------------------------------
def check_IP_format(pIP):
    """Return the first IPv4 address found in `pIP`, or "" when there is none.

    The text is searched, not matched as a whole, so surrounding characters
    (such as a trailing newline from command output) are ignored.
    """
    # One octet: 0-255
    IPv4SEG = r'(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])'
    # Three "octet." groups followed by a final octet
    IPv4ADDR = r'(?:(?:' + IPv4SEG + r'\.){3,3}' + IPv4SEG + r')'

    found = re.search(IPv4ADDR, pIP)
    return found.group(0) if found is not None else ""
#===============================================================================
# Cleanup Tasks
#===============================================================================
def cleanup_database():
    """Delete old rows from the history tables, trim the journal and VACUUM.

    Retention comes from DAYS_TO_KEEP_ONLINEHISTORY (default 30 days) and
    DAYS_TO_KEEP_EVENTS (default 90 days) when those settings are defined.

    Returns:
        0.
    """
    print('Cleanup Database')
    print(' Timestamp:', startTime )
    openDB()

    # Settings that are not defined fall back to a default
    settings = globals()
    strdaystokeepOH = str(settings.get('DAYS_TO_KEEP_ONLINEHISTORY', 30))   # 1 month
    strdaystokeepEV = str(settings.get('DAYS_TO_KEEP_EVENTS', 90))          # 90 days

    def purge(label, table, date_column, days):
        # Announce the step, then delete every row older than `days` days
        print(' ' + label + ', up to the lastest ' + days + ' days...')
        sql.execute ("DELETE FROM " + table + " WHERE " + date_column
                     + " <= date('now', '-" + days + " day')")

    purge('Online_History', 'Online_History', 'Scan_Date', strdaystokeepOH)
    purge('Events', 'Events', 'eve_DateTime', strdaystokeepEV)
    purge('Services_Events', 'Services_Events', 'moneve_DateTime', strdaystokeepOH)
    purge('ICMP_Mon_Events', 'ICMP_Mon_Events', 'icmpeve_DateTime', strdaystokeepOH)

    print(' Trim Journal to the lastest 1000 entries')
    sql.execute ("DELETE FROM pialert_journal WHERE journal_id NOT IN (SELECT journal_id FROM pialert_journal ORDER BY journal_id DESC LIMIT 1000) AND (SELECT COUNT(*) FROM pialert_journal) > 1000")

    purge('Speedtest_History', 'Tools_Speedtest_History', 'speed_date', strdaystokeepOH)
    purge('Nmap Scan Results', 'Tools_Nmap_ManScan', 'scan_date', strdaystokeepOH)

    print(' Shrink Database...')
    sql.execute ("VACUUM;")

    # Journal entry for the cleanup itself
    sql.execute ("""INSERT INTO pialert_journal (Journal_DateTime, LogClass, Trigger, LogString, Hash, Additional_Info)
                    VALUES (?, 'c_010', 'cronjob', 'LogStr_0101', '', 'Cleanup') """, (startTime,))
    closeDB()
    return 0
#===============================================================================
# UPDATE DEVICE MAC VENDORS
#===============================================================================
def update_devices_MAC_vendors (pArg = ''):
    """Refresh the vendor databases and fill in missing device vendors.

    Args:
        pArg: extra argument handed to update_vendors.sh (main passes '-s'
            for the silent variant).

    Only devices whose dev_Vendor is empty are looked up. Afterwards the
    mac-vendor-lookup cache is refreshed; if that fails, a vendor file is
    rebuilt from arp-scan's ieee-oui.txt instead.

    Returns None (no explicit return value, unchanged).

    Changes: the bare `except:` no longer swallows KeyboardInterrupt /
    SystemExit and the failure reason is printed; shell commands are passed
    as plain strings rather than one-element lists; unused locals removed;
    the DB is closed even when a step raises.
    """
    print('Update HW Vendors')
    print(' Timestamp:', startTime )

    # Update vendors DB (oui)
    print('\nUpdating vendors DB...')
    update_args = ['sh', PIALERT_BACK_PATH + '/update_vendors.sh', pArg]
    subprocess.check_output (update_args)

    # Initialize variables
    recordsToUpdate = []
    ignored = 0
    notFound = 0

    # All devices loop
    print('\nSearching devices vendor', end='')
    openDB()
    try:
        # Only the devices for which no vendor has yet been entered are attempted to be updated.
        for device in sql.execute ("SELECT * FROM Devices WHERE dev_Vendor = ''") :
            # Search vendor in HW Vendors DB
            vendor = query_MAC_vendor (device['dev_MAC'])
            if vendor == -1 :
                notFound += 1
            elif vendor == -2 :
                ignored += 1
            else :
                recordsToUpdate.append ([vendor, device['dev_MAC']])
            # progress bar
            print('.', end='')
            sys.stdout.flush()

        print('')
        print(" Devices Ignored: ", ignored)
        print(" Vendors Not Found:", notFound)
        print(" Vendors updated: ", len(recordsToUpdate) )

        # mac-vendor-lookup update
        try:
            print('\nTry build in mac-vendor-lookup update')
            mac = MacLookup()
            mac.update_vendors()
            print(' Update successful')
        except Exception as update_error:
            print('\nFallback')
            print(' Update failed:', update_error)
            print(' Backup old mac-vendors.txt for mac-vendor-lookup')
            subprocess.call("cp $HOME/.cache/mac-vendors.txt $HOME/.cache/mac-vendors.bak", shell=True)
            print(' Create mac-vendors.txt for mac-vendor-lookup')
            # NOTE(review): several of the sed substitutions below replace a
            # character with itself; they look like encoding-repair rules whose
            # left-hand side was lost - verify against the upstream file.
            subprocess.call("/usr/bin/sed -e 's/\t/:/g' -e 's/ü/ü/g' -e 's/ö/ö/g' -e 's/ä/ä/g' -e 's/ó/ó/g' -e 's/é/é/g' -e 's/â/–/g' -e 's/Â//g' -e '/^#/d' /usr/share/arp-scan/ieee-oui.txt > $HOME/.cache/mac-vendors.txt", shell=True)

        # update devices
        sql.executemany ("UPDATE Devices SET dev_Vendor = ? WHERE dev_MAC = ? ",
                         recordsToUpdate )
    finally:
        closeDB()
#-------------------------------------------------------------------------------
def query_MAC_vendor(pMAC):
    """Look up the hardware vendor for a MAC address in VENDORS_DB.

    Args:
        pMAC: MAC address in the form 'aa:bb:cc:dd:ee:ff'.

    Returns:
        The vendor name as text, -1 when grep finds no entry for the prefix,
        or -2 when `pMAC` is not a 17-character colon-separated MAC.

    Changes: grep's output is decoded, so a str is returned instead of bytes
    (on Python 3 the bytes value used to end up in the database); when grep
    prints several lines only the first one is used instead of all of them.
    """
    pMACstr = str(pMAC)

    # Check MAC parameter
    mac = pMACstr.replace (':','')
    if len(pMACstr) != 17 or len(mac) != 12 :
        return -2

    # Search the first 3 bytes (vendor prefix) in HW Vendors DB
    grep_args = ['grep', '-i', mac[0:6], VENDORS_DB]
    try:
        grep_output = subprocess.check_output (grep_args)
    except subprocess.CalledProcessError :
        # not found (grep exits non-zero)
        return -1

    # Decode leniently so an odd byte in the vendor file cannot abort the scan
    matched_lines = grep_output.decode('utf-8', errors='replace').splitlines()
    first_match = matched_lines[0] if matched_lines else ''

    # Return Vendor: skip the 6-character prefix and its separator
    return first_match[7:].rstrip()
#===============================================================================
# SCAN NETWORK
#===============================================================================
def scan_network():
    """Run one complete network scan for the global `cycle`.

    Creates the scan status file, collects devices from every source
    (arp-scan, Pi-hole, DHCP leases, Fritzbox, Mikrotik, UniFi), processes
    the results into the DB and finally runs the optional web service, ICMP
    and rogue DHCP checks.

    Returns:
        0 on success, 1 when the scan cycle is not configured.

    Fix: on the early error return the status file created at the top is now
    removed. On success it is still left in place; main() removes it after
    reporting.
    """
    # Create scan status file
    with open(STATUS_FILE_SCAN, "w") as f:
        f.write("")

    # Header
    print('Scan Devices')
    print(' Timestamp:', startTime )

    # correct db permission every scan (user must/should be sudoer)
    set_db_file_permissions()

    # Query ScanCycle properties
    print_log ('Query ScanCycle confinguration...')
    scanCycle_data = query_ScanCycle_Data (True)
    if scanCycle_data is None:
        print('\n*************** ERROR ***************')
        print('ScanCycle %s not found' % cycle )
        print(' Exiting...\n')
        # No scan is running any more: do not leave the marker behind
        if os.path.exists(STATUS_FILE_SCAN):
            os.remove(STATUS_FILE_SCAN)
        return 1

    # ScanCycle data
    cycle_interval = scanCycle_data['cic_EveryXmin']

    # arp-scan command
    print('\nScanning...')
    print(' arp-scan Method...')
    print_log ('arp-scan starts...')
    arpscan_devices = execute_arpscan()
    print_log ('arp-scan ends')

    # Pi-hole
    print(' Pi-hole Method...')
    openDB()
    print_log ('Pi-hole copy starts...')
    copy_pihole_network()

    # DHCP Leases
    print(' DHCP Leases Method...')
    read_DHCP_leases()

    # Fritzbox
    print(' Fritzbox Method...')
    openDB()
    print_log ('Fritzbox copy starts...')
    read_fritzbox_active_hosts()

    # Mikrotik
    print(' Mikrotik Method...')
    openDB()
    print_log ('Mikrotik copy starts...')
    read_mikrotik_leases()

    # UniFi
    print(' UniFi Method...')
    openDB()
    print_log ('UniFi copy starts...')
    read_unifi_clients()

    # Load current scan data
    print('\nProcessing scan results...')
    print_log ('Save scanned devices')
    save_scanned_devices (arpscan_devices, cycle_interval)

    # Process Ignore list
    print(' Processing ignore list...')
    remove_entries_from_table()

    # Print stats
    print_log ('Print Stats')
    print_scan_stats()
    print_log ('Stats end')

    # Create Events
    print('\nUpdating DB Info...')
    print(' Sessions Events (connect / discconnect) ...')
    insert_events()

    # Create New Devices
    # after create events -> avoid 'connection' event
    print(' Creating new devices...')
    create_new_devices()

    # Update devices info
    print(' Updating Devices Info...')
    update_devices_data_from_scan()

    # Resolve devices names
    print_log (' Resolve devices names...')
    update_devices_names()

    # Void false connection - disconnections
    print(' Voiding false (ghost) disconnections...')
    void_ghost_disconnections()

    # Pair session events (Connection / Disconnection)
    print(' Pairing session events (connection / disconnection) ...')
    pair_sessions_events()

    # Sessions snapshot
    print(' Creating sessions snapshot...')
    create_sessions_snapshot()

    # Skip repeated notifications
    print(' Skipping repeated notifications...')
    skip_repeated_notifications()

    # Calc Activity History
    print(' Calculate Activity History...')
    calc_activity_history_main_scan()

    # Optional modules: a setting that is not defined counts as disabled
    settings = globals()

    # Web Service Monitoring, only on minutes ending in 0 (every 10 minutes)
    if settings.get('SCAN_WEBSERVICES', False) == True:
        if str(startTime)[15] == "0":
            service_monitoring()

    # ICMP Monitoring
    if settings.get('ICMPSCAN_ACTIVE', False) == True:
        icmp_monitoring()

    # Check Rogue DHCP
    if settings.get('SCAN_ROGUE_DHCP', False) == True:
        print('\nLooking for Rogue DHCP Servers...')
        rogue_dhcp_detection()

    sql_connection.commit()
    closeDB()
    return 0
#-------------------------------------------------------------------------------
def query_ScanCycle_Data(pOpenCloseDB = False):
    """Fetch the ScanCycles row for the global `cycle`.

    Args:
        pOpenCloseDB: when true, the DB is opened before and closed after
            the query; otherwise an already open DB is used.

    Returns:
        The row (cic_arpscanCycles, cic_EveryXmin), or None when no row has
        that cic_ID.
    """
    # Open the DB only when the caller asked for it
    if pOpenCloseDB:
        openDB()

    # Query Data
    cycle_query = """SELECT cic_arpscanCycles, cic_EveryXmin
                     FROM ScanCycles
                     WHERE cic_ID = ? """
    sql.execute (cycle_query, (cycle,))
    cycle_row = sql.fetchone()

    # Close the DB again when it was opened here
    if pOpenCloseDB:
        closeDB()
    return cycle_row
#-------------------------------------------------------------------------------
def execute_arpscan():
    """Run arp-scan on the configured subnet(s) and return the devices found.

    SCAN_SUBNETS may be one string or a list of strings (one per interface).
    arp-scan counts as enabled when ARPSCAN_ACTIVE is not defined.

    Returns:
        List of dicts with the keys 'ip', 'mac' and 'hw' (vendor text), one
        per distinct MAC address in order of first appearance; an empty list
        when arp-scan is disabled.

    Fixes: the named groups of the regular expressions were missing
    (`(?P(` is not a valid pattern, while the code reads `device['mac']`);
    the whitespace escapes are now raw strings; duplicate detection uses a
    set.
    """
    # check if arp-scan is active
    try:
        module_arpscan_status = ARPSCAN_ACTIVE
    except NameError:
        module_arpscan_status = True
    if not module_arpscan_status :
        print(' ...Skipped')
        return []

    # output of possible multiple interfaces
    arpscan_output = ""
    if isinstance(SCAN_SUBNETS, list):
        # multiple interfaces
        print(" arp-scan: Multiple interfaces")
        for interface in SCAN_SUBNETS :
            arpscan_output += execute_arpscan_on_interface (interface)
    else:
        # one interface only
        print(" arp-scan: One interface")
        arpscan_output += execute_arpscan_on_interface (SCAN_SUBNETS)

    # Search IP + MAC + Vendor as regular expression
    re_ip = r'(?P<ip>((2[0-5]|1[0-9]|[0-9])?[0-9]\.){3}((2[0-5]|1[0-9]|[0-9])?[0-9]))'
    re_mac = r'(?P<mac>([0-9a-fA-F]{2}[:-]){5}([0-9a-fA-F]{2}))'
    re_hw = r'(?P<hw>.*)'
    re_pattern = re.compile (re_ip + r'\s+' + re_mac + r'\s' + re_hw)

    # Keep the first entry of every MAC address
    seen_macs = set()
    unique_devices = []
    for match in re_pattern.finditer (arpscan_output):
        device = match.groupdict()
        if device['mac'] not in seen_macs:
            seen_macs.add(device['mac'])
            unique_devices.append(device)
    return unique_devices
#-------------------------------------------------------------------------------
def execute_arpscan_on_interface(SCAN_SUBNETS):
    """Run `sudo arp-scan` with the given arguments and return its output.

    Args:
        SCAN_SUBNETS: whitespace-separated arp-scan arguments for one
            interface, as a single string.

    Returns:
        arp-scan's standard output as text, or "" when arp-scan exits with a
        non-zero status (its output is printed in that case).
    """
    # Several retries (6) to avoid false offline devices
    command = ['sudo', 'arp-scan', '--ignoredups', '--bandwidth=256k', '--retry=6']
    command = command + SCAN_SUBNETS.strip().split()

    try:
        return subprocess.check_output (command, universal_newlines=True)
    except subprocess.CalledProcessError as scan_error:
        # An error occurred: show what arp-scan printed and report nothing
        print(scan_error.output)
        return ""
#-------------------------------------------------------------------------------
def copy_pihole_network():
    """Refresh PiHole_Network from the Pi-hole database.

    The table is always emptied first. When PIHOLE_ACTIVE is set, the
    Pi-hole DB is attached, its network entries are copied (skipping 'ip-%'
    pseudo addresses and the all-zero MAC), missing names become
    '(unknown)', and the DB is detached again.
    """
    run_sql = sql.execute

    # empty Pi-hole Network table
    run_sql ("DELETE FROM PiHole_Network")

    # check if Pi-hole is active
    if not PIHOLE_ACTIVE :
        print(' ...Skipped')
        return

    # Open Pi-hole DB
    run_sql ("ATTACH DATABASE '"+ PIHOLE_DB +"' AS PH")

    # Copy Pi-hole Network table; name and IP come from the most recently
    # seen address of each network entry
    copy_statement = """INSERT INTO PiHole_Network (PH_MAC, PH_Vendor, PH_LastQuery,
                            PH_Name, PH_IP)
                        SELECT hwaddr, macVendor, lastQuery,
                            (SELECT name FROM PH.network_addresses
                             WHERE network_id = id ORDER BY lastseen DESC, ip),
                            (SELECT ip FROM PH.network_addresses
                             WHERE network_id = id ORDER BY lastseen DESC, ip)
                        FROM PH.network
                        WHERE hwaddr NOT LIKE 'ip-%'
                          AND hwaddr <> '00:00:00:00:00:00' """
    run_sql (copy_statement)

    # Give nameless entries a placeholder name
    run_sql ("""UPDATE PiHole_Network SET PH_Name = '(unknown)'
                WHERE PH_Name IS NULL OR PH_Name = '' """)

    # Close Pi-hole DB
    run_sql ("DETACH PH")
#-------------------------------------------------------------------------------
def read_fritzbox_active_hosts():
    """Refresh Fritzbox_Network with the hosts the Fritzbox reports as active.

    The table is created if needed and always emptied. When FRITZBOX_ACTIVE
    is set, every host with a true 'status' is inserted together with the
    vendor looked up from its MAC address.

    Changes: the bare `except:` around the vendor lookup no longer swallows
    KeyboardInterrupt / SystemExit; one MacLookup instance is reused for all
    hosts; the unused enumerate index is gone.
    """
    # create table if not exists
    sql_create_table = """ CREATE TABLE IF NOT EXISTS Fritzbox_Network(
                                "FB_MAC" STRING(50) NOT NULL COLLATE NOCASE,
                                "FB_IP" STRING(50) COLLATE NOCASE,
                                "FB_Name" STRING(50),
                                "FB_Vendor" STRING(250)
                            ); """
    sql.execute(sql_create_table)
    sql_connection.commit()

    # empty Fritzbox Network table
    sql.execute ("DELETE FROM Fritzbox_Network")

    # check if Fritzbox is active
    if not FRITZBOX_ACTIVE :
        print(' ...Skipped')
        return

    # Imported here so the dependency is only needed when the module is active
    from fritzconnection.lib.fritzhosts import FritzHosts

    # copy Fritzbox Network list
    fh = FritzHosts(address=FRITZBOX_IP, user=FRITZBOX_USER, password=FRITZBOX_PASS)
    vendor_lookup = MacLookup()
    for host in fh.get_hosts_info():
        if not host['status'] :
            continue
        ip = host['ip'] if host['ip'] else 'no IP'
        mac = host['mac'].lower() if host['mac'] else '-'
        hostname = host['name']
        try:
            vendor = vendor_lookup.lookup(host['mac'])
        except Exception:
            # unknown prefix or unusable MAC value
            vendor = "Prefix is not registered"
        sql.execute ("INSERT INTO Fritzbox_Network (FB_MAC, FB_IP, FB_Name, FB_Vendor) "+
                     "VALUES (?, ?, ?, ?) ", (mac, ip, hostname, vendor) )
#-------------------------------------------------------------------------------
def read_mikrotik_leases():
    """Refresh the Mikrotik_Network table from the router's DHCP leases.

    The table is created when missing and always emptied. When
    MIKROTIK_ACTIVE is false nothing else happens. Otherwise the
    '/ip/dhcp-server/lease' resource is read and every lease carrying an
    'active-mac-address' is stored with its lower-cased MAC, active
    address, host name ('' when absent) and looked-up vendor.
    """
    sql_create_table = """ CREATE TABLE IF NOT EXISTS Mikrotik_Network(
                                "MT_MAC" STRING(50) NOT NULL COLLATE NOCASE,
                                "MT_IP" STRING(50) COLLATE NOCASE,
                                "MT_Name" STRING(50),
                                "MT_Vendor" STRING(250)
                            ); """
    sql.execute(sql_create_table)
    sql_connection.commit()
    sql.execute("DELETE FROM Mikrotik_Network")
    if not MIKROTIK_ACTIVE:
        print(' ...Skipped')
        return
    # installed using pip3 install routeros_api
    import routeros_api
    conn = routeros_api.RouterOsApiPool(MIKROTIK_IP, MIKROTIK_USER, MIKROTIK_PASS, plaintext_login=True)
    try:
        api = conn.get_api()
        leases = api.get_resource('/ip/dhcp-server/lease').get()
    finally:
        # Close the router connection even when the query fails
        conn.disconnect()
    for row in leases:
        if 'active-mac-address' not in row:
            continue
        mac = row['active-mac-address'].lower()
        ip = row['active-address']
        hostname = row.get('host-name', '')
        try:
            vendor = MacLookup().lookup(mac)
        except Exception:
            # Any lookup failure gets the placeholder vendor
            vendor = "Prefix is not registered"
        sql.execute("INSERT INTO Mikrotik_Network (MT_MAC, MT_IP, MT_Name, MT_Vendor) " +
                    "VALUES (?, ?, ?, ?) ", (mac, ip, hostname, vendor))
#-------------------------------------------------------------------------------
def read_unifi_clients():
    """Refresh the Unifi_Network table from the UniFi controller.

    The table is created when missing and always emptied. When
    UNIFI_ACTIVE is false nothing else happens. Otherwise the controller
    at UNIFI_IP (port 8443, site 'default', certificate verification off)
    is queried and every client is stored with its lower-cased MAC, IP
    ('no IP' when absent), host name and vendor. Any error while talking
    to the controller or storing the clients is reported with a single
    message and does not propagate.
    """
    sql_create_table = """ CREATE TABLE IF NOT EXISTS Unifi_Network(
                                "UF_MAC" STRING(50) NOT NULL COLLATE NOCASE,
                                "UF_IP" STRING(50) COLLATE NOCASE,
                                "UF_Name" STRING(50),
                                "UF_Vendor" STRING(250)
                            ); """
    sql.execute(sql_create_table)
    sql_connection.commit()
    sql.execute("DELETE FROM Unifi_Network")
    if not UNIFI_ACTIVE:
        print(' ...Skipped')
        return
    from pyunifi.controller import Controller
    # Enable self signed SSL / no warnings
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    try:
        UNIFI_API_VERSION = UNIFI_API
    except NameError:  # variable not defined, use a default
        UNIFI_API_VERSION = 'v5'
    try:
        c = Controller(UNIFI_IP, UNIFI_USER, UNIFI_PASS, 8443, UNIFI_API_VERSION, 'default', ssl_verify=False)
        for row in c.get_clients():
            mac = row['mac'].lower()
            ip = row.get('ip', 'no IP')
            # Prefer the reported hostname, then the configured name
            hostname = row.get('hostname', row.get('name', ''))
            vendor = row.get('oui', None)
            if not vendor:
                # The controller gave no vendor: fall back to a MAC lookup
                try:
                    vendor = MacLookup().lookup(mac)
                except Exception:
                    vendor = "Prefix is not registered"
            sql.execute("INSERT INTO Unifi_Network (UF_MAC, UF_IP, UF_Name, UF_Vendor) " +
                        "VALUES (?, ?, ?, ?) ", (mac, ip, hostname, vendor))
    except Exception:
        # Best effort: an unreachable controller must not abort the scan
        print(' Could not connect to UniFi Controller')
#-------------------------------------------------------------------------------
def read_DHCP_leases():
    """Replace the DHCP_Leases table with the content of the leases file.

    Does nothing except print a notice when DHCP_ACTIVE is false.
    Otherwise the file DHCP_LEASES is read; each line that splits into
    exactly five whitespace-separated fields becomes one row (date/time,
    MAC, IP, name, second MAC column), all other lines are ignored.
    """
    if not DHCP_ACTIVE:
        print(' ...Skipped')
        return
    # Keep only the well-formed five-field lines
    with open(DHCP_LEASES, 'r') as lease_file:
        split_lines = (line.split() for line in lease_file)
        leases = [fields for fields in split_lines if len(fields) == 5]
    # Swap the table content for the freshly read leases
    sql.execute("DELETE FROM DHCP_Leases")
    sql.executemany("""INSERT INTO DHCP_Leases (DHCP_DateTime, DHCP_MAC,
                           DHCP_IP, DHCP_Name, DHCP_MAC2)
                       VALUES (?, ?, ?, ?, ?)
                    """, leases)
#-------------------------------------------------------------------------------
def save_scanned_devices(p_arpscan_devices, p_cycle_interval):
    """Rebuild the CurrentScan rows of the running scan cycle.

    p_arpscan_devices: sequence of mappings with the keys 'mac', 'ip' and
        'hw'; they are bound as named parameters of the arp-scan insert.
    p_cycle_interval: cycle length in minutes; Pi-hole entries are only
        taken over when their last query is newer than startTime minus
        this interval.

    Reads the module-level `cycle` (concatenated into the arp-scan
    statement, so it must be a string) and `startTime`. After the arp-scan
    rows, devices known to Pi-hole, Fritzbox, Mikrotik and UniFi are
    added, followed by an 'Internet' row when get_internet_IP() returns an
    address, and finally this host's own MAC when no other method found it.
    """
    # Delete previous scan data
    sql.execute("DELETE FROM CurrentScan WHERE cur_ScanCycle = ?", (cycle,))
    # Insert new arp-scan devices
    sql.executemany("INSERT INTO CurrentScan (cur_ScanCycle, cur_MAC, " +
                    " cur_IP, cur_Vendor, cur_ScanMethod) " +
                    "VALUES (" + cycle + ", :mac, :ip, :hw, 'arp-scan')",
                    p_arpscan_devices)
    # Insert Pi-hole devices
    sql.execute("""INSERT INTO CurrentScan (cur_ScanCycle, cur_MAC,
                       cur_IP, cur_Vendor, cur_ScanMethod)
                   SELECT ?, PH_MAC, PH_IP, PH_Vendor, 'Pi-hole'
                   FROM PiHole_Network
                   WHERE PH_LastQuery >= ?
                     AND NOT EXISTS (SELECT 'X' FROM CurrentScan
                                     WHERE cur_MAC = PH_MAC
                                       AND cur_ScanCycle = ? )""",
                (cycle,
                 (int(startTime.strftime('%s')) - 60 * p_cycle_interval),
                 cycle))
    # Insert Fritzbox, Mikrotik and UniFi devices. The parameters must be a
    # one-element tuple: a bare string would be bound character by
    # character and fail for any cycle name longer than one character.
    router_sources = (
        ('Fritzbox_Network', 'FB', 'Fritzbox'),
        ('Mikrotik_Network', 'MT', 'Mikrotik'),
        ('Unifi_Network', 'UF', 'UniFi'),
    )
    for table, prefix, method in router_sources:
        sql.execute(f"""INSERT INTO CurrentScan (cur_ScanCycle, cur_MAC,
                            cur_IP, cur_Vendor, cur_ScanMethod)
                        SELECT ?, {prefix}_MAC, {prefix}_IP, {prefix}_Vendor, '{method}'
                        FROM {table}
                        WHERE NOT EXISTS (SELECT 'X' FROM CurrentScan
                                          WHERE cur_MAC = {prefix}_MAC )""",
                    (cycle,))
    # Check Internet connectivity
    internet_IP = get_internet_IP()
    # TESTING - Force IP
    # internet_IP = ""
    if internet_IP != "":
        sql.execute("""INSERT INTO CurrentScan (cur_ScanCycle, cur_MAC, cur_IP, cur_Vendor, cur_ScanMethod)
                       VALUES (?, 'Internet', ?, Null, 'queryDNS') """, (cycle, internet_IP))
    # MAC and IP of the interface that carries the default route
    local_mac_cmd = ["/sbin/ifconfig `ip -o route get 1 | sed 's/^.*dev \\([^ ]*\\).*$/\\1/;q'` | grep ether | awk '{print $2}'"]
    local_mac = subprocess.Popen(local_mac_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()[0].decode().strip()
    # local_ip_cmd = ["ip route list default | awk {'print $7'}"]
    local_ip_cmd = ["ip -o route get 1 | sed 's/^.*src \\([^ ]*\\).*$/\\1/;q'"]
    local_ip = subprocess.Popen(local_ip_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()[0].decode().strip()
    # Check if local mac has been detected with other methods
    sql.execute("SELECT COUNT(*) FROM CurrentScan WHERE cur_ScanCycle = ? AND cur_MAC = ? ", (cycle, local_mac))
    if sql.fetchone()[0] == 0:
        sql.execute("INSERT INTO CurrentScan (cur_ScanCycle, cur_MAC, cur_IP, cur_Vendor, cur_ScanMethod) " +
                    "VALUES ( ?, ?, ?, Null, 'local_MAC') ", (cycle, local_mac, local_ip))
#-------------------------------------------------------------------------------
def remove_entries_from_table():
    """Delete ignored devices from the scan and import tables.

    Every entry of MAC_IGNORE_LIST is used as a prefix: rows whose MAC
    starts with it are removed from CurrentScan, PiHole_Network,
    DHCP_Leases, Fritzbox_Network, Mikrotik_Network and Unifi_Network.
    A missing or empty list only prints a notice.
    """
    try:
        ignore_list = MAC_IGNORE_LIST
    except NameError:
        # The setting is optional in the configuration
        print(" No ignore list defined")
        return
    if not ignore_list:
        print(f' Ignore list is empty')
        return
    print(f' Delete {len(ignore_list)} ignored devices/MAC ranges from scan on appearance')
    # Incomplete and complete MAC addresses: each entry matches as a prefix.
    # The patterns are bound as parameters instead of being pasted into the
    # statement between double quotes.
    patterns = [f'{mac}%' for mac in ignore_list]
    mac_columns = (
        ('CurrentScan', 'cur_MAC'),
        ('PiHole_Network', 'PH_MAC'),
        ('DHCP_Leases', 'DHCP_MAC'),
        ('Fritzbox_Network', 'FB_MAC'),
        ('Mikrotik_Network', 'MT_MAC'),
        ('Unifi_Network', 'UF_MAC'),
    )
    for table, column in mac_columns:
        condition = ' OR '.join([f'{column} LIKE ?'] * len(patterns))
        sql.execute(f'DELETE FROM {table} WHERE {condition}', patterns)
#-------------------------------------------------------------------------------
def print_scan_stats():
    """Print the counters of the current scan cycle.

    Every figure is a COUNT(*) over CurrentScan and/or Devices restricted
    to the module-level `cycle`: detected devices (total and per scan
    method), new devices, devices in this scan, down alerts, new
    connections, disconnections and IP changes.
    """
    def count_for_cycle(query):
        # Run one counting query bound to the current cycle
        sql.execute(query, (cycle,))
        return sql.fetchone()[0]

    # Devices Detected
    detected = count_for_cycle("""SELECT COUNT(*) FROM CurrentScan
                                  WHERE cur_ScanCycle = ? """)
    print(' Devices Detected.......:', str(detected))
    # Devices arp-scan
    by_arpscan = count_for_cycle("""SELECT COUNT(*) FROM CurrentScan
                                    WHERE cur_ScanMethod='arp-scan' AND cur_ScanCycle = ? """)
    print(' arp-scan Method....:', str(by_arpscan))
    # Devices added by the other sources
    additional_sources = (
        ('Pi-hole', ' Pi-hole Method.....: +'),
        ('Fritzbox', ' Fritzbox Method....: +'),
        ('Mikrotik', ' Mikrotik Method....: +'),
        ('UniFi', ' UniFi Method.......: +'),
    )
    for method, label in additional_sources:
        added = count_for_cycle("""SELECT COUNT(*) FROM CurrentScan
                                   WHERE cur_ScanMethod='""" + method + """' AND cur_ScanCycle = ? """)
        print(label + str(added))
    # New Devices
    new_devices = count_for_cycle("""SELECT COUNT(*) FROM CurrentScan
                                     WHERE cur_ScanCycle = ?
                                       AND NOT EXISTS (SELECT 1 FROM Devices
                                                       WHERE dev_MAC = cur_MAC) """)
    print(' New Devices........: ' + str(new_devices))
    # Devices in this ScanCycle
    in_this_scan = count_for_cycle("""SELECT COUNT(*) FROM Devices, CurrentScan
                                      WHERE dev_MAC = cur_MAC AND dev_ScanCycle = cur_ScanCycle
                                        AND dev_ScanCycle = ? """)
    print('')
    print(' Devices in this scan...: ' + str(in_this_scan))
    # Down Alerts
    down_alerts = count_for_cycle("""SELECT COUNT(*) FROM Devices
                                     WHERE dev_AlertDeviceDown = 1
                                       AND dev_ScanCycle = ?
                                       AND NOT EXISTS (SELECT 1 FROM CurrentScan
                                                       WHERE dev_MAC = cur_MAC
                                                         AND dev_ScanCycle = cur_ScanCycle) """)
    print(' Down Alerts........: ' + str(down_alerts))
    # New Down Alerts
    new_down_alerts = count_for_cycle("""SELECT COUNT(*) FROM Devices
                                         WHERE dev_AlertDeviceDown = 1
                                           AND dev_PresentLastScan = 1
                                           AND dev_ScanCycle = ?
                                           AND NOT EXISTS (SELECT 1 FROM CurrentScan
                                                           WHERE dev_MAC = cur_MAC
                                                             AND dev_ScanCycle = cur_ScanCycle) """)
    print(' New Down Alerts....: ' + str(new_down_alerts))
    # New Connections
    new_connections = count_for_cycle("""SELECT COUNT(*) FROM Devices, CurrentScan
                                         WHERE dev_MAC = cur_MAC AND dev_ScanCycle = cur_ScanCycle
                                           AND dev_PresentLastScan = 0
                                           AND dev_ScanCycle = ? """)
    print(' New Connections....: ' + str(new_connections))
    # Disconnections
    disconnections = count_for_cycle("""SELECT COUNT(*) FROM Devices
                                        WHERE dev_PresentLastScan = 1
                                          AND dev_ScanCycle = ?
                                          AND NOT EXISTS (SELECT 1 FROM CurrentScan
                                                          WHERE dev_MAC = cur_MAC
                                                            AND dev_ScanCycle = cur_ScanCycle) """)
    print(' Disconnections.....: ' + str(disconnections))
    # IP Changes
    ip_changes = count_for_cycle("""SELECT COUNT(*) FROM Devices, CurrentScan
                                    WHERE dev_MAC = cur_MAC AND dev_ScanCycle = cur_ScanCycle
                                      AND dev_ScanCycle = ?
                                      AND dev_LastIP <> cur_IP """)
    print(' IP Changes.........: ' + str(ip_changes))
#------------------------------------------------------------------------------
def calc_activity_history_main_scan():
    """Append the current device totals to Online_History.

    Counts the non-archived devices that were present in the last scan
    (online), the non-archived devices that were not (down) and the
    archived devices, then stores them together with their sum, the
    module-level `startTime` and the data source 'main_scan'.
    """
    def count_devices(condition):
        # Let SQLite count instead of fetching every row just for len()
        sql.execute("SELECT COUNT(*) FROM Devices WHERE " + condition)
        return sql.fetchone()[0]

    online_devices = count_devices("dev_Archived = 0 AND dev_PresentLastScan = 1")
    offline_devices = count_devices("dev_Archived = 0 AND dev_PresentLastScan = 0")
    archived_devices = count_devices("dev_Archived = 1")
    all_devices = online_devices + offline_devices + archived_devices
    # Add to History
    sql.execute("INSERT INTO Online_History (Scan_Date, Online_Devices, Down_Devices, All_Devices, Archived_Devices, Data_Source) " +
                "VALUES ( ?, ?, ?, ?, ?, ?)",
                (startTime, online_devices, offline_devices, all_devices, archived_devices, 'main_scan'))
#-------------------------------------------------------------------------------
def create_new_devices():
    """Register devices that are not yet in the Devices table.

    For each source (CurrentScan of the running cycle, PiHole_Network,
    DHCP_Leases) a 'New Device' event is inserted first and the device row
    second; the order matters because both statements select the MACs that
    are still missing from Devices. New devices start with scan cycle 1,
    event alerts on, down alerts off and "present" set.
    """
    steps = (
        # arpscan - Insert events for new devices
        ('New devices - 1 Events',
         """INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime,
                eve_EventType, eve_AdditionalInfo,
                eve_PendingAlertEmail)
            SELECT cur_MAC, cur_IP, ?, 'New Device', cur_Vendor, 1
            FROM CurrentScan
            WHERE cur_ScanCycle = ?
              AND NOT EXISTS (SELECT 1 FROM Devices
                              WHERE dev_MAC = cur_MAC) """,
         (startTime, cycle)),
        # arpscan - Create new devices
        ('New devices - 2 Create devices',
         """INSERT INTO Devices (dev_MAC, dev_name, dev_Vendor,
                dev_LastIP, dev_FirstConnection, dev_LastConnection,
                dev_ScanCycle, dev_AlertEvents, dev_AlertDeviceDown,
                dev_PresentLastScan)
            SELECT cur_MAC, '(unknown)', cur_Vendor, cur_IP, ?, ?,
                1, 1, 0, 1
            FROM CurrentScan
            WHERE cur_ScanCycle = ?
              AND NOT EXISTS (SELECT 1 FROM Devices
                              WHERE dev_MAC = cur_MAC) """,
         (startTime, startTime, cycle)),
        # Pi-hole - Insert events for new devices
        ('New devices - 3 Pi-hole Events',
         """INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime,
                eve_EventType, eve_AdditionalInfo,
                eve_PendingAlertEmail)
            SELECT PH_MAC, IFNULL (PH_IP,'-'), ?, 'New Device',
                '(Pi-Hole) ' || PH_Vendor, 1
            FROM PiHole_Network
            WHERE NOT EXISTS (SELECT 1 FROM Devices
                              WHERE dev_MAC = PH_MAC) """,
         (startTime,)),
        # Pi-hole - Create New Devices
        ('New devices - 4 Pi-hole Create devices',
         """INSERT INTO Devices (dev_MAC, dev_name, dev_Vendor,
                dev_LastIP, dev_FirstConnection, dev_LastConnection,
                dev_ScanCycle, dev_AlertEvents, dev_AlertDeviceDown,
                dev_PresentLastScan)
            SELECT PH_MAC, PH_Name, PH_Vendor, IFNULL (PH_IP,'-'),
                ?, ?, 1, 1, 0, 1
            FROM PiHole_Network
            WHERE NOT EXISTS (SELECT 1 FROM Devices
                              WHERE dev_MAC = PH_MAC) """,
         (startTime, startTime)),
        # DHCP Leases - Insert events for new devices
        ('New devices - 5 DHCP Leases Events',
         """INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime,
                eve_EventType, eve_AdditionalInfo,
                eve_PendingAlertEmail)
            SELECT DHCP_MAC, DHCP_IP, ?, 'New Device', '(DHCP lease)',1
            FROM DHCP_Leases
            WHERE NOT EXISTS (SELECT 1 FROM Devices
                              WHERE dev_MAC = DHCP_MAC) """,
         (startTime,)),
        # DHCP Leases - Create New Devices (name and IP of the newest lease)
        ('New devices - 6 DHCP Leases Create devices',
         """INSERT INTO Devices (dev_MAC, dev_name, dev_LastIP,
                dev_Vendor, dev_FirstConnection, dev_LastConnection,
                dev_ScanCycle, dev_AlertEvents, dev_AlertDeviceDown,
                dev_PresentLastScan)
            SELECT DISTINCT DHCP_MAC,
                (SELECT DHCP_Name FROM DHCP_Leases AS D2
                 WHERE D2.DHCP_MAC = D1.DHCP_MAC
                 ORDER BY DHCP_DateTime DESC LIMIT 1),
                (SELECT DHCP_IP FROM DHCP_Leases AS D2
                 WHERE D2.DHCP_MAC = D1.DHCP_MAC
                 ORDER BY DHCP_DateTime DESC LIMIT 1),
                '(unknown)', ?, ?, 1, 1, 0, 1
            FROM DHCP_Leases AS D1
            WHERE NOT EXISTS (SELECT 1 FROM Devices
                              WHERE dev_MAC = DHCP_MAC) """,
         (startTime, startTime)),
    )
    for log_message, statement, parameters in steps:
        print_log(log_message)
        sql.execute(statement, parameters)
    print_log('New Devices end')
#-------------------------------------------------------------------------------
def insert_events():
    """Insert the events derived from comparing Devices with CurrentScan.

    For the running cycle four kinds of events are created, all stamped
    with `startTime`: 'Device Down' (down alert enabled, was present, not
    in the scan), 'Connected' (in the scan, was not present),
    'Disconnected' (down alert disabled, was present, not in the scan) and
    'IP Changed' (in the scan with an IP different from dev_LastIP).
    """
    steps = (
        # Check device down
        ('Events 1 - Devices down',
         """INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime,
                eve_EventType, eve_AdditionalInfo,
                eve_PendingAlertEmail)
            SELECT dev_MAC, dev_LastIP, ?, 'Device Down', '', 1
            FROM Devices
            WHERE dev_AlertDeviceDown = 1
              AND dev_PresentLastScan = 1
              AND dev_ScanCycle = ?
              AND NOT EXISTS (SELECT 1 FROM CurrentScan
                              WHERE dev_MAC = cur_MAC
                                AND dev_ScanCycle = cur_ScanCycle) """),
        # Check new connections
        ('Events 2 - New Connections',
         """INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime,
                eve_EventType, eve_AdditionalInfo,
                eve_PendingAlertEmail)
            SELECT cur_MAC, cur_IP, ?, 'Connected', '', dev_AlertEvents
            FROM Devices, CurrentScan
            WHERE dev_MAC = cur_MAC AND dev_ScanCycle = cur_ScanCycle
              AND dev_PresentLastScan = 0
              AND dev_ScanCycle = ? """),
        # Check disconnections
        ('Events 3 - Disconnections',
         """INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime,
                eve_EventType, eve_AdditionalInfo,
                eve_PendingAlertEmail)
            SELECT dev_MAC, dev_LastIP, ?, 'Disconnected', '',
                dev_AlertEvents
            FROM Devices
            WHERE dev_AlertDeviceDown = 0
              AND dev_PresentLastScan = 1
              AND dev_ScanCycle = ?
              AND NOT EXISTS (SELECT 1 FROM CurrentScan
                              WHERE dev_MAC = cur_MAC
                                AND dev_ScanCycle = cur_ScanCycle) """),
        # Check IP Changed
        ('Events 4 - IP Changes',
         """INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime,
                eve_EventType, eve_AdditionalInfo,
                eve_PendingAlertEmail)
            SELECT cur_MAC, cur_IP, ?, 'IP Changed',
                'Previous IP: '|| dev_LastIP, dev_AlertEvents
            FROM Devices, CurrentScan
            WHERE dev_MAC = cur_MAC AND dev_ScanCycle = cur_ScanCycle
              AND dev_ScanCycle = ?
              AND dev_LastIP <> cur_IP """),
    )
    # Every statement takes the event time first and the scan cycle second
    for log_message, statement in steps:
        print_log(log_message)
        sql.execute(statement, (startTime, cycle))
    print_log('Events end')
#-------------------------------------------------------------------------------
def update_devices_data_from_scan():
    """Bring the Devices table up to date with the current scan.

    In order: marks devices found in the scan as present (updating their
    last connection), marks the others of this cycle as absent, copies IP
    and vendor from CurrentScan, fills in missing device names from the
    Pi-hole, DHCP, Fritzbox, Mikrotik and UniFi tables, looks up missing
    vendors with query_MAC_vendor() and finally puts devices first seen in
    this run whose vendor contains "apple" into scan cycle 1.
    """
    # Update Last Connection
    print_log('Update devices - 1 Last Connection')
    sql.execute("""UPDATE Devices SET dev_LastConnection = ?,
                       dev_PresentLastScan = 1
                   WHERE dev_ScanCycle = ?
                     AND dev_PresentLastScan = 0
                     AND EXISTS (SELECT 1 FROM CurrentScan
                                 WHERE dev_MAC = cur_MAC
                                   AND dev_ScanCycle = cur_ScanCycle) """,
                (startTime, cycle))
    # Clean no active devices
    print_log('Update devices - 2 Clean no active devices')
    sql.execute("""UPDATE Devices SET dev_PresentLastScan = 0
                   WHERE dev_ScanCycle = ?
                     AND NOT EXISTS (SELECT 1 FROM CurrentScan
                                     WHERE dev_MAC = cur_MAC
                                       AND dev_ScanCycle = cur_ScanCycle) """,
                (cycle,))
    # Update IP & Vendor
    print_log('Update devices - 3 LastIP & Vendor')
    sql.execute("""UPDATE Devices
                   SET dev_LastIP = (SELECT cur_IP FROM CurrentScan
                                     WHERE dev_MAC = cur_MAC
                                       AND dev_ScanCycle = cur_ScanCycle),
                       dev_Vendor = (SELECT cur_Vendor FROM CurrentScan
                                     WHERE dev_MAC = cur_MAC
                                       AND dev_ScanCycle = cur_ScanCycle)
                   WHERE dev_ScanCycle = ?
                     AND EXISTS (SELECT 1 FROM CurrentScan
                                 WHERE dev_MAC = cur_MAC
                                   AND dev_ScanCycle = cur_ScanCycle) """,
                (cycle,))
    # Update (unknown) names from the imported network tables, in this
    # order. The last flag tells whether the source row must carry a
    # non-empty name; DHCP leases are taken over unconditionally.
    print_log('Update devices - 4 Unknown Name')
    name_sources = (
        ('PiHole_Network', 'PH_MAC', 'PH_Name', True),
        ('DHCP_Leases', 'DHCP_MAC', 'DHCP_Name', False),
        ('Fritzbox_Network', 'FB_MAC', 'FB_Name', True),
        ('Mikrotik_Network', 'MT_MAC', 'MT_Name', True),
        ('Unifi_Network', 'UF_MAC', 'UF_Name', True),
    )
    for table, mac_col, name_col, require_name in name_sources:
        name_filter = ''
        if require_name:
            name_filter = f" AND {name_col} IS NOT NULL AND {name_col} <> ''"
        # String literals use single quotes: double-quoted literals are only
        # accepted by SQLite as a legacy fallback
        sql.execute(f"""UPDATE Devices
                        SET dev_Name = (SELECT {name_col} FROM {table}
                                        WHERE {mac_col} = dev_MAC)
                        WHERE (dev_Name = '(unknown)'
                               OR dev_Name = ''
                               OR dev_Name IS NULL)
                          AND EXISTS (SELECT 1 FROM {table}
                                      WHERE {mac_col} = dev_MAC{name_filter}) """)
    # Look up the vendor of devices that still have none
    print_log('Update devices - 5 Vendor')
    recordsToUpdate = []
    query = """SELECT * FROM Devices
               WHERE dev_Vendor = '(unknown)' OR dev_Vendor =''
                  OR dev_Vendor IS NULL"""
    for device in sql.execute(query):
        vendor = query_MAC_vendor(device['dev_MAC'])
        # -1 and -2 are the lookup's failure codes
        if vendor not in (-1, -2):
            recordsToUpdate.append([vendor, device['dev_MAC']])
    sql.executemany("UPDATE Devices SET dev_Vendor = ? WHERE dev_MAC = ? ",
                    recordsToUpdate)
    # New Apple devices -> Cycle 1
    print_log('Update devices - 6 Cycle for Apple devices')
    sql.execute("""UPDATE Devices SET dev_ScanCycle = 1
                   WHERE dev_FirstConnection = ?
                     AND UPPER(dev_Vendor) LIKE '%APPLE%' """,
                (startTime,))
    print_log('Update devices end')
#-------------------------------------------------------------------------------
def update_devices_names():
    """Try to resolve a name for every device that has none.

    Devices named '(unknown)' or '' that have a last IP other than '-'
    are passed to resolve_device_name(). Results -1 and -2 are only
    tallied; any other result is stored as the device name. One dot is
    printed per device as a progress bar, followed by the number of
    updated names.
    """
    resolved_names = []
    ignored_count = 0
    not_found_count = 0
    # Devices without name
    print(' Trying to resolve devices without name...', end='')
    unnamed_devices = sql.execute("SELECT * FROM Devices WHERE dev_Name IN ('(unknown)','') AND dev_LastIP <> '-'")
    for device in unnamed_devices:
        outcome = resolve_device_name(device['dev_MAC'], device['dev_LastIP'])
        if outcome == -1:
            not_found_count += 1
        elif outcome == -2:
            ignored_count += 1
        else:
            resolved_names.append([outcome, device['dev_MAC']])
        # Progress bar
        print('.', end='')
        sys.stdout.flush()
    # Print log
    print('')
    print(" Names updated: ", len(resolved_names))
    # Update devices
    sql.executemany("UPDATE Devices SET dev_Name = ? WHERE dev_MAC = ? ", resolved_names)
#-------------------------------------------------------------------------------
def resolve_device_name_netbios(pIP):
    """Return the NetBIOS name reported by nbtscan for pIP, or "".

    nbtscan is run on pIP/32 in verbose mode with ':' as field separator
    and a 5 second timeout. The name is the second field of the last
    output line containing "00U". An empty string is returned when the
    command times out, cannot be started, exits with a non-zero status or
    prints no such line, so the result is always a string.
    """
    nbtscan_args = ['nbtscan', '-v', '-s', ':', pIP + '/32']
    try:
        scan = subprocess.run(nbtscan_args, capture_output=True, text=True, timeout=5)
    except (subprocess.TimeoutExpired, OSError):
        # Timed out, or nbtscan is not installed / not executable
        return ""
    if scan.returncode != 0 or not scan.stdout:
        return ""
    # Start from "" so that output without a "00U" line yields an empty
    # name instead of the CompletedProcess object
    name = ""
    for line in scan.stdout.strip().split('\n'):
        if "00U" in line:
            segments = line.split(':')
            if len(segments) > 1:
                name = segments[1].strip()
    return name
#-------------------------------------------------------------------------------
def resolve_device_name_avahi(pIP):
    """Return the name avahi-resolve reports for pIP, or "".

    Runs 'avahi-resolve -a pIP' with a 5 second timeout. On success the
    IPv4 addresses are removed from the output and the stripped remainder
    is returned. An empty string is returned when the command times out,
    cannot be started, exits with a non-zero status or prints nothing.
    """
    avahi_args = ['avahi-resolve', '-a', pIP]
    try:
        lookup = subprocess.run(avahi_args, capture_output=True, text=True, timeout=5)
    except (subprocess.TimeoutExpired, OSError):
        # Timed out, or avahi-resolve is not installed / not executable.
        # subprocess.run() without check=True never raises
        # CalledProcessError, so the exit status is tested below instead.
        return ""
    if lookup.returncode != 0 or not lookup.stdout:
        return ""
    # Drop the IPv4 addresses from the output; the name is what remains
    return re.sub(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', '', lookup.stdout).strip()
#-------------------------------------------------------------------------------
def resolve_device_name_dig(pIP):
    """Return the reverse-DNS name dig reports for pIP, or "".

    Queries the server configured in the module-level NETWORK_DNS_SERVER,
    falling back to "localhost" when that setting is missing or empty.
    An empty string is returned when dig times out (5 seconds), cannot be
    started, exits with a non-zero status or reports a communications
    error.
    """
    # DNS Server Fallback. The setting is read from the module globals:
    # assigning to NETWORK_DNS_SERVER inside this function would make the
    # name local and hide the configured value.
    dns_server = globals().get('NETWORK_DNS_SERVER') or "localhost"
    dig_args = ['dig', '+short', '-x', pIP, '@' + dns_server]
    try:
        newName = subprocess.check_output(dig_args, universal_newlines=True, timeout=5)
    except (subprocess.TimeoutExpired, subprocess.CalledProcessError, OSError):
        # Timed out, dig failed, or dig is not installed / not executable
        return ""
    if ";; communications error to" in newName:
        return ""
    return newName.strip()
#-------------------------------------------------------------------------------
def resolve_device_name(pMAC, pIP):
    """Resolve a host name for pIP using avahi, dig and NetBIOS in turn.

    Returns -2 when pMAC is not a 17 character MAC with 12 characters
    besides the colons, or when no resolver produced a name. Otherwise
    returns the first non-empty answer, stripped, with a trailing dot and
    then the suffixes '.lan', '.local' and '.home' removed.
    """
    # Check MAC parameter
    mac_text = str(pMAC)
    if len(mac_text) != 17 or len(mac_text.replace(':', '')) != 12:
        return -2
    # Ask each resolver until one of them answers
    resolved = ""
    for resolver in (resolve_device_name_avahi,
                     resolve_device_name_dig,
                     resolve_device_name_netbios):
        resolved = resolver(pIP)
        if resolved != "":
            break
    # Check returns
    resolved = resolved.strip()
    if not resolved:
        return -2
    # Eliminate local domain: the suffixes are tried once each, in order
    for suffix in ('.', '.lan', '.local', '.home'):
        if resolved.endswith(suffix):
            resolved = resolved[:-len(suffix)]
    return resolved
#-------------------------------------------------------------------------------
def void_ghost_disconnections():
    """Void connect/disconnect pairs produced by short "ghost" drop-outs.

    A ghost disconnection is a 'Disconnected' event of a device that is
    in the current scan again, recorded within the last cic_EveryXmin
    minutes before `startTime`. Three updates run in order: the matching
    'Connected' events of this run are voided, events paired with a ghost
    disconnection lose their pairing, and the ghost disconnections
    themselves are voided. Events of the 'Internet' pseudo device are
    never touched.
    """
    # Ghost disconnections of devices present in the current scan; only
    # the selected column differs between the three statements
    ghost_disconnections = """
                SELECT {selected}
                FROM CurrentScan, Devices, ScanCycles, Events
                WHERE cur_ScanCycle = ?
                  AND dev_MAC = cur_MAC
                  AND dev_ScanCycle = cic_ID
                  AND cic_ID = cur_ScanCycle
                  AND eve_MAC = cur_MAC
                  AND eve_EventType = 'Disconnected'
                  AND eve_DateTime >=
                      DATETIME (?, '-' || cic_EveryXmin ||' minutes')
                """
    ghost_macs = ghost_disconnections.format(selected='Events.eve_MAC')
    ghost_rowids = ghost_disconnections.format(selected='Events.RowID')
    # Void connect ghost events (disconnect event exists in last X min.)
    print_log('Void - 1 Connect ghost events')
    sql.execute("""UPDATE Events SET eve_PairEventRowid = Null,
                       eve_EventType ='VOIDED - ' || eve_EventType
                   WHERE eve_MAC != 'Internet'
                     AND eve_EventType = 'Connected'
                     AND eve_DateTime = ?
                     AND eve_MAC IN (""" + ghost_macs + """) """,
                (startTime, cycle, startTime))
    # Void connect paired events
    print_log('Void - 2 Paired events')
    sql.execute("""UPDATE Events SET eve_PairEventRowid = Null
                   WHERE eve_MAC != 'Internet'
                     AND eve_PairEventRowid IN (""" + ghost_rowids + """) """,
                (cycle, startTime))
    # Void disconnect ghost events
    print_log('Void - 3 Disconnect ghost events')
    sql.execute("""UPDATE Events SET eve_PairEventRowid = Null,
                       eve_EventType = 'VOIDED - '|| eve_EventType
                   WHERE eve_MAC != 'Internet'
                     AND ROWID IN (""" + ghost_rowids + """) """,
                (cycle, startTime))
    print_log('Void end')
#-------------------------------------------------------------------------------
def pair_sessions_events():
    """Link every session start event with the event that ends it.

    First each unpaired 'New Device' / 'Connected' event gets the ROWID of
    the next later event of the same MAC among 'New Device', 'Connected',
    'Device Down' and 'Disconnected'. Then each unpaired 'Device Down' /
    'Disconnected' event gets the ROWID of the event that points to it.
    """
    steps = (
        # Pair Connection / New Device events
        ('Pair session - 1 Connections / New Devices',
         """UPDATE Events
            SET eve_PairEventRowid =
                (SELECT ROWID
                 FROM Events AS EVE2
                 WHERE EVE2.eve_EventType IN ('New Device', 'Connected',
                                              'Device Down', 'Disconnected')
                   AND EVE2.eve_MAC = Events.eve_MAC
                   AND EVE2.eve_Datetime > Events.eve_DateTime
                 ORDER BY EVE2.eve_DateTime ASC LIMIT 1)
            WHERE eve_EventType IN ('New Device', 'Connected')
              AND eve_PairEventRowid IS NULL
         """),
        # Pair Disconnection / Device Down
        ('Pair session - 2 Disconnections',
         """UPDATE Events
            SET eve_PairEventRowid =
                (SELECT ROWID
                 FROM Events AS EVE2
                 WHERE EVE2.eve_PairEventRowid = Events.ROWID)
            WHERE eve_EventType IN ('Device Down', 'Disconnected')
              AND eve_PairEventRowid IS NULL
         """),
    )
    for log_message, statement in steps:
        print_log(log_message)
        sql.execute(statement)
    print_log('Pair session end')
#-------------------------------------------------------------------------------
def create_sessions_snapshot():
    """Rebuild the Sessions table from Convert_Events_to_Sessions.

    All existing session rows are deleted and replaced by the current
    content of Convert_Events_to_Sessions.
    """
    rebuild_steps = (
        # Clean sessions snapshot
        ('Sessions Snapshot - 1 Clean', "DELETE FROM SESSIONS"),
        # Insert sessions
        ('Sessions Snapshot - 2 Insert',
         """INSERT INTO Sessions
            SELECT * FROM Convert_Events_to_Sessions"""),
    )
    for log_message, statement in rebuild_steps:
        print_log(log_message)
        sql.execute(statement)
    print_log('Sessions end')
#-------------------------------------------------------------------------------
def skip_repeated_notifications():
    """Cancel pending alerts for devices notified too recently.

    Pending events (eve_PendingAlertEmail = 1) are reset to 0 for every
    device whose last notification, plus dev_SkipRepeated * 60 minutes,
    still lies after the current local time.
    """
    # Skip repeated notifications
    # Both sides are compared in minutes (strftime('%s') / 60); the
    # original note gives an overflow as the reason.
    print_log('Skip Repeated')
    # String literals use single quotes: double-quoted literals are only
    # accepted by SQLite as a legacy fallback
    sql.execute("""UPDATE Events SET eve_PendingAlertEmail = 0
                   WHERE eve_PendingAlertEmail = 1 AND eve_MAC IN
                       (
                        SELECT dev_MAC FROM Devices
                        WHERE dev_LastNotification IS NOT NULL
                          AND dev_LastNotification <> ''
                          AND (strftime('%s', dev_LastNotification)/60 +
                               dev_SkipRepeated * 60) >
                              (strftime('%s','now','localtime')/60 )
                       )
                """)
    print_log('Skip Repeated end')
#===============================================================================
# nmap Scan - DHCP Detection
#===============================================================================
def validate_dhcp_address(ip_string):
    """Return True when ip_string parses as an IP address, else False."""
    try:
        ipaddress.ip_address(ip_string)
    except ValueError:
        return False
    else:
        return True
# -----------------------------------------------------------------------------------
def rogue_dhcp_detection():
    """Probe the network for DHCP servers and store the answers.

    The Nmap_DHCP_Server table is created when missing and emptied. Its
    first row (scan_num 0) holds the time of the scan; the following rows
    hold every "Server Identifier" reported by 15 runs of the nmap
    broadcast-dhcp-discover script. Finally rogue_dhcp_notification()
    evaluates the stored answers.
    """
    # Create table if it does not exist
    sql_create_table = """ CREATE TABLE IF NOT EXISTS Nmap_DHCP_Server(
                                scan_num INTEGER NOT NULL,
                                dhcp_server TEXT NOT NULL
                            ); """
    sql.execute(sql_create_table)
    sql_connection.commit()
    # Flush table
    sql.execute("DELETE FROM Nmap_DHCP_Server")
    sql_connection.commit()
    # Execute 15 probes and collect the answers; entry 0 is the timestamp
    dhcp_probes = 15
    dhcp_server_list = [strftime("%Y-%m-%d %H:%M:%S")]
    probe_command = 'sudo nmap --script broadcast-dhcp-discover 2>/dev/null | grep "Server Identifier" | awk \'{ print $4 }\''
    for _ in range(dhcp_probes):
        # The context manager closes the pipe once the output is read
        with os.popen(probe_command) as stream:
            output = stream.read()
        # Keep only lines long enough to hold an address (7 characters,
        # e.g. "1.1.1.1"). This also drops the empty line of a probe that
        # found nothing, so such a probe no longer counts as a server.
        dhcp_server_list.extend(
            line for line in output.split("\n") if len(line) >= 7)
    # Insert list in database
    sqlite_insert = """INSERT INTO Nmap_DHCP_Server
                       (scan_num, dhcp_server)
                       VALUES (?, ?);"""
    sql.executemany(sqlite_insert, list(enumerate(dhcp_server_list)))
    sql_connection.commit()
    rogue_dhcp_notification()
# -----------------------------------------------------------------------------------
def rogue_dhcp_notification():
    """Report the DHCP servers found by the last detection run.

    Reads the distinct entries of Nmap_DHCP_Server in scan order; the
    first one is the scan timestamp, the others are server answers. Each
    answer is printed as valid (equal to DHCP_SERVER_ADDRESS), as
    invalid/rogue, or as a detection error when it is not an IP address.
    When at least one rogue server was found and no "Rogue DHCP Server"
    report file exists in REPORTPATH_WEBGUI, a 'rogue_dhcp' notification
    is sent.
    """
    # Order explicitly by first appearance: the timestamp is stored with
    # scan_num 0 and must be the first row
    sql.execute("""SELECT dhcp_server FROM Nmap_DHCP_Server
                   GROUP BY dhcp_server
                   ORDER BY MIN(scan_num)""")
    rows = sql.fetchall()
    rogue_dhcp_server_list = []
    servers = [row[0] for row in rows[1:]]
    single_server = len(rows) == 2
    if len(rows) == 1:
        print(' No DHCP Server detected.')
    if len(rows) > 2:
        print(' Multiple DHCP Servers detected:')
    # A single answer is printed on one labelled line and called
    # "invalid"; several answers are listed and called "rogue"
    line_prefix = ' One DHCP Server detected......: ' if single_server else ' '
    rogue_suffix = ' (invalid)' if single_server else ' (rogue)'
    for server in servers:
        if not validate_dhcp_address(server):
            print(' Detection Error')
        elif server == DHCP_SERVER_ADDRESS:
            print(line_prefix + server + ' (valid)')
        else:
            print(line_prefix + server + rogue_suffix)
            rogue_dhcp_server_list.append(server)
    # Do not notify again while a previous report is still present
    rogue_dhcp_reports = glob.glob(REPORTPATH_WEBGUI + "*Rogue DHCP Server*.txt")
    if rogue_dhcp_server_list and not rogue_dhcp_reports:
        rogue_dhcp_server_string = "Report Date: " + rows[0][0] + "\nServer: " + socket.gethostname() + "\n\nRogue DHCP Server\nDetected Server(s): "
        rogue_dhcp_server_string += ', '.join(rogue_dhcp_server_list)
        # Send Mail
        sending_notifications('rogue_dhcp', rogue_dhcp_server_string, rogue_dhcp_server_string)
#===============================================================================
# Services Monitoring
#===============================================================================
def set_service_update(_mon_URL, _mon_lastScan, _mon_lastStatus, _mon_lastLatence, _mon_TargetIP, _mon_Redirect, _mon_ssl_info, _mon_ssl_fc):
    """Store the result of the latest check in the Services row of _mon_URL.

    _mon_ssl_info is used only when it holds exactly four entries (keys
    'Subject', 'Issuer', 'Valid_from', 'Valid_to'); otherwise the four SSL
    columns are set to ''. _mon_ssl_fc is stored as text. The note becomes
    "Redirected by <code>" when the final status is 200 but _mon_Redirect
    is not, else ''. The change is committed.
    """
    # SSL Info change
    if len(_mon_ssl_info) == 4:
        ssl_subject, ssl_issuer, ssl_valid_from, ssl_valid_to = (
            _mon_ssl_info[key]
            for key in ('Subject', 'Issuer', 'Valid_from', 'Valid_to'))
    else:
        ssl_subject = ssl_issuer = ssl_valid_from = ssl_valid_to = ""
    was_redirected = _mon_Redirect != 200 and _mon_lastStatus == 200
    redirect_note = "Redirected by " + str(_mon_Redirect) if was_redirected else ""
    sql.execute(
        """UPDATE Services SET mon_LastScan=?, mon_LastStatus=?, mon_LastLatency=?, mon_TargetIP=?, mon_Notes=?, mon_ssl_subject=?, mon_ssl_issuer=?, mon_ssl_valid_from=?, mon_ssl_valid_to=?, mon_ssl_fc=? WHERE mon_URL=?;""",
        (_mon_lastScan, _mon_lastStatus, _mon_lastLatence, _mon_TargetIP,
         redirect_note, ssl_subject, ssl_issuer, ssl_valid_from,
         ssl_valid_to, str(_mon_ssl_fc), _mon_URL))
    sql_connection.commit()
# -----------------------------------------------------------------------------
def set_services_events(_moneve_URL, _moneve_DateTime, _moneve_StatusCode, _moneve_Latency, _moneve_TargetIP, _moneve_ssl_fc):
    """Append one scan result to the Services_Events table and commit."""
    event_values = (
        _moneve_URL,
        _moneve_DateTime,
        _moneve_StatusCode,
        _moneve_Latency,
        _moneve_TargetIP,
        _moneve_ssl_fc,
    )
    sql.execute(
        "INSERT INTO Services_Events\n"
        "(moneve_URL, moneve_DateTime, moneve_StatusCode, moneve_Latency, moneve_TargetIP, moneve_ssl_fc)\n"
        "VALUES (?, ?, ?, ?, ?, ?);",
        event_values,
    )
    sql_connection.commit()
# -----------------------------------------------------------------------------
def set_services_current_scan(_cur_URL, _cur_DateTime, _cur_StatusCode, _cur_Latency, _cur_TargetIP, _cur_ssl_info):
    """Compare a fresh scan result with the stored service state and record it.

    The stored state is read by column position from the Services row whose
    ``mon_URL`` equals ``_cur_URL``. One row is then inserted into
    Services_CurrentScan (and committed) carrying the new values, the
    previous status code / target IP, the alert flags copied from Services
    and two change flags:

    * ``cur_StatusChanged`` is 1 when the certificate fields, the target IP
      or the status code differ, or when the service came back from the
      offline latency marker "99999999".
    * ``cur_LatencyChanged`` is 1 when the service just went offline.

    ``_cur_ssl_info`` is treated as certificate data only when it holds
    exactly four entries; otherwise the SSL columns are stored empty.

    Returns:
        The SSL field-change bitmask (Subject=8, Issuer=4, Valid_from=2,
        Valid_to=1). 0 is also returned, without inserting anything, when
        the URL has no row in Services.
    """
    sql.execute("SELECT * FROM Services WHERE mon_URL = ?", [_cur_URL])
    rows = sql.fetchall()
    if not rows:
        # The service may have been removed while the scan was running.
        # Without a stored row there is nothing to compare against; the
        # former code ran into a NameError on the unset _mon_* locals here.
        return 0

    # Same row the former "for row in rows" loop ended on.
    row = rows[-1]
    _mon_StatusCode = row[2]
    _mon_Latency = row[3]
    _mon_AlertEvents = row[6]
    _mon_AlertDown = row[7]
    _mon_TargetIP = row[8]
    _mon_ssl_subject = row[10]     # FC value 8
    _mon_ssl_issuer = row[11]      # FC value 4
    _mon_ssl_valid_from = row[12]  # FC value 2
    _mon_ssl_valid_to = row[13]    # FC value 1

    # SSL Info change - Calc FC
    if len(_cur_ssl_info) == 4:
        _cur_ssl_subject = _cur_ssl_info['Subject']
        _cur_ssl_issuer = _cur_ssl_info['Issuer']
        _cur_ssl_valid_from = _cur_ssl_info['Valid_from']
        _cur_ssl_valid_to = _cur_ssl_info['Valid_to']
        _cur_ssl_fc = 0
        if _cur_ssl_subject != _mon_ssl_subject:
            _cur_ssl_fc += 8
        if _cur_ssl_issuer != _mon_ssl_issuer:
            _cur_ssl_fc += 4
        if _cur_ssl_valid_from != _mon_ssl_valid_from:
            _cur_ssl_fc += 2
        if _cur_ssl_valid_to != _mon_ssl_valid_to:
            _cur_ssl_fc += 1
    else:
        _cur_ssl_fc = 0
        _cur_ssl_subject = ""
        _cur_ssl_issuer = ""
        _cur_ssl_valid_from = ""
        _cur_ssl_valid_to = ""

    _cur_StatusChanged = 0
    # SSL Info change - Compare FC
    if _cur_ssl_fc > 0:
        _cur_StatusChanged += 1
    # IP or Status Code change
    if _mon_TargetIP != _cur_TargetIP or _mon_StatusCode != _cur_StatusCode:
        _cur_StatusChanged += 1
    # Down or Online
    if _mon_Latency == "99999999" and _mon_Latency != _cur_Latency:
        # Back online: counted as a status change, not as "went down".
        _cur_LatencyChanged = 0
        _cur_StatusChanged += 1
    elif _cur_Latency == "99999999" and _mon_Latency != _cur_Latency:
        _cur_LatencyChanged = 1
    else:
        _cur_LatencyChanged = 0

    # Merge Changes from all Events to 1 or 0
    StatusChanged = 1 if _cur_StatusChanged > 0 else 0

    sqlite_insert = (
        "INSERT INTO Services_CurrentScan\n"
        "(cur_URL, cur_DateTime, cur_StatusCode, cur_Latency, cur_AlertEvents, cur_AlertDown, cur_StatusChanged, cur_LatencyChanged, cur_TargetIP, cur_StatusCode_prev, cur_TargetIP_prev, cur_ssl_subject, cur_ssl_issuer, cur_ssl_valid_from, cur_ssl_valid_to, cur_ssl_fc)\n"
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
    )
    table_data = (_cur_URL, _cur_DateTime, _cur_StatusCode, _cur_Latency, _mon_AlertEvents, _mon_AlertDown, StatusChanged, _cur_LatencyChanged, _cur_TargetIP, _mon_StatusCode, _mon_TargetIP, _cur_ssl_subject, _cur_ssl_issuer, _cur_ssl_valid_from, _cur_ssl_valid_to, _cur_ssl_fc)
    sql.execute(sqlite_insert, table_data)
    sql_connection.commit()
    return _cur_ssl_fc
# -----------------------------------------------------------------------------
def service_monitoring_log(site, status, latency):
    """Append one "timestamp | status | latency | site" line to the web services log.

    The status is converted to text and left-padded with zeros to at least
    three characters.
    """
    padded_status = str(status).zfill(3)
    # Log status message to log file
    with open(PIALERT_WEBSERVICES_LOG, 'a') as log_handle:
        log_line = "{} | {} | {} | {}\n".format(
            strftime("%Y-%m-%d %H:%M:%S"), padded_status, latency, site)
        log_handle.write(log_line)
# -----------------------------------------------------------------------------
def check_services_health(site):
    """Request *site* and report its HTTP status code and response time.

    Certificate verification is disabled so that self-signed services can
    be monitored; the resulting urllib3 warning is silenced.

    Returns:
        A ``(status_code, latency)`` tuple. ``latency`` is the seconds part
        of the response's elapsed time as text, with one leading zero
        removed (e.g. "0.123456"). When the request fails for any reason
        the tuple is ``(0, "99999999")``.
    """
    # Enable self signed SSL / no warning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    try:
        resp = requests.get(site, verify=False, timeout=10)
        # str(timedelta) looks like "H:MM:SS.ffffff"; keep the seconds part.
        format_latency_str = str(resp.elapsed).split(":")[2]
        if format_latency_str[0] == "0" and format_latency_str[1] != ".":
            format_latency_str = format_latency_str[1:]
        return resp.status_code, format_latency_str
    except Exception:
        # Every failure, SSL errors included, counts as "offline". The SSL
        # branch used to fall through and return None, which broke the
        # caller's "status, latency = ..." unpacking.
        # HTTP status code 0 / latency "99999999" mark offline services.
        return 0, "99999999"
# -----------------------------------------------------------------------------
def check_services_redirect(site):
    """Return the HTTP status code *site* answers with when redirects are not followed.

    Certificate verification is disabled (self-signed services) and the
    resulting urllib3 warning is silenced.

    Returns:
        The status code of the first response, or 0 when the request fails
        for any reason.
    """
    # Enable self signed SSL
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    try:
        resp = requests.get(site, verify=False, timeout=10, allow_redirects=False)
        return resp.status_code
    except Exception:
        # HTTP Status Code for offline services. SSL errors used to fall
        # through and return None instead of a status code.
        return 0
# -----------------------------------------------------------------------------
def get_ssl_cert_info(url, timeout=10):
    """Read the TLS certificate presented by the host named in *url*.

    The connection goes to the URL's explicit port, or 443 when none is
    given. The certificate is not verified; it is only fetched and parsed.

    Returns:
        On success a dict with the keys 'Subject', 'Issuer', 'Valid_from'
        and 'Valid_to' (all rendered as text). On failure a string that
        describes why no certificate could be read.
    """
    try:
        target = urlparse(url)
        host = target.hostname
        tls_port = target.port or 443
        # Note: this sets the default timeout for all new sockets.
        socket.setdefaulttimeout(timeout)
        with socket.create_connection((host, tls_port)) as raw_sock:
            tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # Disable certificate verification
            tls_context.check_hostname = False
            tls_context.verify_mode = ssl.CERT_NONE
            with tls_context.wrap_socket(raw_sock, server_hostname=host, do_handshake_on_connect=False) as tls_sock:
                # Perform the SSL handshake
                tls_sock.do_handshake()
                der_bytes = tls_sock.getpeercert(binary_form=True)
                cert = x509.load_der_x509_certificate(der_bytes, default_backend())
                return {
                    'Subject': f"{cert.subject}",
                    'Issuer': f"{cert.issuer}",
                    'Valid_from': f"{cert.not_valid_before}",
                    'Valid_to': f"{cert.not_valid_after}",
                }
    except socket.timeout:
        return "SSL certificate could not be found (Timeout)"
    except socket.gaierror:
        return "SSL certificate could not be found (Host down or does not exists)"
    except ConnectionRefusedError:
        return "SSL certificate could not be found (Connection Refused)"
    except Exception:
        return "SSL certificate could not be found (General Error)"
# -----------------------------------------------------------------------------
def get_services_list():
    """Note the step in the web services log and return every mon_URL stored in Services."""
    with open(PIALERT_WEBSERVICES_LOG, 'a') as log_handle:
        log_handle.write(" Get Services List\n")
    sql.execute("SELECT mon_URL FROM Services")
    return [record[0] for record in sql.fetchall()]
# -----------------------------------------------------------------------------
def flush_services_current_scan():
    """Note the step in the web services log, then empty Services_CurrentScan and commit."""
    with open(PIALERT_WEBSERVICES_LOG, 'a') as log_handle:
        log_handle.write(" Flush previous scan results\n")
    purge_stmt = "DELETE FROM Services_CurrentScan"
    sql.execute(purge_stmt)
    sql_connection.commit()
# -----------------------------------------------------------------------------
def print_service_monitoring_changes():
    """Print and log how many rows of Services_CurrentScan are flagged as changed.

    Two counts are reported: rows with cur_StatusChanged = 1 and rows with
    cur_LatencyChanged = 1. Both go to stdout and are appended to the web
    services log.
    """
    print(" Services Monitoring Changes...")
    status_total = str(sql.execute("SELECT COUNT() FROM Services_CurrentScan WHERE cur_StatusChanged = 1").fetchone()[0])
    print(" Changed StatusCodes.....:", status_total)
    reachability_total = str(sql.execute("SELECT COUNT() FROM Services_CurrentScan WHERE cur_LatencyChanged = 1").fetchone()[0])
    print(" Changed Reachability....:", reachability_total)

    summary = (
        "\nServices Monitoring Changes:\n"
        + " Changed StatusCodes.....: " + status_total
        + "\n Changed Reachability....: " + reachability_total
        + "\n"
    )
    with open(PIALERT_WEBSERVICES_LOG, 'a') as log_handle:
        log_handle.write(summary)
# -----------------------------------------------------------------------------
def service_monitoring_notification():
# Build the web services report in a text and an HTML variant and hand both to
# sending_notifications('webservice', ...) when at least one of the two
# sections ("services down", "events") found a row in Services_CurrentScan.
# The report texts live in the module globals mail_text_webservice and
# mail_html_webservice. The function ends with a commit on sql_connection.
global mail_text_webservice
global mail_html_webservice
# Reporting section
print('\nReporting (Web Services) ...')
# Open Templates
with open(f'{PIALERT_BACK_PATH}/report_template_webservice.txt', 'r') as template_file:
mail_text_webservice = template_file.read()
with open(f'{PIALERT_BACK_PATH}/report_template_webservice.html', 'r') as template_file:
mail_html_webservice = template_file.read()
# Report Header & footer
# NOTE(review): the search argument of the four replace() calls below is an
# empty string as the code stands here; presumably a placeholder token from
# the template files is intended -- confirm against the templates.
timeFormated = startTime.strftime ('%Y-%m-%d %H:%M')
mail_text_webservice = mail_text_webservice.replace ('', timeFormated)
mail_html_webservice = mail_html_webservice.replace ('', timeFormated)
mail_text_webservice = mail_text_webservice.replace ('', socket.gethostname() )
mail_html_webservice = mail_html_webservice.replace ('', socket.gethostname() )
# Compose Devices Down Section
# Rows qualify when cur_AlertDown = 1 and cur_LatencyChanged = 1 (see query).
mail_section_services_down = False
mail_text_services_down = ''
mail_html_services_down = ''
text_line_template = '{}{}\n\t{}\t\t\t{}\n\t{}\t\t\t{}\n\t{}\t{}\n\t{}\t{}\n\n'
html_line_template = 'URL: {} | Tag: {} |
\n'+ \
'ScanTime: {} | IP: {} | prev. IP: {} |
\n'
sql.execute ("""SELECT Services_CurrentScan.*, Services.mon_tags
FROM Services_CurrentScan
JOIN Services ON Services_CurrentScan.cur_URL = Services.mon_URL
WHERE Services_CurrentScan.cur_AlertDown = 1
AND Services_CurrentScan.cur_LatencyChanged = 1
ORDER BY Services_CurrentScan.cur_DateTime""")
# Rows are read by column name -- assumes the connection uses a row factory
# that supports this (TODO confirm where the DB is opened).
for eventAlert in sql :
# An empty current/previous target IP is shown as 'n.a.'.
if eventAlert['cur_TargetIP'] == '':
_func_cur_TargetIP = 'n.a.'
else:
_func_cur_TargetIP = eventAlert['cur_TargetIP']
if eventAlert['cur_TargetIP_prev'] == '':
_func_cur_TargetIP_prev = 'n.a.'
else:
_func_cur_TargetIP_prev = eventAlert['cur_TargetIP_prev']
mail_section_services_down = True
mail_text_services_down += text_line_template.format (
'Service: ', eventAlert['cur_URL'],
'Tag: ', eventAlert['mon_tags'],
'Time: ', eventAlert['cur_DateTime'],
'Destination IP: ', _func_cur_TargetIP,
'prev. Destination IP: ', _func_cur_TargetIP_prev)
mail_html_services_down += html_line_template.format (
eventAlert['cur_URL'], eventAlert['mon_tags'], eventAlert['cur_DateTime'], _func_cur_TargetIP, _func_cur_TargetIP_prev)
# format_report_section_services is defined elsewhere; presumably it merges the
# composed section into the report globals -- verify against its definition.
format_report_section_services (mail_section_services_down, 'SECTION_DEVICES_DOWN',
'TABLE_DEVICES_DOWN', mail_text_services_down, mail_html_services_down)
# Compose Events Section (includes Down as an Event)
# Rows qualify when cur_AlertEvents = 1 and cur_StatusChanged = 1 (see query).
mail_section_events = False
mail_text_events = ''
mail_html_events = ''
text_line_template = '{}{}\n\t{}\t\t\t{}\n\t{}\t\t\t{}\n\t{}\t{}\n\t{}\t{}\n\t{}\t{}\n\t{}{}\n\t{}\t\t{}\n\n'
html_line_template = 'URL: {} | Tag: {} |
\n'+ \
'ScanTime: {} | IP: {} | prev. IP: {} | Latency: {} |
\n'+ \
'
| StatusCode: {} | prev. StatusCode: {} | SSL Code: {} |
\n'
sql.execute ("""SELECT Services_CurrentScan.*, Services.mon_tags
FROM Services_CurrentScan
JOIN Services ON Services_CurrentScan.cur_URL = Services.mon_URL
WHERE Services_CurrentScan.cur_AlertEvents = 1
AND Services_CurrentScan.cur_StatusChanged = 1
ORDER BY Services_CurrentScan.cur_DateTime""")
for eventAlert in sql :
# An empty current/previous target IP is shown as 'n.a.'.
if eventAlert['cur_TargetIP'] == '':
_func_cur_TargetIP = 'n.a.'
else:
_func_cur_TargetIP = eventAlert['cur_TargetIP']
if eventAlert['cur_TargetIP_prev'] == '':
_func_cur_TargetIP_prev = 'n.a.'
else:
_func_cur_TargetIP_prev = eventAlert['cur_TargetIP_prev']
mail_section_events = True
mail_text_events += text_line_template.format (
'Service: ', eventAlert['cur_URL'],
'Tag: ', eventAlert['mon_tags'],
'Time: ', eventAlert['cur_DateTime'],
'Destination IP: ', _func_cur_TargetIP,
'prev. Destination IP: ', _func_cur_TargetIP_prev,
'HTTP Status Code: ', eventAlert['cur_StatusCode'],
'prev. HTTP Status Code: ', eventAlert['cur_StatusCode_prev'],
'SSL Status: ', eventAlert['cur_ssl_fc'])
mail_html_events += html_line_template.format (
eventAlert['cur_URL'], eventAlert['mon_tags'],
eventAlert['cur_DateTime'], _func_cur_TargetIP, _func_cur_TargetIP_prev, eventAlert['cur_Latency'],
eventAlert['cur_StatusCode'], eventAlert['cur_StatusCode_prev'], eventAlert['cur_ssl_fc'])
format_report_section_services (mail_section_events, 'SECTION_EVENTS',
'TABLE_EVENTS', mail_text_events, mail_html_events)
# # Send Mail
# Only sent when one of the two queries above returned at least one row.
if mail_section_services_down == True or mail_section_events == True :
sending_notifications ('webservice', mail_html_webservice, mail_text_webservice)
else :
print(' No changes to report...')
sql_connection.commit()
# -----------------------------------------------------------------------------
def service_monitoring():
    """Run one monitoring pass over every URL stored in the Services table.

    The web services log is rewritten with a fresh header and the previous
    scan results are flushed. Each service is then requested up to three
    times until it answers. For a reachable service the redirect status,
    the resolved IP address and the certificate information are collected
    as well. Every result is logged, stored as current scan and as event,
    and written back to the Services row. A summary of the changes is
    printed and logged at the end.
    """
    # Empty Log and write new header
    print("\nStart Services Monitoring...")
    print(" Prepare Logfile...")
    with open(PIALERT_WEBSERVICES_LOG, 'w') as monitor_logfile:
        monitor_logfile.write("\nPi.Alert v" + VERSION_DATE + ":\n---------------------------------------------------------\n")
        monitor_logfile.write("Current User: %s \n\n" % get_username())
        monitor_logfile.write("Monitor Web-Services\n")
        monitor_logfile.write(" Timestamp: " + strftime("%Y-%m-%d %H:%M:%S") + "\n")

    print(" Get Services List...")
    sites = get_services_list()

    print(" Flush previous scan results...")
    flush_services_current_scan()

    print(" Check Services...")
    with open(PIALERT_WEBSERVICES_LOG, 'a') as monitor_logfile:
        monitor_logfile.write("\nStart Services Monitoring\n\n Timestamp | StatusCode | ResponseTime | URL \n-----------------------------------------------------------------\n")

    scantime = startTime.strftime("%Y-%m-%d %H:%M")

    if sites:
        for site in sites:
            # Up to three attempts; the marker appended to the log line
            # shows how many retries were needed ('' / '*' / '**').
            for site_retry in ('', '*', '**'):
                status, latency = check_services_health(site)
                if latency != "99999999":
                    break

            if latency != "99999999":
                redirect_state = check_services_redirect(site)
                # Get IP from Domain. urlparse().hostname drops port,
                # userinfo and IPv6 brackets, unlike a split on ":".
                try:
                    domain_ip = socket.gethostbyname(urlparse(site).hostname)
                except (socket.gaierror, TypeError, UnicodeError):
                    # A failed lookup must not abort the whole scan.
                    domain_ip = ""
                # get SSL info
                ssl_info = get_ssl_cert_info(site)
            else:
                domain_ip = ""
                redirect_state = ""
                ssl_info = ""

            service_monitoring_log(site + ' ' + site_retry, status, latency)
            ssl_fc = set_services_current_scan(site, scantime, status, latency, domain_ip, ssl_info)
            set_services_events(site, scantime, status, latency, domain_ip, ssl_fc)
            sys.stdout.flush()
            set_service_update(site, scantime, status, latency, domain_ip, redirect_state, ssl_info, ssl_fc)
    else:
        print(" No site(s) to monitor!")
        with open(PIALERT_WEBSERVICES_LOG, 'a') as monitor_logfile:
            monitor_logfile.write("\n**************** No site(s) to monitor!! ****************\n")

    # Print to log file
    print_service_monitoring_changes()
#===============================================================================
# ICMP Monitoring
#===============================================================================
def icmp_monitoring():
    """Ping every non-archived host of ICMP_Mon and store the results.

    The database is closed while the hosts are pinged and reopened before
    the results are saved. A host is pinged up to ICMP_ONLINE_TEST times
    (1 when that setting is not defined) until it answers; for hosts that
    answered, the average round trip time is measured afterwards. Offline
    hosts get the RTT marker "99999".
    """
    print("\nStart ICMP Monitoring...")
    print(" Get Host/Domain List...")
    icmphosts = get_icmphost_list()
    print(" List contains " + str(len(icmphosts)) + " entries")
    print(" Flush previous ping results...")
    flush_icmphost_current_scan()
    print(" Ping Hosts...")
    closeDB()

    scantime = startTime.strftime("%Y-%m-%d %H:%M")

    try:
        ping_retries = ICMP_ONLINE_TEST
    except NameError:  # variable not defined, use a default
        ping_retries = 1

    if not icmphosts:
        openDB()
        print(" No Hosts(s) to monitor!")
        return

    icmp_scan_results = {}
    icmphosts_online = 0
    icmphosts_offline = 0

    for host_ip in icmphosts:
        # Start as offline so that a retry count of 0 neither leaves the
        # status unset nor reuses the previous host's result.
        icmp_status = "0"
        for _ in range(ping_retries):
            icmp_status = ping(host_ip)
            if icmp_status == "1":
                break

        if icmp_status == "1":
            icmp_rtt = ping_avg(host_ip)
            icmphosts_online += 1
        else:
            icmp_rtt = "99999"
            icmphosts_offline += 1

        icmp_scan_results[host_ip] = {
            "host_ip": host_ip,
            "scantime": scantime,
            "icmp_status": icmp_status,
            "icmp_rtt": icmp_rtt
        }
        sys.stdout.flush()

    print(" Online Host(s) : " + str(icmphosts_online))
    print(" Offline Host(s) : " + str(icmphosts_offline))
    openDB()
    # Save Scan Results
    icmp_save_scandata(icmp_scan_results)
    print(" Calculate Activity History...")
    calc_activity_history_icmp(icmphosts_online, icmphosts_offline)
# -----------------------------------------------------------------------------
def icmp_save_scandata(data):
    """Write every collected ping result to the event, current-scan and host tables.

    ``data`` maps a host IP to a dict with the keys 'scantime',
    'icmp_status' and 'icmp_rtt'.
    """
    print(" Save scan results...")
    for host_ip, scan_data in data.items():
        record = (host_ip, scan_data['scantime'], scan_data['icmp_status'], scan_data['icmp_rtt'])
        set_icmphost_events(*record)
        set_icmphost_current_scan(*record)
        set_icmphost_update(*record)
# -----------------------------------------------------------------------------
def get_icmphost_list():
    """Return the icmp_ip value of every ICMP_Mon row that is not archived."""
    sql.execute("SELECT icmp_ip FROM ICMP_Mon WHERE icmp_Archived = 0 ")
    return [record[0] for record in sql.fetchall()]
# -----------------------------------------------------------------------------
def ping(host):
    """Send one ICMP echo request to *host* and report whether it answered.

    Returns:
        "1" when the ping command succeeded and reported no total loss,
        otherwise "0".
    """
    command = ['ping', '-c', '1', host]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    # A non-zero exit status covers failures that print nothing on stdout
    # (e.g. an unresolvable host name, whose message goes to the discarded
    # stderr); judging by the output alone reported those as online.
    if result.returncode != 0:
        return "0"
    output = result.stdout.decode('utf8', errors='replace')
    if "Request timed out." in output or "100% packet loss" in output:
        return "0"
    return "1"
# -----------------------------------------------------------------------------
def ping_avg(host):
    """Ping *host* several times and return the average round trip time as text.

    The number of echo requests is ICMP_GET_AVG_RTT, or 2 when that setting
    is not defined. The value is the fifth "/"-separated field of the last
    line the ping command prints.

    Returns:
        The average RTT as printed by ping, or an empty string when the
        last output line does not contain that field.
    """
    try:
        ping_count = str(ICMP_GET_AVG_RTT)
    except NameError:  # variable not defined, use a default
        ping_count = str(2)

    command = ['ping', '-c', ping_count, host]
    # One awaited child process instead of an un-reaped ping | tail | awk
    # pipeline; the last-line / fifth-field extraction is done here.
    result = subprocess.run(command, stdout=subprocess.PIPE)
    output_lines = result.stdout.decode('utf-8', errors='replace').splitlines()
    summary_line = output_lines[-1] if output_lines else ''
    fields = summary_line.split('/')
    return fields[4].strip() if len(fields) > 4 else ''
# -----------------------------------------------------------------------------
def set_icmphost_events(_icmpeve_ip, _icmpeve_DateTime, _icmpeve_Present, _icmpeve_avgrtt):
    """Append one ping result to the ICMP_Mon_Events table and commit."""
    event_values = (_icmpeve_ip, _icmpeve_DateTime, _icmpeve_Present, _icmpeve_avgrtt)
    sql.execute(
        "INSERT INTO ICMP_Mon_Events\n"
        "(icmpeve_ip, icmpeve_DateTime, icmpeve_Present, icmpeve_avgrtt)\n"
        "VALUES (?, ?, ?, ?);",
        event_values,
    )
    sql_connection.commit()
# -----------------------------------------------------------------------------
def set_icmphost_current_scan(_cur_ip, _cur_DateTime, _cur_Present, _cur_avgrrt):
    """Record a ping result in ICMP_Mon_CurrentScan together with its change flag.

    The previous presence state and the two alert flags are read by column
    position from the ICMP_Mon row of ``_cur_ip``. ``cur_PresentChanged``
    is 1 when the previous and the new presence differ (compared as text).
    The insert is committed.

    Nothing is inserted when ICMP_Mon has no row for ``_cur_ip``.
    """
    sql.execute("SELECT * FROM ICMP_Mon WHERE icmp_ip = ?", [_cur_ip])
    rows = sql.fetchall()
    if not rows:
        # The host may have been removed while the pings were running.
        # The former code ran into a NameError on the unset locals here.
        return

    # Same row the former "for row in rows" loop ended on.
    row = rows[-1]
    _icmp_PresentLastScan = row[3]
    _icmp_AlertEvents = row[5]
    _icmp_AlertDown = row[6]

    _cur_PresentChanged = 1 if str(_icmp_PresentLastScan) != str(_cur_Present) else 0

    sqlite_insert = (
        "INSERT INTO ICMP_Mon_CurrentScan\n"
        "(cur_ip, cur_LastScan, cur_Present, cur_PresentChanged, cur_avgrrt, cur_AlertEvents, cur_AlertDown)\n"
        "VALUES (?, ?, ?, ?, ?, ?, ?);"
    )
    table_data = (_cur_ip, _cur_DateTime, _cur_Present, _cur_PresentChanged, _cur_avgrrt, _icmp_AlertEvents, _icmp_AlertDown)
    sql.execute(sqlite_insert, table_data)
    sql_connection.commit()
# -----------------------------------------------------------------------------
def set_icmphost_update(_icmp_ip, _icmp_LastScan, _icmp_PresentLastScan, _icmp_avgrtt):
    """Write scan time, presence and average RTT to the ICMP_Mon row of *_icmp_ip* and commit."""
    update_values = (_icmp_LastScan, _icmp_PresentLastScan, _icmp_avgrtt, _icmp_ip)
    sql.execute(
        """UPDATE ICMP_Mon SET icmp_LastScan=?, icmp_PresentLastScan=?, icmp_avgrtt=? WHERE icmp_ip=?;""",
        update_values,
    )
    sql_connection.commit()
# -----------------------------------------------------------------------------
def flush_icmphost_current_scan():
    """Delete every row of ICMP_Mon_CurrentScan and commit."""
    purge_stmt = "DELETE FROM ICMP_Mon_CurrentScan"
    sql.execute(purge_stmt)
    sql_connection.commit()
# -----------------------------------------------------------------------------
def get_icmphost_name(_icmp_ip):
    """Return the icmp_hostname stored for *_icmp_ip*, or 'No Hostname set' when no row matches."""
    sql.execute("SELECT icmp_hostname FROM ICMP_Mon WHERE icmp_ip = ?", (_icmp_ip,))
    match = sql.fetchone()
    return match[0] if match else 'No Hostname set'
# -----------------------------------------------------------------------------
def calc_activity_history_icmp(History_Online_Devices, History_Offline_Devices):
    """Add one 'icmp_scan' row to Online_History for the current scan and commit.

    The archived count is the number of ICMP_Mon rows with
    icmp_Archived = 1; the total is online + offline + archived.
    """
    sql.execute("SELECT * FROM ICMP_Mon WHERE icmp_Archived = 1")
    archived_total = len(sql.fetchall())
    overall_total = History_Online_Devices + History_Offline_Devices + archived_total
    history_values = (
        startTime,
        History_Online_Devices,
        History_Offline_Devices,
        overall_total,
        archived_total,
        'icmp_scan',
    )
    sql.execute(
        "INSERT INTO Online_History (Scan_Date, Online_Devices, Down_Devices, All_Devices, Archived_Devices, Data_Source) "
        "VALUES ( ?, ?, ?, ?, ?, ?)",
        history_values,
    )
    sql_connection.commit()
# -----------------------------------------------------------------------------
def icmphost_monitoring_notification():
# Build the ICMP monitoring report in a text and an HTML variant and hand both
# to sending_notifications('icmp_mon', ...) when at least one of the two
# sections ("hosts down", "events") found a row in ICMP_Mon_CurrentScan.
# The report texts live in the module globals mail_text_icmphost and
# mail_html_icmphost. The function ends with a commit on sql_connection.
global mail_text_icmphost
global mail_html_icmphost
# Reporting section
print('\nReporting (ICMP Monitoring) ...')
# Open text Templates
with open(f'{PIALERT_BACK_PATH}/report_template_icmpmon.txt', 'r') as template_file:
mail_text_icmphost = template_file.read()
with open(f'{PIALERT_BACK_PATH}/report_template_icmpmon.html', 'r') as template_file:
mail_html_icmphost = template_file.read()
# Report Header & footer
# NOTE(review): the search argument of the four replace() calls below is an
# empty string as the code stands here; presumably a placeholder token from
# the template files is intended -- confirm against the templates.
timeFormated = startTime.strftime ('%Y-%m-%d %H:%M')
mail_text_icmphost = mail_text_icmphost.replace ('', timeFormated)
mail_html_icmphost = mail_html_icmphost.replace ('', timeFormated)
mail_text_icmphost = mail_text_icmphost.replace ('', socket.gethostname() )
mail_html_icmphost = mail_html_icmphost.replace ('', socket.gethostname() )
# Compose Devices Down Section
# Rows qualify when cur_AlertDown = 1, cur_Present = 0 and
# cur_PresentChanged = 1 (see query).
mail_section_icmphost_down = False
mail_text_icmphost_down = ''
mail_html_icmphost_down = ''
text_line_template = '{}{}\n\t{}\t{}\n\t{}\t\t{}\n\t{}\t{}\n\n'
html_line_template = '\n'+ \
' {} | \n {} | {} | \n'+ \
' {} | \n
\n'
sql.execute ("""SELECT * FROM ICMP_Mon_CurrentScan
WHERE cur_AlertDown = 1 AND cur_Present = 0 AND cur_PresentChanged = 1
ORDER BY cur_LastScan""")
# Rows are read by column name -- assumes the connection uses a row factory
# that supports this (TODO confirm where the DB is opened).
# NOTE(review): get_icmphost_name() runs another statement on the same `sql`
# object this loop iterates over -- confirm the iteration is not disturbed.
for eventAlert in sql :
hostname = get_icmphost_name(eventAlert['cur_ip'])
print(hostname)
mail_section_icmphost_down = True
mail_text_icmphost_down += text_line_template.format (
'IP: ', eventAlert['cur_ip'],
'Hostname: ', hostname,
'Time: ', eventAlert['cur_LastScan'],
'Status: ', 'Down')
mail_html_icmphost_down += html_line_template.format (
eventAlert['cur_ip'], hostname, eventAlert['cur_LastScan'], 'Down')
# format_report_section_icmp is defined elsewhere; presumably it merges the
# composed section into the report globals -- verify against its definition.
format_report_section_icmp (mail_section_icmphost_down, 'SECTION_DEVICES_DOWN',
'TABLE_DEVICES_DOWN', mail_text_icmphost_down, mail_html_icmphost_down)
# Compose Events Section (includes Down as an Event)
# Rows qualify when cur_AlertEvents = 1 and cur_PresentChanged = 1 (see query).
mail_section_events = False
mail_text_events = ''
mail_html_events = ''
text_line_template = '{}{}\n\t{}\t{}\n\t{}\t\t{}\n\t{}\t\t{} ms\n\t{}\t{}\n\n'
html_line_template = '\n '+ \
' {} | \n {} | {} | \n'+ \
' {} | \n {} | \n'+ \
'
\n'
sql.execute ("""SELECT * FROM ICMP_Mon_CurrentScan
WHERE cur_AlertEvents = 1 AND cur_PresentChanged = 1
ORDER BY cur_LastScan""")
for eventAlert in sql :
mail_section_events = True
hostname = get_icmphost_name(eventAlert['cur_ip'])
print(hostname)
# cur_Present equal to 1 is reported as 'Up', every other value as 'Down'.
if eventAlert['cur_Present'] == 1 :
icmp_online_status = 'Up'
else :
icmp_online_status = 'Down'
mail_text_events += text_line_template.format (
'IP: ', eventAlert['cur_ip'],
'Hostname:', hostname,
'Time: ', eventAlert['cur_LastScan'],
'RTT: ', eventAlert['cur_avgrrt'],
'Status: ', icmp_online_status)
mail_html_events += html_line_template.format (
eventAlert['cur_ip'], hostname, eventAlert['cur_LastScan'], eventAlert['cur_avgrrt'], icmp_online_status)
format_report_section_icmp (mail_section_events, 'SECTION_EVENTS',
'TABLE_EVENTS', mail_text_events, mail_html_events)
# # Send Mail
# Only sent when one of the two queries above returned at least one row.
if mail_section_icmphost_down == True or mail_section_events == True :
sending_notifications ('icmp_mon', mail_html_icmphost, mail_text_icmphost)
else :
print(' No changes to report...')
sql_connection.commit()
#===============================================================================
# REPORTING
#===============================================================================
def email_reporting():
# Build the device report (text and HTML) from the pending events and hand it
# to sending_notifications('pialert', ...) when at least one of its four
# sections (Internet, new devices, devices down, events) has a row. Afterwards
# the pending flags are cleared, and the web services / ICMP notification
# functions are called when their settings are enabled. The function opens the
# database at the start and closes it at the end.
global mail_text
global mail_html
# Reporting section
print('\nReporting...')
openDB()
# Disable reporting on events for devices where reporting is disabled based on the MAC address
# First statement: non-'Device Down' events of devices with dev_AlertEvents = 0.
sql.execute ("""UPDATE Events SET eve_PendingAlertEmail = 0
WHERE eve_PendingAlertEmail = 1 AND eve_EventType != 'Device Down' AND eve_MAC IN
(
SELECT dev_MAC FROM Devices WHERE dev_AlertEvents = 0
)""")
# Second statement: 'Device Down' events of devices with dev_AlertDeviceDown = 0.
sql.execute ("""UPDATE Events SET eve_PendingAlertEmail = 0
WHERE eve_PendingAlertEmail = 1 AND eve_EventType = 'Device Down' AND eve_MAC IN
(
SELECT dev_MAC FROM Devices WHERE dev_AlertDeviceDown = 0
)""")
# Open text Templates
with open(f'{PIALERT_BACK_PATH}/report_template.txt', 'r') as template_file:
mail_text = template_file.read()
with open(f'{PIALERT_BACK_PATH}/report_template.html', 'r') as template_file:
mail_html = template_file.read()
# Report Header & footer
# NOTE(review): the search argument of the four replace() calls below is an
# empty string as the code stands here; presumably a placeholder token from
# the template files is intended -- confirm against the templates.
timeFormated = startTime.strftime ('%Y-%m-%d %H:%M')
mail_text = mail_text.replace ('', timeFormated)
mail_html = mail_html.replace ('', timeFormated)
mail_text = mail_text.replace ('', socket.gethostname() )
mail_html = mail_html.replace ('', socket.gethostname() )
# Compose Internet Section
# Pending events whose eve_MAC is the literal 'Internet' (see query).
print(' Formating report...')
mail_section_Internet = False
mail_text_Internet = ''
mail_html_Internet = ''
text_line_template = '{} \t{}\t{}\t{}\n'
html_line_template = '\n'+ \
' {} | \n {} | \n'+ \
' {} | \n'+ \
' {} | \n
\n'
sql.execute ("""SELECT * FROM Events
WHERE eve_PendingAlertEmail = 1 AND eve_MAC = 'Internet'
ORDER BY eve_DateTime""")
# Rows are read by column name -- assumes the connection uses a row factory
# that supports this (TODO confirm where the DB is opened).
for eventAlert in sql :
mail_section_Internet = True
mail_text_Internet += text_line_template.format (
eventAlert['eve_EventType'], eventAlert['eve_DateTime'],
eventAlert['eve_IP'], eventAlert['eve_AdditionalInfo'])
mail_html_Internet += html_line_template.format (
REPORT_DEVICE_URL, eventAlert['eve_MAC'],
eventAlert['eve_EventType'], eventAlert['eve_DateTime'],
eventAlert['eve_IP'], eventAlert['eve_AdditionalInfo'])
# format_report_section is defined elsewhere; presumably it merges the composed
# section into mail_text / mail_html -- verify against its definition.
format_report_section (mail_section_Internet, 'SECTION_INTERNET',
'TABLE_INTERNET', mail_text_Internet, mail_html_Internet)
# Compose New Devices Section
# Pending 'New Device' events from Events_Devices (see query).
mail_section_new_devices = False
mail_text_new_devices = ''
mail_html_new_devices = ''
text_line_template = '{}\t{}\n\t{}\t\t{}\n\t{}\t\t{}\n\t{}\t\t{}\n\t{}\t{}\n\n'
html_line_template = '\n'+ \
' {} | \n {} | \n'+\
' {} | \n {} | \n {} | \n
\n'
sql.execute ("""SELECT * FROM Events_Devices
WHERE eve_PendingAlertEmail = 1
AND eve_EventType = 'New Device'
ORDER BY eve_DateTime""")
for eventAlert in sql :
mail_section_new_devices = True
mail_text_new_devices += text_line_template.format (
'Name: ', eventAlert['dev_Name'], 'MAC: ', eventAlert['eve_MAC'], 'IP: ', eventAlert['eve_IP'],
'Time: ', eventAlert['eve_DateTime'], 'More Info: ', eventAlert['eve_AdditionalInfo'])
mail_html_new_devices += html_line_template.format (
REPORT_DEVICE_URL, eventAlert['eve_MAC'], eventAlert['eve_MAC'],
eventAlert['eve_DateTime'], eventAlert['eve_IP'],
eventAlert['dev_Name'], eventAlert['eve_AdditionalInfo'])
format_report_section (mail_section_new_devices, 'SECTION_NEW_DEVICES',
'TABLE_NEW_DEVICES', mail_text_new_devices, mail_html_new_devices)
# Compose Devices Down Section
# Pending 'Device Down' events from Events_Devices (see query).
mail_section_devices_down = False
mail_text_devices_down = ''
mail_html_devices_down = ''
text_line_template = '{}\t{}\n\t{}\t{}\n\t{}\t{}\n\t{}\t{}\n\n'
html_line_template = '\n'+ \
' {} | \n {} | \n'+ \
' {} | \n {} | \n
\n'
sql.execute ("""SELECT * FROM Events_Devices
WHERE eve_PendingAlertEmail = 1
AND eve_EventType = 'Device Down'
ORDER BY eve_DateTime""")
for eventAlert in sql :
mail_section_devices_down = True
mail_text_devices_down += text_line_template.format (
'Name: ', eventAlert['dev_Name'], 'MAC: ', eventAlert['eve_MAC'],
'Time: ', eventAlert['eve_DateTime'],'IP: ', eventAlert['eve_IP'])
mail_html_devices_down += html_line_template.format (
REPORT_DEVICE_URL, eventAlert['eve_MAC'], eventAlert['eve_MAC'],
eventAlert['eve_DateTime'], eventAlert['eve_IP'],
eventAlert['dev_Name'])
format_report_section (mail_section_devices_down, 'SECTION_DEVICES_DOWN',
'TABLE_DEVICES_DOWN', mail_text_devices_down, mail_html_devices_down)
# Compose Events Section
# Pending 'Connected', 'Disconnected' and 'IP Changed' events (see query).
mail_section_events = False
mail_text_events = ''
mail_html_events = ''
text_line_template = '{}\t{}\n\t{}\t\t{}\n\t{}\t\t{}\n\t{}\t\t{}\n\t{}\t\t{}\n\t{}\t{}\n\n'
html_line_template = '\n '+ \
' {} | \n {} | \n'+ \
' {} | \n {} | \n {} | \n'+ \
' {} | \n
\n'
sql.execute ("""SELECT * FROM Events_Devices
WHERE eve_PendingAlertEmail = 1
AND eve_EventType IN ('Connected','Disconnected',
'IP Changed')
ORDER BY eve_DateTime""")
for eventAlert in sql :
mail_section_events = True
mail_text_events += text_line_template.format (
'Name: ', eventAlert['dev_Name'], 'MAC: ', eventAlert['eve_MAC'],
'IP: ', eventAlert['eve_IP'],'Time: ', eventAlert['eve_DateTime'],
'Event: ', eventAlert['eve_EventType'],'More Info: ', eventAlert['eve_AdditionalInfo'])
mail_html_events += html_line_template.format (
REPORT_DEVICE_URL, eventAlert['eve_MAC'], eventAlert['eve_MAC'],
eventAlert['eve_DateTime'], eventAlert['eve_IP'],
eventAlert['eve_EventType'], eventAlert['dev_Name'],
eventAlert['eve_AdditionalInfo'])
format_report_section (mail_section_events, 'SECTION_EVENTS',
'TABLE_EVENTS', mail_text_events, mail_html_events)
# Send Mail
if mail_section_Internet == True or mail_section_new_devices == True \
or mail_section_devices_down == True or mail_section_events == True :
# Send Mail
sending_notifications ('pialert', mail_html, mail_text)
else :
print(' No changes to report...')
# Clean Pending Alert Events
# Devices with a still-pending event get dev_LastNotification set to the
# current time; this has to run before the pending flags are reset below.
sql.execute ("""UPDATE Devices SET dev_LastNotification = ?
WHERE dev_MAC IN (SELECT eve_MAC FROM Events
WHERE eve_PendingAlertEmail = 1)
""", (datetime.datetime.now(),) )
sql.execute ("""UPDATE Events SET eve_PendingAlertEmail = 0
WHERE eve_PendingAlertEmail = 1""")
# sql.rowcount here belongs to the UPDATE Events statement just executed.
print(' Notifications:', sql.rowcount)
sql_connection.commit()
# Web services report: only when SCAN_WEBSERVICES is defined and True.
try:
enable_services_monitoring = SCAN_WEBSERVICES
except NameError:
enable_services_monitoring = False
if enable_services_monitoring == True:
# Index 15 of str(startTime) is the units digit of the minute when startTime
# prints as 'YYYY-MM-DD HH:MM...' (presumably a datetime -- confirm), so this
# branch runs only in minutes ending in 0.
if str(startTime)[15] == "0":
service_monitoring_notification()
# ICMP report: only when ICMPSCAN_ACTIVE is defined and True.
try:
enable_icmp_monitoring = ICMPSCAN_ACTIVE
except NameError:
enable_icmp_monitoring = False
if enable_icmp_monitoring == True:
icmphost_monitoring_notification()
closeDB()
#-------------------------------------------------------------------------------
def send_pushsafer(_Text):
    """Send *_Text* as a notification through the Pushsafer API.

    The target device, priority and sound come from the optional settings
    PUSHSAFER_DEVICE, PUSHSAFER_PRIO and PUSHSAFER_SOUND and default to
    "a", 0 and 22 when a setting is not defined. The fourth line of the
    message is appended to the notification title.
    """
    try:
        notification_target = PUSHSAFER_DEVICE
    except NameError:
        notification_target = "a"
    # Read the setting into a differently named local: assigning to
    # PUSHSAFER_PRIO inside this function made that name local, so the
    # configured value was never seen and the priority was always 0.
    try:
        notification_prio = PUSHSAFER_PRIO
    except NameError:
        notification_prio = 0
    try:
        notification_sound = PUSHSAFER_SOUND
    except NameError:
        notification_sound = 22

    # Remove one linebrake between "Server" and the headline of the event type
    _pushsafer_Text = _Text.replace('\n\n\n', '\n\n')
    # extract event type headline to use it in the notification headline
    findsubheadline = _pushsafer_Text.split('\n')
    # Texts shorter than four lines have no headline line to extract.
    subheadline = findsubheadline[3] if len(findsubheadline) > 3 else ''

    url = 'https://www.pushsafer.com/api'
    post_fields = {
        "t" : 'Pi.Alert Message - '+subheadline,
        "m" : _pushsafer_Text,
        "s" : notification_sound,
        "v" : 3,
        "i" : 148,
        "c" : '#ef7f7f',
        "d" : notification_target,
        "u" : REPORT_DASHBOARD_URL,
        "ut" : 'Open Pi.Alert',
        "k" : PUSHSAFER_TOKEN,
        "pr" : notification_prio,
    }
    requests.post(url, data=post_fields)
#-------------------------------------------------------------------------------
def send_pushover (_Text):
    """Send a plain-text report through the Pushover messages API.

    The optional settings PUSHOVER_PRIO and PUSHOVER_SOUND fall back to 0 and
    'siren' when they are not defined. PUSHOVER_TOKEN and PUSHOVER_USER are
    read without a fallback.

    Args:
        _Text: Plain-text report. After triple line breaks are collapsed,
            its fourth line is appended to the notification title (an empty
            string is used when the report has fewer than four lines).
    """
    # Remove one line break between "Server" and the headline of the event type
    _pushover_Text = _Text.replace('\n\n\n', '\n\n')
    # Text-layout tweak
    _pushover_Text = _pushover_Text.replace('IP: \t\t', 'IP: \t\t\t')
    # Extract the event type headline to use it in the notification headline
    report_lines = _pushover_Text.split('\n')
    subheadline = report_lines[3] if len(report_lines) > 3 else ''
    # Read the priority into a separate local name. Assigning to
    # PUSHOVER_PRIO inside this function would make that name local to the
    # function, so the lookup would always fail and the configured priority
    # would silently be replaced by 0.
    try:
        notification_priority = PUSHOVER_PRIO
    except NameError:
        notification_priority = 0
    try:
        notification_sound = PUSHOVER_SOUND
    except NameError:
        notification_sound = 'siren'
    url = 'https://api.pushover.net/1/messages.json'
    post_fields = {
        "token": PUSHOVER_TOKEN,
        "user": PUSHOVER_USER,
        "title" : 'Pi.Alert Message - '+subheadline,
        "message" : _pushover_Text,
        "priority" : notification_priority,
        "sound" : notification_sound,
    }
    requests.post(url, data=post_fields)
#-------------------------------------------------------------------------------
def send_ntfy (_Text):
    """POST a plain-text report to the configured ntfy topic.

    The target URL is "<NTFY_HOST>/<NTFY_TOPIC>". A "Click" header pointing
    to REPORT_DASHBOARD_URL is added when NTFY_CLICKABLE is True, and a
    Basic "Authorization" header is added when both NTFY_USER and
    NTFY_PASSWORD are non-empty.

    Args:
        _Text: Plain-text report used as the request body.
    """
    # Prepare header
    headers = {
        "Title": "Pi.Alert Notification",
        "Priority": NTFY_PRIORITY,
        "Tags": "warning"
    }
    if NTFY_CLICKABLE == True:
        headers["Click"] = REPORT_DASHBOARD_URL
    # If username and password are set, build the basic-auth header
    if NTFY_USER != "" and NTFY_PASSWORD != "":
        credentials = "{}:{}".format(NTFY_USER, NTFY_PASSWORD)
        basichash = b64encode(credentials.encode("utf-8")).decode("ascii")
        headers["Authorization"] = "Basic {}".format(basichash)
    # Encode the body explicitly so non-ASCII characters in the report are
    # sent as UTF-8 instead of depending on the HTTP stack's default encoding
    body = _Text.encode("utf-8") if isinstance(_Text, str) else _Text
    requests.post("{}/{}".format(NTFY_HOST, NTFY_TOPIC),
                  data=body,
                  headers=headers)
#-------------------------------------------------------------------------------
def send_telegram (_Text):
    """Send a plain-text report to Telegram through the bundled shoutrrr binary.

    The executable is expected at
    "<directory of this file>/shoutrrr/<SHOUTRRR_BINARY>/shoutrrr" and is
    called with TELEGRAM_BOT_TOKEN_URL as its --url value.

    The command is passed as an argument list without a shell, so quotes,
    "$" or backticks inside the report cannot break or alter the command
    line. Unlike the previous os.popen() call, this waits for shoutrrr to
    finish; its exit status is not checked.

    Args:
        _Text: Plain-text report. After triple line breaks are collapsed,
            its fourth line is appended to the title (an empty string is
            used when the report has fewer than four lines).
    """
    # Remove one line break between "Server" and the headline of the event type
    _telegram_Text = _Text.replace('\n\n\n', '\n\n')
    # Extract the event type headline to use it in the notification headline
    report_lines = _telegram_Text.split('\n')
    subheadline = report_lines[3] if len(report_lines) > 3 else ''
    runningpath = os.path.abspath(os.path.dirname(__file__))
    shoutrrr_executable = runningpath + '/shoutrrr/' + SHOUTRRR_BINARY + '/shoutrrr'
    command = [
        shoutrrr_executable, 'send',
        '--url', TELEGRAM_BOT_TOKEN_URL,
        '--message', _telegram_Text,
        '--title', 'Pi.Alert - ' + subheadline,
    ]
    # stdout is discarded, as the old pipe was never read
    subprocess.run(command, stdout=subprocess.DEVNULL, check=False)
#-------------------------------------------------------------------------------
def send_webgui (_Text):
    """Save a plain-text report as a file in the WebGUI reports directory.

    The file name is "<YYYYmmdd-HHMMSS>_<headline>.txt" inside
    REPORTPATH_WEBGUI. An existing file with the same name is left
    untouched. set_reports_file_permissions() is called afterwards.

    Args:
        _Text: Plain-text report. After triple line breaks are collapsed,
            its fourth line is used as the headline part of the file name
            (an empty string is used when the report has fewer than four
            lines).
    """
    # Remove one line break between "Server" and the headline of the event type
    _webgui_Text = _Text.replace('\n\n\n', '\n\n')
    # Extract the event type headline to use it in the file name
    report_lines = _webgui_Text.split('\n')
    subheadline = report_lines[3] if len(report_lines) > 3 else ''
    _webgui_filename = time.strftime("%Y%m%d-%H%M%S") + "_" + subheadline + ".txt"
    report_path = REPORTPATH_WEBGUI + _webgui_filename
    if not os.path.exists(report_path):
        # The context manager closes the file even if the write fails
        with open(report_path, "w") as report_file:
            report_file.write(_webgui_Text)
    set_reports_file_permissions()
#===============================================================================
# Sending Notifications
#===============================================================================
def sending_notifications (_type, _html_text, _txt_text):
    """Hand a report to every notification channel enabled for its type.

    For _type 'webservice' the REPORT_*_WEBMON switches are consulted; for
    'pialert', 'icmp_mon' and 'rogue_dhcp' the plain REPORT_* switches are
    used. Any other _type does nothing.

    Args:
        _type: Kind of report being sent.
        _html_text: HTML version of the report (only passed to send_email).
        _txt_text: Plain-text version of the report (passed to every channel).
    """
    def _dispatch(enabled, send_msg, skip_msg, sender, *payload):
        # Announce and run one channel, or announce that it is skipped
        if enabled:
            print(send_msg)
            sender(*payload)
        else:
            print(skip_msg)

    if _type in ['webservice']:
        _dispatch(REPORT_MAIL_WEBMON,
                  ' Sending report by email...', ' Skip mail...',
                  send_email, _txt_text, _html_text)
        _dispatch(REPORT_PUSHSAFER_WEBMON,
                  ' Sending report by PUSHSAFER...', ' Skip PUSHSAFER...',
                  send_pushsafer, _txt_text)
        _dispatch(REPORT_PUSHOVER_WEBMON,
                  ' Sending report by PUSHOVER...', ' Skip PUSHOVER...',
                  send_pushover, _txt_text)
        _dispatch(REPORT_TELEGRAM_WEBMON,
                  ' Sending report by Telegram...', ' Skip Telegram...',
                  send_telegram, _txt_text)
        _dispatch(REPORT_NTFY_WEBMON,
                  ' Sending report by NTFY...', ' Skip NTFY...',
                  send_ntfy, _txt_text)
        _dispatch(REPORT_WEBGUI_WEBMON,
                  ' Save report to file...', ' Skip WebUI...',
                  send_webgui, _txt_text)
    elif _type in ['pialert', 'icmp_mon', 'rogue_dhcp']:
        _dispatch(REPORT_MAIL,
                  ' Sending report by email...', ' Skip mail...',
                  send_email, _txt_text, _html_text)
        _dispatch(REPORT_PUSHSAFER,
                  ' Sending report by PUSHSAFER...', ' Skip PUSHSAFER...',
                  send_pushsafer, _txt_text)
        _dispatch(REPORT_PUSHOVER,
                  ' Sending report by PUSHOVER...', ' Skip PUSHOVER...',
                  send_pushover, _txt_text)
        _dispatch(REPORT_TELEGRAM,
                  ' Sending report by Telegram...', ' Skip Telegram...',
                  send_telegram, _txt_text)
        _dispatch(REPORT_NTFY,
                  ' Sending report by NTFY...', ' Skip NTFY...',
                  send_ntfy, _txt_text)
        _dispatch(REPORT_WEBGUI,
                  ' Save report to file...', ' Skip WebUI...',
                  send_webgui, _txt_text)
#-------------------------------------------------------------------------------
def format_report_section (pActive, pSection, pTable, pText, pHTML):
    """Fill in or drop one section of the global mail_text / mail_html report.

    Args:
        pActive: When true the section is kept and filled; otherwise the
            whole section is removed with remove_section().
        pSection: Name of the section tag.
        pTable: Name of the placeholder; '<pTable>' is replaced.
        pText: Replacement for the placeholder in mail_text.
        pHTML: Replacement for the placeholder in mail_html.
    """
    global mail_text
    global mail_html
    if not pActive:
        # Inactive section: cut it out of both report versions
        mail_text = remove_section(mail_text, pSection)
        mail_html = remove_section(mail_html, pSection)
        return
    # Active section: insert the content, then strip the section markers
    placeholder = '<' + pTable + '>'
    mail_text = mail_text.replace(placeholder, pText)
    mail_html = mail_html.replace(placeholder, pHTML)
    mail_text = remove_tag(mail_text, pSection)
    mail_html = remove_tag(mail_html, pSection)
#-------------------------------------------------------------------------------
def format_report_section_services (pActive, pSection, pTable, pText, pHTML):
    """Fill in or drop one section of the global web-service report.

    Works on mail_text_webservice and mail_html_webservice.

    Args:
        pActive: When true the section is kept and filled; otherwise the
            whole section is removed with remove_section().
        pSection: Name of the section tag.
        pTable: Name of the placeholder; '<pTable>' is replaced.
        pText: Replacement for the placeholder in the text report.
        pHTML: Replacement for the placeholder in the HTML report.
    """
    global mail_text_webservice
    global mail_html_webservice
    if not pActive:
        # Inactive section: cut it out of both report versions
        mail_text_webservice = remove_section(mail_text_webservice, pSection)
        mail_html_webservice = remove_section(mail_html_webservice, pSection)
        return
    # Active section: insert the content, then strip the section markers
    placeholder = '<' + pTable + '>'
    mail_text_webservice = mail_text_webservice.replace(placeholder, pText)
    mail_html_webservice = mail_html_webservice.replace(placeholder, pHTML)
    mail_text_webservice = remove_tag(mail_text_webservice, pSection)
    mail_html_webservice = remove_tag(mail_html_webservice, pSection)
#-------------------------------------------------------------------------------
def format_report_section_icmp (pActive, pSection, pTable, pText, pHTML):
    """Fill in or drop one section of the global ICMP host report.

    Works on mail_text_icmphost and mail_html_icmphost.

    Args:
        pActive: When true the section is kept and filled; otherwise the
            whole section is removed with remove_section().
        pSection: Name of the section tag.
        pTable: Name of the placeholder; '<pTable>' is replaced.
        pText: Replacement for the placeholder in the text report.
        pHTML: Replacement for the placeholder in the HTML report.
    """
    global mail_html_icmphost
    global mail_text_icmphost
    if not pActive:
        # Inactive section: cut it out of both report versions
        mail_text_icmphost = remove_section(mail_text_icmphost, pSection)
        mail_html_icmphost = remove_section(mail_html_icmphost, pSection)
        return
    # Active section: insert the content, then strip the section markers
    placeholder = '<' + pTable + '>'
    mail_text_icmphost = mail_text_icmphost.replace(placeholder, pText)
    mail_html_icmphost = mail_html_icmphost.replace(placeholder, pHTML)
    mail_text_icmphost = remove_tag(mail_text_icmphost, pSection)
    mail_html_icmphost = remove_tag(mail_html_icmphost, pSection)
#-------------------------------------------------------------------------------
def remove_section (pText, pSection):
    """Return pText without the first '<pSection>' ... '</pSection>' block.

    Both tags and everything between them are removed. The text is returned
    unchanged when the opening tag is missing or no closing tag follows it.

    Args:
        pText: Text to search.
        pSection: Section name, without angle brackets.

    Returns:
        The text with the section cut out, or pText itself.
    """
    open_tag = '<' + pSection + '>'
    close_tag = '</' + pSection + '>'
    start = pText.find(open_tag)
    if start < 0:
        return pText
    # Look for the closing tag only after the opening one, so an earlier
    # stray closing tag cannot produce an overlapping slice
    end = pText.find(close_tag, start + len(open_tag))
    if end < 0:
        return pText
    return pText[:start] + pText[end + len(close_tag):]
#-------------------------------------------------------------------------------
def remove_tag (pText, pTag):
    """Return pText with every '<pTag>' and '</pTag>' marker removed.

    Only the markers are dropped; the text between them is kept.

    Args:
        pText: Text to clean.
        pTag: Tag name, without angle brackets.
    """
    for marker in ('<' + pTag + '>', '</' + pTag + '>'):
        pText = pText.replace(marker, '')
    return pText
#-------------------------------------------------------------------------------
def write_file(pPath, pText):
    """Write pText to pPath as UTF-8, replacing any existing content.

    Under Python 2 the text is first decoded with 'unicode_escape', as
    before.

    Args:
        pPath: Path of the file to write.
        pText: Text to write.
    """
    if sys.version_info < (3, 0):
        pText = pText.decode('unicode_escape')
    # The context manager closes the file even if the write fails
    with io.open(pPath, mode='w', encoding='utf-8') as out_file:
        out_file.write(pText)
#-------------------------------------------------------------------------------
def append_line_to_file(pPath, pText):
    """Append pText to pPath as UTF-8, creating the file if needed.

    The text is written as given; no line break is added. Under Python 2
    the text is first decoded with 'unicode_escape', as before.

    Args:
        pPath: Path of the file to append to.
        pText: Text to append.
    """
    if sys.version_info < (3, 0):
        pText = pText.decode('unicode_escape')
    # The context manager closes the file even if the write fails
    with io.open(pPath, mode='a', encoding='utf-8') as out_file:
        out_file.write(pText)
#-------------------------------------------------------------------------------
def send_email (pText, pHTML):
    """Send the report as a multipart/alternative e-mail over SMTP.

    Sender and recipient come from REPORT_FROM and REPORT_TO; the server is
    SMTP_SERVER:SMTP_PORT. STARTTLS is skipped when the global
    SMTP_SKIP_TLS is set to a true value, and login (SMTP_USER / SMTP_PASS)
    is skipped when SMTP_SKIP_LOGIN is set to a true value.

    The connection is closed even when a step fails, so a failed send does
    not leave the socket open.

    Args:
        pText: Plain-text body.
        pHTML: HTML body.
    """
    # Compose email
    msg = MIMEMultipart('alternative')
    msg['Subject'] = 'Pi.Alert Report'
    msg['From'] = REPORT_FROM
    msg['To'] = REPORT_TO
    msg.attach (MIMEText (pText, 'plain'))
    msg.attach (MIMEText (pHTML, 'html'))
    # Send mail
    smtp_connection = smtplib.SMTP (SMTP_SERVER, SMTP_PORT)
    try:
        smtp_connection.ehlo()
        if not SafeParseGlobalBool("SMTP_SKIP_TLS"):
            smtp_connection.starttls()
            # Identify again over the encrypted channel
            smtp_connection.ehlo()
        if not SafeParseGlobalBool("SMTP_SKIP_LOGIN"):
            # NOTE(review): repr() re-escapes backslashes and control
            # characters in the password; presumably this undoes escape
            # processing of the configured value - confirm before changing.
            escaped_password = repr(SMTP_PASS)[1:-1]
            smtp_connection.login (SMTP_USER, escaped_password)
        smtp_connection.sendmail (REPORT_FROM, REPORT_TO, msg.as_string())
        smtp_connection.quit()
    finally:
        # Harmless after a successful quit(); releases the socket on errors
        smtp_connection.close()
#-------------------------------------------------------------------------------
def SafeParseGlobalBool(boolVariable):
    """Return the value of the module-level variable named boolVariable.

    The value is returned as stored (it is not converted to bool). False is
    returned when no global with that name exists.

    Args:
        boolVariable: Name of the global variable, as a string.
    """
    # Direct dictionary lookup instead of eval(): no expression is evaluated
    # and the parameter itself can never shadow a global of the same name
    return globals().get(boolVariable, False)
#===============================================================================
# DB
#===============================================================================
def openDB():
    """Open the SQLite database and create the shared cursor, if not open yet.

    Sets the globals sql_connection (connection to DB_PATH, opened with
    isolation_level=None, WAL journal mode, str text factory and
    sqlite3.Row rows) and sql (a cursor on that connection). Does nothing
    when sql_connection is already set.
    """
    global sql_connection
    global sql
    # Only connect when there is no open connection
    if sql_connection is None:
        print_log ('Opening DB...')
        sql_connection = sqlite3.connect (DB_PATH, isolation_level=None)
        sql_connection.execute('pragma journal_mode=wal')
        sql_connection.text_factory = str
        sql_connection.row_factory = sqlite3.Row
        sql = sql_connection.cursor()
#-------------------------------------------------------------------------------
def closeDB():
    """Commit and close the shared SQLite connection, if one is open.

    Resets the global sql_connection to None afterwards. Does nothing when
    no connection is open.
    """
    global sql_connection
    global sql
    # Only act when a connection is currently open
    if sql_connection is not None:
        print_log ('Closing DB...')
        sql_connection.commit()
        sql_connection.close()
        sql_connection = None
#===============================================================================
# UTIL
#===============================================================================
def print_log (pText):
    """Print a timestamped log line when PRINT_LOG is enabled.

    The line shows the current time, the time elapsed since the previous
    log line (global log_timestamp) and pText. log_timestamp is then set to
    the current time.

    Args:
        pText: Message to print.
    """
    global log_timestamp
    # Logging switched on?
    if PRINT_LOG :
        now = datetime.datetime.now()
        elapsed = now - log_timestamp
        print('--------------------> ', now, ' ', elapsed, ' ', pText)
        # Remember this moment for the next elapsed-time calculation
        log_timestamp = now
#===============================================================================
# BEGIN
#===============================================================================
# Run main() only when executed as a script; its result becomes the exit status
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)