""" CVE-2026-26198 — Test Suite Tests that verify: 1. The vulnerability exists in the unpatched code 2. The fix blocks all injection attempts 3. Normal queries still work correctly 4. Edge cases are handled (empty fields, special names, prefixed columns) Run with: python -m pytest test_vulnerability.py -v """ import os import sqlite3 import tempfile import pytest from vulnerable_app import VulnerableQuerySet, create_demo_database from patched_app import PatchedQuerySet, ColumnValidationError # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def db_path(): """Create a temporary database for each test.""" fd, path = tempfile.mkstemp(suffix=".db") os.close(fd) create_demo_database(path) yield path os.unlink(path) @pytest.fixture def vulnerable_qs(db_path): return VulnerableQuerySet( db_path=db_path, table_name="products", model_fields=["id", "name", "price", "stock"], ) @pytest.fixture def patched_qs(db_path): return PatchedQuerySet( db_path=db_path, table_name="products", model_fields=["id", "name", "price", "stock"], ) # --------------------------------------------------------------------------- # Part 1: Prove the vulnerability exists # --------------------------------------------------------------------------- class TestVulnerabilityExists: """Demonstrate that unpatched min()/max() allow SQL injection.""" def test_max_leaks_user_emails(self, vulnerable_qs): """Injecting a subquery into max() returns data from the users table.""" payload = "(SELECT group_concat(email) FROM users)" result = vulnerable_qs.max(payload) # The attack succeeds: we get user emails from a product query assert result is not None assert "admin@company.com" in result assert "alice@example.com" in result def test_min_leaks_password_hash(self, vulnerable_qs): """Injecting into min() extracts sensitive credential data.""" payload = "(SELECT password_hash FROM users WHERE role='admin')" result = vulnerable_qs.min(payload) assert result is not None assert "pbkdf2" in result # The password hash was extracted def test_max_leaks_user_roles(self, vulnerable_qs): """Full user roster with privilege levels can be extracted.""" payload = "(SELECT group_concat(username || ':' || role) FROM users)" result = vulnerable_qs.max(payload) assert "admin:admin" in result assert "alice:user" in result def test_sum_is_already_protected(self, vulnerable_qs): """sum() rejects unknown columns — showing the inconsistency.""" with pytest.raises(ValueError, match="not a valid field"): vulnerable_qs.sum("(SELECT 1 FROM users)") # --------------------------------------------------------------------------- # Part 2: Prove the fix blocks injection # --------------------------------------------------------------------------- class TestFixBlocksInjection: """Verify that patched min()/max() reject all injection attempts.""" def test_max_rejects_subquery(self, patched_qs): """The classic attack vector: subquery injection.""" with pytest.raises(ColumnValidationError, match="Invalid column name"): patched_qs.max("(SELECT email FROM users)") def test_min_rejects_subquery(self, patched_qs): with pytest.raises(ColumnValidationError, match="Invalid column name"): patched_qs.min("(SELECT password_hash FROM users)") def test_rejects_union_injection(self, patched_qs): """UNION-based injection attempt.""" with pytest.raises(ColumnValidationError): patched_qs.max("price) UNION SELECT password_hash FROM users--") def test_rejects_semicolon_injection(self, patched_qs): """Statement termination attack.""" with pytest.raises(ColumnValidationError): patched_qs.max("price; DROP TABLE users;--") def test_rejects_comment_injection(self, patched_qs): """SQL comment-based injection.""" with pytest.raises(ColumnValidationError): patched_qs.max("price/**/OR/**/1=1") def test_rejects_nonexistent_field(self, patched_qs): """Even normal-looking but nonexistent fields are rejected.""" with pytest.raises(ColumnValidationError, match="Unknown column"): patched_qs.max("nonexistent_field") def test_rejects_empty_string(self, patched_qs): with pytest.raises(ColumnValidationError): patched_qs.max("") def test_rejects_spaces_in_column(self, patched_qs): with pytest.raises(ColumnValidationError): patched_qs.max("price OR 1=1") def test_all_aggregate_methods_protected(self, patched_qs): """Every aggregate method should validate — not just min/max.""" payload = "(SELECT 1 FROM users)" for method_name in ["min", "max", "sum", "avg"]: method = getattr(patched_qs, method_name) with pytest.raises(ColumnValidationError): method(payload) # --------------------------------------------------------------------------- # Part 3: Normal operations still work # --------------------------------------------------------------------------- class TestNormalOperations: """Ensure the fix doesn't break legitimate queries.""" def test_max_price(self, patched_qs): result = patched_qs.max("price") assert result == 399.99 # 4K Monitor def test_min_price(self, patched_qs): result = patched_qs.min("price") assert result == 29.99 # Wireless Mouse def test_sum_stock(self, patched_qs): result = patched_qs.sum("stock") assert result == 555 # 150 + 75 + 200 + 30 + 100 def test_avg_price(self, patched_qs): result = patched_qs.avg("price") assert round(result, 2) == 127.99 # Average of all prices def test_max_stock(self, patched_qs): result = patched_qs.max("stock") assert result == 200 # USB-C Hub def test_min_stock(self, patched_qs): result = patched_qs.min("stock") assert result == 30 # 4K Monitor # --------------------------------------------------------------------------- # Part 4: Edge cases # --------------------------------------------------------------------------- class TestEdgeCases: """Boundary conditions and tricky inputs.""" def test_column_with_ormar_prefix(self, db_path): """Ormar uses table__field notation for related fields.""" qs = PatchedQuerySet( db_path=db_path, table_name="products", model_fields=["id", "name", "price", "stock"], ) # The prefix should be stripped, and 'price' should be valid result = qs.max("products__price") assert result == 399.99 def test_sql_keyword_as_injection(self, patched_qs): """SQL keywords that aren't valid column names should be rejected.""" for keyword in ["SELECT", "DROP", "DELETE", "INSERT", "UPDATE"]: with pytest.raises(ColumnValidationError, match="Unknown column"): patched_qs.max(keyword) def test_numeric_string_rejected(self, patched_qs): """Pure numeric input should fail the identifier regex.""" with pytest.raises(ColumnValidationError): patched_qs.max("12345") def test_special_chars_rejected(self, patched_qs): """Special characters that could break SQL should be rejected.""" for char in ["'", '"', ";", "(", ")", "-", "/", "*", "\\", "\n"]: with pytest.raises(ColumnValidationError): patched_qs.max(f"price{char}")