#!/usr/bin/env python3 """ CVE-2025-53770 SharePoint Vulnerability Scanner. A comprehensive security scanner designed to identify SharePoint instances vulnerable to CVE-2025-53770, which involves a deserialization vulnerability in SharePoint's ExcelDataSet component that allows remote code execution and machine key extraction. This scanner incorporates real-world attack patterns observed in active exploitation campaigns and provides detailed confidence scoring based on machine key extraction, secondary payload deployment, and SharePoint component processing indicators. """ import json import csv import ssl import socket import re import time import random import logging import hashlib import asyncio from datetime import datetime, timedelta from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urlparse from pathlib import Path from typing import Dict, List, Optional, Union, Any, Tuple from dataclasses import dataclass, asdict from contextlib import contextmanager, suppress import threading import warnings import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import urllib3 @dataclass class ScanResult: """Data class for scan results""" host: str url: str scan_time: str vulnerable: bool = False status_code: Optional[int] = None response_size: int = 0 error: Optional[str] = None response_time: Optional[float] = None request_size: int = 0 detection_confidence: str = "none" confidence_score: int = 0 vulnerability_indicators: List[str] = None ssl_info: Dict[str, Any] = None sharepoint_info: Dict[str, Any] = None security_headers: Dict[str, Any] = None sharepoint_version_hint: str = "Unknown" endpoint_tested: str = "" cached_result: bool = False scan_metrics: Dict[str, Any] = None def __post_init__(self): if self.vulnerability_indicators is None: self.vulnerability_indicators = [] if self.ssl_info is None: self.ssl_info = {} if self.sharepoint_info is None: self.sharepoint_info = {} if self.security_headers is None: self.security_headers = {} if self.scan_metrics is None: self.scan_metrics = {} class ConfigManager: """Configuration management for the scanner""" def __init__(self, config_path: str = "config.json"): self.config_path = config_path self.config = self._load_config() self._compile_patterns() def _load_config(self) -> Dict[str, Any]: """Load configuration from JSON file""" try: with open(self.config_path, "r", encoding="utf-8") as f: return json.load(f) except FileNotFoundError: logging.warning(f"Config file {self.config_path} not found, using defaults") return self._get_default_config() except json.JSONDecodeError as e: logging.error(f"Invalid JSON in config file: {e}") return self._get_default_config() def _get_default_config(self) -> Dict[str, Any]: """Default configuration if file is missing""" return { "detection_rules": { "critical_patterns": [ { "name": "machine_key_extraction", "pattern": r"[A-F0-9]{128,256}\|[A-Z0-9]+\|[A-F0-9]{48,96}\|[A-Z0-9]+\|Framework[0-9A-Z]+", "score": 95, "description": "Full machine key extraction response detected", "case_insensitive": True } ], "high_patterns": [], "medium_patterns": [ { "name": "sharepoint_components", "patterns": ["Scorecard", "ExcelDataSet"], "score": 25, "description": "SharePoint vulnerable components" } ], "low_patterns": [] }, "confidence_thresholds": { "critical": 85, "high": 75, "medium": 60, "low": 50, }, "scan_settings": { "default_timeout": 10, "default_threads": 10, "ssl_verification": True, }, "endpoints": [ "/_layouts/15/ToolPane.aspx?DisplayMode=Edit&a=/ToolPane.aspx" ], } def _compile_patterns(self) -> None: """Pre-compile regex patterns for performance""" self.compiled_patterns = {} detection_rules = self.config.get("detection_rules", {}) for category, patterns in detection_rules.items(): self.compiled_patterns[category] = [] for pattern_config in patterns: if "pattern" in pattern_config: flags = ( re.IGNORECASE if pattern_config.get("case_insensitive", False) else 0 ) compiled = re.compile(pattern_config["pattern"], flags) self.compiled_patterns[category].append( {**pattern_config, "compiled_pattern": compiled} ) else: self.compiled_patterns[category].append(pattern_config) def get_detection_rules(self) -> Dict[str, List[Dict]]: """Get compiled detection rules""" return self.compiled_patterns def get_confidence_thresholds(self) -> Dict[str, int]: """Get confidence thresholds""" return self.config.get("confidence_thresholds", {}) def get_scan_settings(self) -> Dict[str, Any]: """Get scan settings""" return self.config.get("scan_settings", {}) def get_endpoints(self) -> List[str]: """Get endpoints to test""" return self.config.get("endpoints", []) def get_payload_config(self) -> Dict[str, str]: """Get payload configuration""" return self.config.get("payload_config", {}) def get_request_headers(self) -> Dict[str, str]: """Get request headers configuration""" return self.config.get("request_headers", {}) def get_rate_limiting(self) -> Dict[str, Any]: """Get rate limiting configuration""" return self.config.get("rate_limiting", {}) def get_caching(self) -> Dict[str, Any]: """Get caching configuration""" return self.config.get("caching", {}) def get_metrics(self) -> Dict[str, Any]: """Get metrics configuration""" return self.config.get("metrics", {}) class RateLimiter: """Rate limiting implementation""" def __init__(self, requests_per_second: float = 10, burst_size: int = 20): self.requests_per_second = requests_per_second self.burst_size = burst_size self.tokens = burst_size self.last_update = time.time() self.lock = threading.Lock() def acquire(self) -> bool: """Acquire a token for rate limiting""" with self.lock: now = time.time() elapsed = now - self.last_update self.tokens = min(self.burst_size, self.tokens + elapsed * self.requests_per_second) self.last_update = now if self.tokens >= 1: self.tokens -= 1 return True return False def wait_for_token(self): """Wait until a token is available""" while not self.acquire(): time.sleep(1.0 / self.requests_per_second) class ScanCache: """Caching implementation for scan results""" def __init__(self, cache_file: str = "scan_cache.json", duration_seconds: int = 3600): self.cache_file = cache_file self.duration = timedelta(seconds=duration_seconds) self.cache = {} self.lock = threading.Lock() self._load_cache() def _load_cache(self): """Load cache from file""" try: if Path(self.cache_file).exists(): with open(self.cache_file, 'r') as f: data = json.load(f) for key, value in data.items(): # Convert ISO timestamp back to datetime value['timestamp'] = datetime.fromisoformat(value['timestamp']) # Keep result data as dict for lazy loading self.cache[key] = value except Exception as e: logging.warning(f"Failed to load cache: {e}") def _save_cache(self): """Save cache to file""" try: # Convert datetime to ISO string for JSON serialization cache_data = {} for key, value in self.cache.items(): cache_data[key] = { **value, 'timestamp': value['timestamp'].isoformat() } with open(self.cache_file, 'w') as f: json.dump(cache_data, f, indent=2) except Exception as e: logging.warning(f"Failed to save cache: {e}") def _get_cache_key(self, host: str, endpoint: str) -> str: """Generate cache key for host/endpoint combination""" return hashlib.md5(f"{host}:{endpoint}".encode()).hexdigest() def get(self, host: str, endpoint: str) -> Optional[ScanResult]: """Get cached result if valid""" with self.lock: key = self._get_cache_key(host, endpoint) if key in self.cache: entry = self.cache[key] if datetime.now() - entry['timestamp'] < self.duration: # Reconstruct ScanResult from cached dict result_data = entry['result'] if isinstance(result_data, dict): result = ScanResult(**result_data) else: result = result_data # Handle old format result.cached_result = True return result else: # Expired entry del self.cache[key] return None def put(self, host: str, endpoint: str, result: ScanResult): """Cache scan result""" with self.lock: key = self._get_cache_key(host, endpoint) self.cache[key] = { 'timestamp': datetime.now(), 'result': asdict(result) # Convert dataclass to dict for JSON serialization } self._save_cache() class MetricsCollector: """Performance and accuracy metrics collection""" def __init__(self): self.metrics = { 'total_scans': 0, 'successful_scans': 0, 'vulnerable_found': 0, 'average_response_time': 0.0, 'cache_hits': 0, 'rate_limited_requests': 0, 'ssl_errors': 0, 'scan_start_time': None, 'scan_end_time': None } self.response_times = [] self.lock = threading.Lock() def start_scan(self): """Mark scan start time""" self.metrics['scan_start_time'] = datetime.now() def end_scan(self): """Mark scan end time""" self.metrics['scan_end_time'] = datetime.now() def record_scan(self, result: ScanResult): """Record scan result metrics""" with self.lock: self.metrics['total_scans'] += 1 if not result.error: self.metrics['successful_scans'] += 1 if result.response_time: self.response_times.append(result.response_time) self.metrics['average_response_time'] = sum(self.response_times) / len(self.response_times) if result.vulnerable: self.metrics['vulnerable_found'] += 1 if result.cached_result: self.metrics['cache_hits'] += 1 if 'ssl' in str(result.error).lower(): self.metrics['ssl_errors'] += 1 def record_rate_limit(self): """Record rate limiting event""" with self.lock: self.metrics['rate_limited_requests'] += 1 def get_metrics(self) -> Dict[str, Any]: """Get current metrics""" with self.lock: metrics = self.metrics.copy() if metrics['scan_start_time'] and metrics['scan_end_time']: duration = metrics['scan_end_time'] - metrics['scan_start_time'] metrics['total_scan_duration'] = duration.total_seconds() metrics['scans_per_second'] = metrics['total_scans'] / duration.total_seconds() if duration.total_seconds() > 0 else 0 return metrics class SharePointScanner: """Main scanner class with enhanced capabilities""" def __init__(self, config_path: str = "config.json"): self.config_manager = ConfigManager(config_path) self.logger = logging.getLogger(__name__) # Initialize enhanced components rate_config = self.config_manager.get_rate_limiting() if rate_config.get('enabled', True): self.rate_limiter = RateLimiter( requests_per_second=rate_config.get('requests_per_second', 10), burst_size=rate_config.get('burst_size', 20) ) else: self.rate_limiter = None cache_config = self.config_manager.get_caching() if cache_config.get('enabled', True): self.cache = ScanCache( cache_file=cache_config.get('cache_file', 'scan_cache.json'), duration_seconds=cache_config.get('cache_duration_seconds', 3600) ) else: self.cache = None metrics_config = self.config_manager.get_metrics() if metrics_config.get('enabled', True): self.metrics = MetricsCollector() else: self.metrics = None # Context-specific SSL warnings handling self._ssl_context = None @staticmethod def get_ssl_info(host: str, port: int = 443, timeout: int = 10) -> Dict[str, Any]: """Get SSL certificate information""" try: context = ssl.create_default_context() with socket.create_connection((host, port), timeout=timeout) as sock: with context.wrap_socket(sock, server_hostname=host) as ssock: cert = ssock.getpeercert() return { "subject": dict(x[0] for x in cert.get("subject", [])), "issuer": dict(x[0] for x in cert.get("issuer", [])), "version": cert.get("version"), "serial_number": cert.get("serialNumber"), "not_before": cert.get("notBefore"), "not_after": cert.get("notAfter"), "cipher": ssock.cipher(), "compression": ssock.compression(), } except Exception as e: return {"error": str(e)} @staticmethod def detect_sharepoint_version( response_headers: Dict[str, str], response_text: str ) -> Dict[str, Any]: """Detect SharePoint version from headers and response""" version_info = { "sharepoint_version": "Unknown", "server_info": response_headers.get("Server", "Unknown"), "asp_net_version": response_headers.get("X-AspNet-Version", "Unknown"), "sharepoint_headers": {}, } # Check for SharePoint-specific headers sp_headers = [ "MicrosoftSharePointTeamServices", "X-SharePointError", "X-MS-SPError", "SPIisLatency", "SPRequestGuid", "X-MS-InvokeApp", ] for header in sp_headers: if header in response_headers: version_info["sharepoint_headers"][header] = response_headers[header] # Try to detect version from response text patterns version_patterns = [ (r"SharePoint.*?(\d{4})", "SharePoint {version}"), (r"Microsoft.*?SharePoint.*?(\d+\.\d+)", "SharePoint {version}"), (r"_layouts/(\d+)/", "SharePoint {version} (Layout Version)"), ] for pattern, format_str in version_patterns: match = re.search(pattern, response_text, re.IGNORECASE) if match: version_info["sharepoint_version"] = format_str.format( version=match.group(1) ) break return version_info def create_session_with_retries(self) -> requests.Session: """Create a requests session with retry strategy""" session = requests.Session() settings = self.config_manager.get_scan_settings() retry_strategy = Retry( total=settings.get("max_retries", 3), backoff_factor=settings.get("backoff_factor", 1), status_forcelist=[429, 500, 502, 503, 504], ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("http://", adapter) session.mount("https://", adapter) return session def analyze_response( self, response: requests.Response, response_time: float, url: str ) -> Tuple[List[str], int, str]: """Analyze response for vulnerability indicators""" vulnerability_indicators = [] confidence_score = 0 detection_rules = self.config_manager.get_detection_rules() # Critical patterns (machine key extraction) for pattern_config in detection_rules.get("critical_patterns", []): if "compiled_pattern" in pattern_config: if pattern_config["compiled_pattern"].search(response.text): vulnerability_indicators.append(pattern_config["description"]) confidence_score += pattern_config["score"] elif "patterns" in pattern_config: for pattern in pattern_config["patterns"]: if pattern in response.text: vulnerability_indicators.append( f"{pattern_config['description']}: {pattern}" ) confidence_score += pattern_config["score"] break # Response characteristics analysis if response.status_code == 200: if len(response.text) > 200: vulnerability_indicators.append( f"HTTP 200 with substantial content ({len(response.text)} bytes)" ) confidence_score += 15 else: vulnerability_indicators.append("HTTP 200 response to crafted payload") confidence_score += 5 # Version-specific indicators version_hint = "Unknown" if "WEBSER~1\\15\\TEMPLATE" in response.text or "_layouts/15/" in url: vulnerability_indicators.append( "SharePoint 2013/2016 layout structure detected" ) version_hint = "2013-2016" confidence_score += 2 return vulnerability_indicators, confidence_score, version_hint def determine_confidence_level(self, confidence_score: int) -> Tuple[bool, str]: """Determine vulnerability status and confidence level""" thresholds = self.config_manager.get_confidence_thresholds() if confidence_score >= thresholds.get("critical", 85): return True, "critical" elif confidence_score >= thresholds.get("high", 75): return True, "high" elif confidence_score >= thresholds.get("medium", 60): return True, "medium" elif confidence_score >= thresholds.get("low", 50): return True, "low" else: return False, "none" def scan_host( self, host: str, timeout: int = None, additional_mode: bool = False, delay_range: Optional[Tuple[float, float]] = None, ssl_check: bool = False, ) -> ScanResult: """Scan a single host for CVE-2025-53770 vulnerability""" if timeout is None: timeout = self.config_manager.get_scan_settings().get("default_timeout", 10) endpoints = self.config_manager.get_endpoints() best_result = None highest_confidence = -1 # Test multiple endpoints for endpoint in endpoints: try: result = self._scan_endpoint( host, endpoint, timeout, additional_mode, delay_range, ssl_check ) if result.confidence_score > highest_confidence: highest_confidence = result.confidence_score best_result = result except Exception as e: self.logger.error( f"Error scanning {host} with endpoint {endpoint}: {e}" ) if best_result is None: best_result = ScanResult( host=host, url=f"https://{host}{endpoint}", scan_time=datetime.now().isoformat(), error=str(e), endpoint_tested=endpoint, ) return best_result or ScanResult( host=host, url=f"https://{host}", scan_time=datetime.now().isoformat(), error="No valid endpoints to test", ) def _scan_endpoint( self, host: str, endpoint: str, timeout: int, additional_mode: bool, delay_range: Optional[Tuple[float, float]], ssl_check: bool, ) -> ScanResult: """Scan a specific endpoint""" url = f"https://{host}{endpoint}" # Apply delay if specified if delay_range: delay = random.uniform(delay_range[0], delay_range[1]) time.sleep(delay) # Apply rate limiting if enabled if self.rate_limiter: if not self.rate_limiter.acquire(): if self.metrics: self.metrics.record_rate_limit() self.rate_limiter.wait_for_token() # Check cache first if self.cache: cached_result = self.cache.get(host, endpoint) if cached_result: self.logger.info(f"Using cached result for {host}") return cached_result # Prepare request with config-based payload and headers payload_config = self.config_manager.get_payload_config() if not payload_config: # Fallback to minimal payload if config is missing payload_config = { "MSOTlPn_Uri": "https://{host}/_controltemplates/15/AclEditor.ascx", "MSOTlPn_DWP": "" } data = payload_config.copy() data["MSOTlPn_Uri"] = data["MSOTlPn_Uri"].format(host=host) headers_config = self.config_manager.get_request_headers() headers = headers_config.copy() if headers_config else { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0", "Content-Type": "application/x-www-form-urlencoded" } if additional_mode: user_agents = self.config_manager.get_scan_settings().get( "user_agents", [headers.get("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")] ) headers["User-Agent"] = random.choice(user_agents) headers["Connection"] = "close" # Calculate request size request_size = len(f"{url}{json.dumps(data)}{json.dumps(headers)}") # Initialize result result = ScanResult( host=host, url=url, scan_time=datetime.now().isoformat(), request_size=request_size, endpoint_tested=endpoint, ) try: # Get SSL information if requested if ssl_check: parsed_url = urlparse(url) result.ssl_info = self.get_ssl_info(parsed_url.hostname) # Create session and make request session = self.create_session_with_retries() verify_ssl = self.config_manager.get_scan_settings().get("ssl_verification", True) # Context-specific SSL warning suppression with self._suppress_ssl_warnings(verify_ssl): start_time = time.time() response = session.post( url, headers=headers, data=data, verify=verify_ssl, timeout=timeout, allow_redirects=True, ) response_time = time.time() - start_time result.status_code = response.status_code result.response_size = len(response.text) result.response_time = response_time # Analyze security headers security_headers = { "strict_transport_security": response.headers.get( "Strict-Transport-Security" ), "content_security_policy": response.headers.get( "Content-Security-Policy" ), "x_frame_options": response.headers.get("X-Frame-Options"), "x_content_type_options": response.headers.get( "X-Content-Type-Options" ), "x_xss_protection": response.headers.get("X-XSS-Protection"), } result.security_headers = { k: v for k, v in security_headers.items() if v } # Detect SharePoint version and configuration result.sharepoint_info = self.detect_sharepoint_version( response.headers, response.text ) # Analyze for vulnerabilities indicators, confidence_score, version_hint = self.analyze_response( response, response_time, url ) result.vulnerability_indicators = indicators result.confidence_score = confidence_score result.sharepoint_version_hint = version_hint # Determine vulnerability status is_vulnerable, confidence_level = self.determine_confidence_level( confidence_score ) result.vulnerable = is_vulnerable result.detection_confidence = confidence_level # Enhanced result metrics if self.metrics and self.config_manager.get_metrics().get('track_performance', True): result.scan_metrics = { 'request_start_time': start_time, 'response_time': response_time, 'status_code': response.status_code, 'response_size': len(response.text), 'endpoint_used': endpoint } # Log results if is_vulnerable: self.logger.info( f"VULNERABLE: {host} - Confidence: {confidence_level} ({confidence_score}%) - " f"Status: {response.status_code}, Size: {len(response.text)}, Time: {response_time:.2f}s" ) else: self.logger.info( f"NOT VULNERABLE: {host} - Status: {response.status_code}, " f"Size: {len(response.text)}, Time: {response_time:.2f}s" ) except Exception as e: result.error = str(e)[:500] # Limit error message length self.logger.error(f"Error scanning {host}: {result.error}") # Cache the result if caching is enabled if self.cache and not result.error: self.cache.put(host, endpoint, result) # Record metrics if self.metrics: self.metrics.record_scan(result) return result def save_results(self, results: List[ScanResult], output_file: str) -> None: """Save results to file in specified format""" if output_file.endswith(".json"): with open(output_file, "w", encoding="utf-8") as f: json.dump( [asdict(result) for result in results], f, indent=2, ensure_ascii=False, ) elif output_file.endswith(".csv"): with open(output_file, "w", newline="", encoding="utf-8") as f: if results: fieldnames = list(asdict(results[0]).keys()) writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for result in results: # Convert complex fields to JSON strings for CSV row = asdict(result) for key, value in row.items(): if isinstance(value, (list, dict)): row[key] = json.dumps(value) writer.writerow(row) else: # Text format with open(output_file, "w", encoding="utf-8") as f: for result in results: if result.vulnerable: status = f"VULNERABLE ({result.detection_confidence.upper()} confidence)" else: status = "NOT VULNERABLE" f.write(f"{status}: {result.host}\n") if result.error: f.write(f" Error: {result.error}\n") elif result.status_code: f.write( f" Status: {result.status_code}, Size: {result.response_size}, " f"Time: {result.response_time:.2f}s\n" ) f.write(f" Request Size: {result.request_size} bytes\n") f.write(f" Endpoint: {result.endpoint_tested}\n") if result.vulnerability_indicators: f.write(" Vulnerability Indicators:\n") for indicator in result.vulnerability_indicators: f.write(f" - {indicator}\n") f.write(f" Scanned: {result.scan_time}\n") if result.cached_result: f.write(f" Source: Cached Result\n") f.write("\n") @contextmanager def _suppress_ssl_warnings(self, verify_ssl: bool): """Context manager for SSL warning suppression""" if not verify_ssl: with warnings.catch_warnings(): warnings.simplefilter("ignore", urllib3.exceptions.InsecureRequestWarning) yield else: yield def get_scan_metrics(self) -> Optional[Dict[str, Any]]: """Get scan metrics if available""" return self.metrics.get_metrics() if self.metrics else None def setup_logging(verbose: bool = False, log_file: Optional[str] = None) -> None: """Setup logging configuration""" log_level = logging.INFO if verbose else logging.WARNING log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" handlers = [] # Console handler console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter(log_format)) handlers.append(console_handler) # File handler if log_file: file_handler = logging.FileHandler(log_file, encoding="utf-8") file_handler.setFormatter(logging.Formatter(log_format)) handlers.append(file_handler) # Configure root logger logging.basicConfig( level=log_level, format=log_format, handlers=handlers, force=True ) def print_results_summary(results: List[ScanResult]) -> None: """Print comprehensive results summary""" total_hosts = len(results) vulnerable_results = [r for r in results if r.vulnerable] successful_scans = [r for r in results if not r.error] success_rate = (len(successful_scans) / total_hosts * 100) if total_hosts > 0 else 0 print(f"\n{'=' * 60}") print(f"SCAN COMPLETE") print(f"{'=' * 60}") print(f"Total hosts scanned: {total_hosts}") print(f"Vulnerable hosts: {len(vulnerable_results)}") print(f"Success rate: {success_rate:.1f}%") if vulnerable_results: print(f"\nVULNERABLE HOSTS (CVE-2025-53770):") # Group by confidence level confidence_groups = {} for result in vulnerable_results: confidence = result.detection_confidence if confidence not in confidence_groups: confidence_groups[confidence] = [] confidence_groups[confidence].append(result) # Display in priority order priority_order = ["critical", "high", "medium", "low"] for confidence in priority_order: if confidence in confidence_groups: results_group = confidence_groups[confidence] if confidence == "critical": print( f"\n CRITICAL - MACHINE KEY EXTRACTED ({len(results_group)} hosts):" ) for result in results_group: print( f" • {result.host} (Response time: {result.response_time:.2f}s, " f"Version: {result.sharepoint_version_hint})" ) print( f" WARNING: IMMEDIATE ACTION REQUIRED: Machine keys compromised" ) else: print( f"\n {confidence.upper()} CONFIDENCE ({len(results_group)} hosts):" ) for result in results_group: print( f" • {result.host} (Response time: {result.response_time:.2f}s, " f"Version: {result.sharepoint_version_hint})" ) def main(): """Main function""" import argparse parser = argparse.ArgumentParser( description="CVE-2025-53770 SharePoint Vulnerability Scanner", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python3 scanner.py -i hosts.txt python3 scanner.py -i hosts.txt -o results.json -l scan.log -v python3 scanner.py -i hosts.txt --ssl-check --additional -t 20 """, ) parser.add_argument("-i", "--input", required=True, help="Path to host list file") parser.add_argument( "-o", "--output", help="Output file for results (supports .json, .csv, or .txt)" ) parser.add_argument("-l", "--logfile", help="Log file path") parser.add_argument( "-t", "--threads", type=int, default=10, help="Number of concurrent threads" ) parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") parser.add_argument( "--additional", action="store_true", help="Enable additional mode (random User-Agents, connection close)", ) parser.add_argument( "--delay", type=float, nargs=2, metavar=("MIN", "MAX"), help="Random delay between requests (min max in seconds)", ) parser.add_argument( "--ssl-check", action="store_true", help="Perform SSL certificate analysis" ) parser.add_argument( "--timeout", type=int, default=None, help="Request timeout in seconds" ) parser.add_argument( "--config", default="config.json", help="Configuration file path" ) args = parser.parse_args() # Setup logging setup_logging(args.verbose, args.logfile) logger = logging.getLogger(__name__) # Initialize scanner try: scanner = SharePointScanner(args.config) except Exception as e: logger.error(f"Failed to initialize scanner: {e}") return 1 # Load hosts try: with open(args.input, "r", encoding="utf-8") as f: hosts = [ line.strip() for line in f if line.strip() and not line.strip().startswith("#") ] except FileNotFoundError: logger.error(f"Host file not found: {args.input}") return 1 except Exception as e: logger.error(f"Error reading host file: {e}") return 1 if not hosts: logger.error("No valid hosts found in input file") return 1 print(f"Starting scan of {len(hosts)} hosts with {args.threads} threads...") print(f"Target CVE: CVE-2025-53770 (SharePoint ExcelDataSet deserialization)") if args.logfile: print(f"Logging to: {args.logfile}") if args.output: print(f"Results will be saved to: {args.output}") # Start metrics collection if scanner.metrics: scanner.metrics.start_scan() results = [] completed = 0 delay_range = tuple(args.delay) if args.delay else None # Scan hosts with ThreadPoolExecutor(max_workers=args.threads) as executor: futures = { executor.submit( scanner.scan_host, host, timeout=args.timeout, additional_mode=args.additional, delay_range=delay_range, ssl_check=args.ssl_check, ): host for host in hosts } for future in as_completed(futures): try: result = future.result() results.append(result) completed += 1 # Print progress if result.vulnerable: confidence_badge = f"[{result.detection_confidence.upper()}]" print(f"[+] VULNERABLE {confidence_badge}: {result.host}") if args.verbose and result.vulnerability_indicators: for indicator in result.vulnerability_indicators[ :3 ]: # Limit output print(f" └─ {indicator}") else: if result.error: print(f"[!] ERROR: {result.host} - {result.error}") else: status_msg = f"[-] Not vulnerable: {result.host}" if ( args.verbose and result.sharepoint_info.get("sharepoint_version") != "Unknown" ): status_msg += f" (SharePoint {result.sharepoint_info['sharepoint_version']})" print(status_msg) # Progress indicator if completed % 25 == 0 or completed == len(hosts): print( f"Progress: {completed}/{len(hosts)} ({completed/len(hosts)*100:.1f}%)" ) except Exception as e: logger.error(f"Error processing scan result: {e}") completed += 1 # End metrics collection if scanner.metrics: scanner.metrics.end_scan() # Print summary print_results_summary(results) # Print performance metrics if available if scanner.metrics: metrics = scanner.get_scan_metrics() print(f"\n{'='*60}") print(f"PERFORMANCE METRICS") print(f"{'='*60}") print(f"Total scans: {metrics['total_scans']}") print(f"Successful scans: {metrics['successful_scans']}") print(f"Cache hits: {metrics['cache_hits']}") print(f"Average response time: {metrics['average_response_time']:.2f}s") if 'total_scan_duration' in metrics: print(f"Total scan duration: {metrics['total_scan_duration']:.2f}s") print(f"Scans per second: {metrics['scans_per_second']:.2f}") if metrics['rate_limited_requests'] > 0: print(f"Rate limited requests: {metrics['rate_limited_requests']}") if metrics['ssl_errors'] > 0: print(f"SSL errors: {metrics['ssl_errors']}") # Save results if args.output: try: scanner.save_results(results, args.output) print(f"\nDetailed results saved to: {args.output}") except Exception as e: logger.error(f"Error saving results: {e}") return 0 if __name__ == "__main__": exit(main())