#!/usr/bin/env python3
"""Challenge proxy for the CVE-2026-42048 Langflow lab.

The proxy exposes a tiny HTTP surface in front of an internal Langflow
instance:

* ``GET /health``   - readiness of the Langflow backend.
* ``GET /``         - plain-text challenge instructions.
* ``GET /status``   - JSON describing the target directory.
* ``GET /flag.txt`` - the flag, served only once ``TARGET_DIR`` is gone.
* ``DELETE /api/v1/knowledge_bases`` - forwarded to Langflow with a bearer
  token obtained by logging in as the configured superuser.
"""
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

LANGFLOW_URL = os.environ.get("LANGFLOW_INTERNAL_URL", "http://127.0.0.1:7860").rstrip("/")
PROXY_HOST = os.environ.get("CHALLENGE_PROXY_HOST", "0.0.0.0")
PROXY_PORT = int(os.environ.get("CHALLENGE_PROXY_PORT", "9101"))
USERNAME = os.environ.get("LANGFLOW_SUPERUSER", "administrator")
PASSWORD = os.environ.get("LANGFLOW_SUPERUSER_PASSWORD", "securepassword")
TARGET_DIR = os.environ.get(
    "CHALLENGE_TARGET_DIR",
    "/target/CVE-2026-42048",
)
KB_ROOT = os.environ.get("LANGFLOW_KNOWLEDGE_BASES_DIR", "/tmp/langflow-lab/knowledge_bases")
FLAG_PATH = "/flag.txt"

# Seconds main() waits for Langflow before starting the proxy anyway, and the
# pause between readiness probes.
STARTUP_TIMEOUT = 180
STARTUP_POLL_INTERVAL = 2

# Request paths accepted by the DELETE handler (with and without trailing slash).
_KB_DELETE_PATHS = frozenset({"/api/v1/knowledge_bases", "/api/v1/knowledge_bases/"})

# Cached Langflow access token and whether the user's knowledge-base
# directory has already been created.
_token = None
_user_dir_prepared = False


def http_request(method, url, *, headers=None, body=None, timeout=10):
    """Perform one HTTP request and return ``(status, headers, body_bytes)``.

    Raises:
        urllib.error.HTTPError: for responses urllib treats as errors.
        urllib.error.URLError / OSError: for connection problems and timeouts.
    """
    req = urllib.request.Request(url, data=body, headers=headers or {}, method=method)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status, resp.headers, resp.read()


def langflow_ready():
    """Return True when Langflow's version endpoint answers successfully."""
    try:
        http_request("GET", f"{LANGFLOW_URL}/api/v1/version", timeout=3)
    except Exception:
        # Readiness probe: any failure at all simply means "not ready yet".
        return False
    return True


def _reset_token():
    """Forget the cached access token so the next get_token() logs in again."""
    global _token
    _token = None


def get_token():
    """Return a Langflow access token, logging in on first use.

    The token is cached in the module-level ``_token``. The user's
    knowledge-base directory is prepared (once) before the token is returned.

    Raises:
        RuntimeError: if the login or user lookup answers with a non-200 status.
        urllib.error.HTTPError: if Langflow rejects the login or user lookup.
        KeyError: if the login response carries no ``access_token``.
    """
    global _token
    if _token:
        # A previous call may have logged in but failed to prepare the
        # directory; retry that step before handing the token out.
        if not _user_dir_prepared:
            prepare_user_dir(_token)
        return _token

    form = urllib.parse.urlencode(
        {
            "username": USERNAME,
            "password": PASSWORD,
            "grant_type": "password",
            "scope": "",
        }
    ).encode()
    status, _, raw = http_request(
        "POST",
        f"{LANGFLOW_URL}/api/v1/login",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        body=form,
    )
    if status != 200:
        raise RuntimeError(f"Langflow login failed with HTTP {status}")

    payload = json.loads(raw.decode())
    _token = payload["access_token"]
    prepare_user_dir(_token)
    return _token


def prepare_user_dir(token):
    """Create ``KB_ROOT/<username>`` for the user that owns *token*.

    The username is resolved through Langflow's ``whoami`` endpoint. The work
    is done at most once per process; later calls return immediately.

    Raises:
        RuntimeError: if the ``whoami`` request answers with a non-200 status.
    """
    global _user_dir_prepared
    if _user_dir_prepared:
        return

    status, _, raw = http_request(
        "GET",
        f"{LANGFLOW_URL}/api/v1/users/whoami",
        headers={"Authorization": f"Bearer {token}"},
    )
    if status != 200:
        raise RuntimeError(f"Could not resolve Langflow user info: HTTP {status}")

    user = json.loads(raw.decode())
    username = user["username"]
    os.makedirs(os.path.join(KB_ROOT, username), exist_ok=True)
    _user_dir_prepared = True


class ChallengeProxy(BaseHTTPRequestHandler):
    """HTTP handler serving the lab endpoints and relaying the delete call."""

    server_version = "CVE-2026-42048-Lab/1.0"

    def log_message(self, fmt, *args):
        """Write access/error log lines to stdout with a ``[proxy]`` prefix."""
        print(f"[proxy] {self.address_string()} - {fmt % args}", flush=True)

    def _send_bytes(self, status, content_type, raw):
        """Send a complete response with the given status, type and body."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(raw)))
        self.end_headers()
        self.wfile.write(raw)

    def send_json(self, status, payload):
        """Send *payload* serialized as a JSON response."""
        self._send_bytes(status, "application/json", json.dumps(payload).encode())

    def send_text(self, status, text):
        """Send *text* as a UTF-8 plain-text response."""
        self._send_bytes(status, "text/plain; charset=utf-8", text.encode())

    def do_GET(self):
        """Serve ``/health``, ``/``, ``/status`` and ``/flag.txt``."""
        if self.path == "/health":
            if langflow_ready():
                self.send_json(200, {"status": "ready"})
            else:
                self.send_json(503, {"status": "starting"})
            return

        if self.path == "/":
            # Report the configured target so the text stays accurate when
            # CHALLENGE_TARGET_DIR overrides the default.
            self.send_text(
                200,
                "CVE-2026-42048 lab\n"
                f"Delete this kb_names path with the vulnerable API: {TARGET_DIR}\n"
                "Then request /flag.txt\n",
            )
            return

        if self.path == "/status":
            self.send_json(
                200,
                {
                    "target": TARGET_DIR,
                    "target_exists": os.path.exists(TARGET_DIR),
                    "vulnerability": "CVE-2026-42048 Langflow knowledge base bulk delete path traversal",
                },
            )
            return

        if self.path == "/flag.txt":
            self._serve_flag()
            return

        self.send_text(404, "Not found\n")

    def _serve_flag(self):
        """Send the flag if the target directory has been deleted."""
        if os.path.exists(TARGET_DIR):
            self.send_text(403, f"Target still exists. Delete {TARGET_DIR} first.\n")
            return
        try:
            with open(FLAG_PATH, "r", encoding="utf-8") as flag:
                content = flag.read()
        except OSError as exc:
            # Answer cleanly instead of dropping the connection when the flag
            # file is missing or unreadable.
            self.log_error("could not read %s: %s", FLAG_PATH, exc)
            self.send_text(500, "Flag file is unavailable.\n")
            return
        self.send_text(200, content)

    def _read_body(self):
        """Return the request body, or None if Content-Length is invalid."""
        try:
            length = int(self.headers.get("Content-Length", "0"))
        except (TypeError, ValueError):
            return None
        if length < 0:
            # read(-1) would block until the client closes the connection.
            return None
        return self.rfile.read(length)

    def _forward_delete(self, body):
        """Send *body* to Langflow's bulk-delete endpoint with a bearer token."""
        token = get_token()
        return http_request(
            "DELETE",
            f"{LANGFLOW_URL}/api/v1/knowledge_bases",
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": self.headers.get("Content-Type", "application/json"),
            },
            body=body,
        )

    def do_DELETE(self):
        """Relay a knowledge-base bulk delete to Langflow.

        Upstream responses, including HTTP errors, are passed back with their
        status, content type and body. Any other failure yields a 502.
        """
        if self.path not in _KB_DELETE_PATHS:
            self.send_text(404, "Not found\n")
            return

        body = self._read_body()
        if body is None:
            self.send_json(400, {"error": "invalid Content-Length header"})
            return

        try:
            try:
                status, headers, raw = self._forward_delete(body)
            except urllib.error.HTTPError as exc:
                if exc.code != 401:
                    raise
                # The cached token has most likely expired: log in again and
                # retry exactly once.
                exc.close()
                _reset_token()
                status, headers, raw = self._forward_delete(body)
            self._send_bytes(status, headers.get("Content-Type", "application/json"), raw)
        except urllib.error.HTTPError as exc:
            raw = exc.read()
            self._send_bytes(exc.code, exc.headers.get("Content-Type", "text/plain"), raw)
        except Exception as exc:
            # Top-level boundary: report anything unexpected as a bad gateway.
            self.send_json(502, {"error": str(exc)})


def main():
    """Wait for Langflow (up to STARTUP_TIMEOUT seconds), then serve forever."""
    deadline = time.monotonic() + STARTUP_TIMEOUT
    while not langflow_ready():
        if time.monotonic() >= deadline:
            print(
                f"[proxy] Langflow not ready after {STARTUP_TIMEOUT}s; starting anyway",
                flush=True,
            )
            break
        time.sleep(STARTUP_POLL_INTERVAL)

    with ThreadingHTTPServer((PROXY_HOST, PROXY_PORT), ChallengeProxy) as server:
        print(f"[proxy] listening on {PROXY_HOST}:{PROXY_PORT}", flush=True)
        server.serve_forever()


if __name__ == "__main__":
    main()