#!/usr/bin/env python3 """ CVE-2025-20393 Scanner Cisco Secure Email Gateway & Email and Web Manager RCE Vulnerability Scanner Author: thesystemowner Version: 1.0 Date: December 2025 Description: This scanner detects Cisco SEG/SEWM appliances vulnerable to CVE-2025-20393, a critical unauthenticated RCE vulnerability in Cisco AsyncOS affecting devices with Spam Quarantine feature exposed to the internet. CVSS Score: 10.0 (Critical) CWE-20: Improper Input Validation Active exploitation in the wild by UAT-9686 (Chinese APT) """ import argparse import requests import socket import sys import json import re from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urlparse import warnings warnings.filterwarnings('ignore') class Colors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' class CVE202520393Scanner: def __init__(self, timeout=10, user_agent=None): self.timeout = timeout self.user_agent = user_agent or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" self.session = requests.Session() self.session.verify = False self.session.headers.update({'User-Agent': self.user_agent}) # Known Spam Quarantine ports (default and common) self.spam_quarantine_ports = [6025, 82, 443, 8443] # IOCs from Cisco Talos UAT-9686 analysis self.iocs = { 'aquashell_signatures': [ b'aquashell', b'AquaShell', b'python.*backdoor' ], 'aquatunnel_signatures': [ b'aquatunnel', b'reverse.*ssh', b'chisel' ], 'aquapurge_signatures': [ b'aquapurge', b'log.*clear' ] } def print_banner(self): banner = f""" {Colors.OKCYAN}╔═══════════════════════════════════════════════════════════════╗ ║ CVE-2025-20393 Vulnerability Scanner ║ ║ Cisco Secure Email Gateway & Email and Web Manager ║ ║ Unauthenticated RCE Scanner ║ ║ ║ ║ CVSS: 10.0 (Critical) | CWE-20: Improper Input Validation ║ ║ Active Exploitation: UAT-9686 (Chinese APT) ║ ╚═══════════════════════════════════════════════════════════════╝{Colors.ENDC} """ print(banner) def banner_grab(self, target, port): """Grab server banner to identify Cisco AsyncOS""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(self.timeout) sock.connect((target, port)) sock.send(b"HEAD / HTTP/1.1\r\nHost: " + target.encode() + b"\r\n\r\n") banner = sock.recv(1024).decode('utf-8', errors='ignore') return banner except Exception as e: return None def check_cisco_product(self, target, port): """Check if target is a Cisco SEG/SEWM appliance""" try: url = f"https://{target}:{port}" response = self.session.get(url, timeout=self.timeout, allow_redirects=True) # Check for Cisco AsyncOS indicators indicators = [ 'cisco', 'ironport', 'asyncos', 'secure email gateway', 'secure email and web manager', 'spam quarantine' ] content = response.text.lower() headers = str(response.headers).lower() matches = [] for indicator in indicators: if indicator in content or indicator in headers: matches.append(indicator) if matches: return { 'is_cisco': True, 'indicators': matches, 'status_code': response.status_code, 'server_header': response.headers.get('Server', 'Unknown') } except Exception as e: pass return {'is_cisco': False} def check_spam_quarantine(self, target, port): """Check if Spam Quarantine interface is accessible""" paths = [ '/quarantine', '/spamquarantine', '/spam', '/sma-login', '/login' ] for path in paths: try: url = f"https://{target}:{port}{path}" response = self.session.get(url, timeout=self.timeout, allow_redirects=False) # Check for spam quarantine indicators if response.status_code in [200, 301, 302]: content = response.text.lower() if any(keyword in content for keyword in ['quarantine', 'spam', 'cisco', 'ironport']): return { 'exposed': True, 'path': path, 'status_code': response.status_code, 'url': url } except Exception as e: continue return {'exposed': False} def check_version_disclosure(self, target, port): """Attempt to identify AsyncOS version""" try: url = f"https://{target}:{port}" response = self.session.get(url, timeout=self.timeout) # Common version disclosure patterns version_patterns = [ r'AsyncOS\s+(\d+\.\d+\.\d+)', r'Version:\s*(\d+\.\d+\.\d+)', r'asyncos[_-](\d+\.\d+\.\d+)' ] for pattern in version_patterns: match = re.search(pattern, response.text, re.IGNORECASE) if match: return { 'version_found': True, 'version': match.group(1) } except Exception as e: pass return {'version_found': False} def scan_target(self, target): """Comprehensive scan of a single target""" print(f"\n{Colors.OKBLUE}[*] Scanning: {target}{Colors.ENDC}") results = { 'target': target, 'timestamp': datetime.now().isoformat(), 'vulnerable': False, 'risk_level': 'Unknown', 'findings': [] } # Check common ports for port in self.spam_quarantine_ports: print(f" {Colors.OKCYAN}[+] Checking port {port}...{Colors.ENDC}") # Banner grabbing banner = self.banner_grab(target, port) if banner and 'cisco' in banner.lower(): results['findings'].append(f"Cisco banner detected on port {port}") # Check if it's a Cisco product cisco_check = self.check_cisco_product(target, port) if cisco_check['is_cisco']: print(f" {Colors.OKGREEN}[✓] Cisco AsyncOS product detected!{Colors.ENDC}") print(f" {Colors.WARNING} Indicators: {', '.join(cisco_check['indicators'])}{Colors.ENDC}") results['findings'].append(f"Cisco product identified on port {port}: {cisco_check['indicators']}") # Check for exposed Spam Quarantine spam_check = self.check_spam_quarantine(target, port) if spam_check['exposed']: print(f" {Colors.FAIL}[!] SPAM QUARANTINE EXPOSED: {spam_check['url']}{Colors.ENDC}") results['vulnerable'] = True results['risk_level'] = 'CRITICAL' results['findings'].append(f"Spam Quarantine exposed at {spam_check['url']}") # Check version version_check = self.check_version_disclosure(target, port) if version_check['version_found']: print(f" {Colors.WARNING}[!] Version detected: {version_check['version']}{Colors.ENDC}") results['findings'].append(f"AsyncOS version: {version_check['version']}") return results def generate_report(self, results, output_file=None): """Generate vulnerability report""" report = { 'scan_info': { 'cve': 'CVE-2025-20393', 'severity': 'CRITICAL', 'cvss_score': 10.0, 'description': 'Cisco Secure Email Gateway & Email and Web Manager Unauthenticated RCE', 'threat_actor': 'UAT-9686 (Chinese APT)', 'exploitation_status': 'Active in the wild', 'scan_date': datetime.now().isoformat() }, 'results': results } # Print summary vulnerable_targets = [r for r in results if r['vulnerable']] print(f"\n{Colors.BOLD}{'='*60}{Colors.ENDC}") print(f"{Colors.HEADER}[SCAN SUMMARY]{Colors.ENDC}") print(f"{Colors.BOLD}{'='*60}{Colors.ENDC}") print(f"Total targets scanned: {len(results)}") print(f"{Colors.FAIL}Vulnerable targets: {len(vulnerable_targets)}{Colors.ENDC}") print(f"{Colors.OKGREEN}Secure targets: {len(results) - len(vulnerable_targets)}{Colors.ENDC}") if vulnerable_targets: print(f"\n{Colors.FAIL}{Colors.BOLD}[!] CRITICAL VULNERABILITIES FOUND:{Colors.ENDC}") for target in vulnerable_targets: print(f" • {target['target']} - Risk Level: {target['risk_level']}") for finding in target['findings']: print(f" - {finding}") # Save to file if output_file: with open(output_file, 'w') as f: json.dump(report, f, indent=4) print(f"\n{Colors.OKGREEN}[+] Report saved to: {output_file}{Colors.ENDC}") return report def scan_targets(self, targets, threads=10): """Scan multiple targets concurrently""" results = [] with ThreadPoolExecutor(max_workers=threads) as executor: futures = {executor.submit(self.scan_target, target): target for target in targets} for future in as_completed(futures): try: result = future.result() results.append(result) except Exception as e: target = futures[future] print(f"{Colors.FAIL}[!] Error scanning {target}: {str(e)}{Colors.ENDC}") return results def parse_targets(target_input): """Parse target input (single target, file, or CIDR)""" targets = [] if '/' in target_input: # CIDR notation try: import ipaddress network = ipaddress.ip_network(target_input, strict=False) targets = [str(ip) for ip in network.hosts()] except Exception as e: print(f"{Colors.FAIL}[!] Invalid CIDR notation: {str(e)}{Colors.ENDC}") sys.exit(1) elif target_input.startswith('http'): parsed = urlparse(target_input) targets = [parsed.netloc] else: try: with open(target_input, 'r') as f: targets = [line.strip() for line in f if line.strip()] except FileNotFoundError: targets = [target_input] return targets def main(): parser = argparse.ArgumentParser( description='CVE-2025-20393 Scanner - Cisco AsyncOS Spam Quarantine RCE', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s -t 192.168.1.100 %(prog)s -t targets.txt -o report.json %(prog)s -t 10.0.0.0/24 --threads 20 %(prog)s -t mail.company.com --timeout 15 IOCs for CVE-2025-20393: - AquaShell (Python backdoor) - AquaTunnel (Reverse SSH tunnel) - Chisel (Tunneling tool) - AquaPurge (Log clearing utility) """ ) parser.add_argument('-t', '--target', required=True, help='Target IP, hostname, file, or CIDR range') parser.add_argument('-o', '--output', help='Output JSON report file') parser.add_argument('--timeout', type=int, default=10, help='Connection timeout in seconds (default: 10)') parser.add_argument('--threads', type=int, default=10, help='Number of concurrent threads (default: 10)') parser.add_argument('--user-agent', help='Custom User-Agent string') args = parser.parse_args() scanner = CVE202520393Scanner( timeout=args.timeout, user_agent=args.user_agent ) scanner.print_banner() targets = parse_targets(args.target) print(f"\n{Colors.OKBLUE}[*] Loaded {len(targets)} target(s){Colors.ENDC}") print(f"\n{Colors.WARNING}[!] Disclaimer: This tool is for authorized security testing only.{Colors.ENDC}") print(f"{Colors.WARNING}[!] Unauthorized access to computer systems is illegal.{Colors.ENDC}\n") try: input(f"{Colors.OKGREEN}Press ENTER to start scanning...{Colors.ENDC}") except KeyboardInterrupt: print(f"\n{Colors.FAIL}[!] Scan cancelled by user{Colors.ENDC}") sys.exit(0) results = scanner.scan_targets(targets, threads=args.threads) scanner.generate_report(results, args.output) print(f"\n{Colors.OKCYAN}[*] Scan completed!{Colors.ENDC}") if __name__ == '__main__': try: main() except KeyboardInterrupt: print(f"\n{Colors.FAIL}[!] Interrupted by user{Colors.ENDC}") sys.exit(0)