""" CVE-2026-26198 — Vulnerable Pattern This module recreates the vulnerable code pattern from Ormar ORM's aggregate query handling. The actual vulnerability was in ormar's SelectAction class, which passed user-supplied column names directly to sqlalchemy.text(). We simulate this pattern using raw SQLAlchemy to demonstrate the issue without requiring a specific vulnerable version of Ormar to be installed. DO NOT use this pattern in production code. """ import sqlite3 from dataclasses import dataclass # --------------------------------------------------------------------------- # Simulated ORM layer (mirrors Ormar's vulnerable internals) # --------------------------------------------------------------------------- class VulnerableQuerySet: """ Simulates Ormar's QuerySet with the vulnerable min()/max() methods. The critical flaw: min() and max() accept any string as a column name and embed it directly into the SQL query without validation. This is exactly what Ormar's SelectAction.get_text_clause() did: def get_text_clause(self): return sqlalchemy.text(f"{alias}{self.field_name}") # ^^^^^^^^^^^^^^^^ # Unsanitized user input! """ def __init__(self, db_path: str, table_name: str, model_fields: list[str]): self.db_path = db_path self.table_name = table_name self.model_fields = model_fields # Known valid fields def _execute(self, sql: str) -> any: """Execute a query and return the scalar result.""" conn = sqlite3.connect(self.db_path) try: cursor = conn.execute(sql) row = cursor.fetchone() return row[0] if row else None finally: conn.close() # --- VULNERABLE: No validation on column parameter --- def max(self, column: str) -> any: """ VULNERABLE: Passes column directly into SQL without any check. In Ormar, this went through SelectAction.get_text_clause() which called sqlalchemy.text(field_name) — a raw SQL text expression. """ # This is the dangerous line — column is attacker-controlled sql = f"SELECT max({column}) FROM {self.table_name}" return self._execute(sql) def min(self, column: str) -> any: """VULNERABLE: Same issue as max().""" sql = f"SELECT min({column}) FROM {self.table_name}" return self._execute(sql) # --- SAFE: sum() and avg() had validation (partial fix in Ormar) --- def sum(self, column: str) -> any: """ SAFE: Has is_numeric type check (like Ormar's sum/avg). Ormar's sum() and avg() validated that the field existed and was numeric. This is the inconsistency that made the bug so sneaky — developers assumed all aggregate methods had the same protection. """ if column not in self.model_fields: raise ValueError( f"Column '{column}' is not a valid field. " f"Available fields: {self.model_fields}" ) sql = f"SELECT sum({column}) FROM {self.table_name}" return self._execute(sql) # --------------------------------------------------------------------------- # Demo app setup # --------------------------------------------------------------------------- def create_demo_database(db_path: str) -> None: """ Set up a demo database simulating an e-commerce application. Two tables: - products: Public data (name, price) — what the API exposes - users: Sensitive data (email, password_hash) — should be hidden """ conn = sqlite3.connect(db_path) conn.executescript(""" CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, price REAL NOT NULL, stock INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY, username TEXT NOT NULL, email TEXT NOT NULL, password_hash TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'user' ); -- Sample product data INSERT OR IGNORE INTO products (id, name, price, stock) VALUES (1, 'Wireless Mouse', 29.99, 150), (2, 'Mechanical Keyboard', 89.99, 75), (3, 'USB-C Hub', 49.99, 200), (4, '4K Monitor', 399.99, 30), (5, 'Webcam HD', 69.99, 100); -- Sensitive user data (should NEVER be accessible via product queries) INSERT OR IGNORE INTO users (id, username, email, password_hash, role) VALUES (1, 'admin', 'admin@company.com', 'pbkdf2:sha256:fakehash_admin_secret', 'admin'), (2, 'alice', 'alice@example.com', 'pbkdf2:sha256:fakehash_alice_pw123', 'user'), (3, 'bob', 'bob@example.com', 'pbkdf2:sha256:fakehash_bob_secure', 'user'); """) conn.commit() conn.close() @dataclass class ProductQuerySet: """A product-focused query interface — should only access the products table.""" db_path: str def get_vulnerable_queryset(self) -> VulnerableQuerySet: return VulnerableQuerySet( db_path=self.db_path, table_name="products", model_fields=["id", "name", "price", "stock"], )