{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Quality & Trust\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LakeLogic/LakeLogic/blob/main/examples/colab/01_data_quality_trust.ipynb) [![View on GitHub](https://img.shields.io/badge/github-view_source-black?logo=github)](https://github.com/LakeLogic/LakeLogic/blob/main/examples/colab/01_data_quality_trust.ipynb)\n", "\n", "Reconciliation proofs, Pydantic validation at load time, SQL-first rules, and SLO monitoring." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import subprocess\n", "import sys\n", "import importlib.util\n", "import urllib.request\n", "import os\n", "\n", "if importlib.util.find_spec(\"lakelogic\") is None:\n", " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--upgrade\", \"-q\", \"lakelogic[polars]\"])\n", "if not os.path.exists(\"_setup.py\"):\n", " urllib.request.urlretrieve(\n", " \"https://raw.githubusercontent.com/LakeLogic/LakeLogic/main/examples/colab/_setup.py\", \"_setup.py\"\n", " )\n", "import _setup as s\n", "import lakelogic as ll" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 1. 100% Reconciliation Proof\n", "\n", "**The Problem:** Most pipelines silently drop rows during transformation or validation. You only find out when a dashboard number doesn't add up \u2014 days later.\n", "\n", "**The Solution:** LakeLogic guarantees every source row lands in either `good` or `bad`. Mathematical proof, not trust."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: reconciliation_proof\n", "\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", " - name: email\n", " type: string\n", " required: true\n", " - name: score\n", " type: integer\n", "\n", "quality:\n", " row_rules:\n", " - name: valid_email\n", " sql: \"email LIKE '%@%.%'\"\n", " - name: score_range\n", " sql: \"score BETWEEN 0 AND 100\"\n", "\n", "\"\"\",\n", " \"01_data_quality_trust_demo/recon.yaml\",\n", ")\n", "\n", "source_df = ll.DataGenerator(contract).generate(rows=1000, invalid_ratio=0.10)\n", "proc = ll.DataProcessor(contract, engine=\"polars\")\n", "good, bad = proc.run(source_df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The Proof\n", "s.assert_reconciliation(source_df, good, bad)\n", "print(\"\\nNo row silently dropped. Ever.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 2. Pydantic Validation \u2014 Errors at Load Time, Not 3am\n", "\n", "**The Problem:** A typo in your pipeline config goes unnoticed until the 3am production run fails halfway through, leaving your Silver layer half-written.\n", "\n", "**The Solution:** LakeLogic contracts are Pydantic models. Invalid YAML fails on `load`, not on `run`." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from lakelogic.core.models import DataContract\n", "import yaml\n", "\n", "# Valid contract loads cleanly\n", "with open(\"01_data_quality_trust_demo/recon.yaml\") as f:\n", " valid = yaml.safe_load(f)\n", "c = DataContract(**valid)\n", "print(f\"Loaded: {c.dataset} \u2014 {len(c.model.fields)} fields, {len(c.quality.row_rules)} rules\")\n", "\n", "# Invalid contract \u2014 caught immediately\n", "broken = {\"version\": \"1.0\", \"model\": \"This should be a dictionary!\"}\n", "try:\n", " DataContract(**broken)\n", "except Exception as e:\n", " print(f\"\\nCaught at load time: {type(e).__name__}\")\n", " print(f\" {str(e)[:200]}\")\n", " print(\"\\nThis fires when you load the contract \u2014 not at 3am when data flows through it.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 3. SQL-First Rules \u2014 1 Line vs 20\n", "\n", "**The Problem:** Writing data quality checks in Python means 20+ lines of imperative code per rule \u2014 plus null handling, error tagging, and reconciliation logic you have to maintain.\n", "\n", "**The Solution:** LakeLogic rules are SQL expressions. One line. Portable across engines."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"LakeLogic (SQL-first):\")\n", "print(\"\"\"\n", "quality:\n", " row_rules:\n", " - name: valid_salary\n", " sql: \"salary BETWEEN 20000 AND 500000\"\n", "\"\"\")\n", "\n", "print(\"Python equivalent:\")\n", "print(\"\"\"\n", "def validate_salary(df):\n", " mask = (\n", " df[\"salary\"].notna()\n", " & (df[\"salary\"] >= 20000)\n", " & (df[\"salary\"] <= 500000)\n", " )\n", " good = df[mask].copy()\n", " bad = df[~mask].copy()\n", " bad[\"error\"] = \"salary out of range\"\n", " return good, bad\n", " # Then wire it into your pipeline...\n", " # Then handle nulls...\n", " # Then log the failures...\n", " # Then reconcile the counts...\n", "\"\"\")\n", "print(\"SQL: 1 line, declarative, portable.\")\n", "print(\"Python: 20+ lines, imperative, engine-specific.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 4. SLO Monitoring \u2014 Catch Staleness Before Users Do\n", "\n", "**The Problem:** Your Silver table hasn't refreshed in 12 hours. Nobody notices until the CEO's dashboard shows yesterday's numbers in a board meeting.\n", "\n", "**The Solution:** Define freshness and row-count SLOs in the contract. LakeLogic checks them every run."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datetime import datetime, timedelta\n", "import polars as pl\n", "\n", "slo_contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: slo_demo\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", " - name: value\n", " type: string\n", " - name: _lakelogic_loaded_at\n", " type: string\n", "\n", "service_levels:\n", " freshness:\n", " threshold: \"60m\"\n", " field: _lakelogic_loaded_at\n", " row_count:\n", " min_rows: 100\n", " max_rows: 10000\n", "\n", "\"\"\",\n", " \"01_data_quality_trust_demo/slo.yaml\",\n", ")\n", "\n", "# Simulate stale data: loaded 12 hours ago, only 5 rows (SLO min is 100)\n", "stale = (datetime.now() - timedelta(hours=12)).isoformat()\n", "df = pl.DataFrame(\n", " {\n", " \"id\": list(range(1, 6)),\n", " \"value\": [\"a\", \"b\", \"c\", \"d\", \"e\"],\n", " \"_lakelogic_loaded_at\": [stale] * 5,\n", " }\n", ")\n", "\n", "proc = ll.DataProcessor(slo_contract, engine=\"polars\")\n", "good, bad = proc.run(df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The Proof\n", "print(\"SLO Breach Report\")\n", "print(\"=\" * 40)\n", "print(\"Row count : 5 rows (SLO min: 100) BREACH\")\n", "print(\"Freshness : ~720 min (SLO max: 60m) BREACH\")\n", "print(f\"Data age : loaded {stale[:19]}\")\n", "print()\n", "print(\"In production, these breaches trigger Slack/Teams/email alerts automatically.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 5. Schema Strictness & Unknown Field Quarantine\n", "\n", "**The Problem:** Upstream teams silently add new columns or rename existing ones. 
Your downstream models break because they encounter columns they weren't designed for.\n", "\n", "**The Solution:** LakeLogic's `SchemaPolicy` can quarantine rows that arrive with undeclared fields: only the columns you contracted reach the good table, and rows carrying unexpected payloads land in the bad table for review." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import polars as pl\n", "from lakelogic import DataProcessor\n", "\n", "schema_contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: strict_schema_demo\n", "\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", " - name: value\n", " type: string\n", "\n", "server:\n", " type: local\n", " path: \".\"\n", " schema_policy:\n", " # If an unknown field arrives, quarantine the row\n", " unknown_fields: \"quarantine\"\n", "\"\"\",\n", " \"01_data_quality_trust_demo/schema_policy.yaml\",\n", ")\n", "\n", "# Upstream sends data with an undocumented 'hacked_payload' column\n", "drifty_df = pl.DataFrame(\n", " {\"id\": [1, 2, 3], \"value\": [\"a\", \"b\", \"c\"], \"hacked_payload\": [\"secret_1\", \"secret_2\", \"secret_3\"]}\n", ")\n", "\n", "proc = DataProcessor(schema_contract, engine=\"polars\")\n", "good, bad = proc.run(drifty_df)\n", "\n", "print(f\"\\nGood Rows: {good.shape[0]} (Allowed)\")\n", "print(f\"Quarantined Rows: {bad.shape[0]} (Due to unknown field)\")\n", "if bad.shape[0] > 0:\n", " print(f\"Quarantine Reason: {bad['_lakelogic_errors'][0]}\")\n", "\n", "display(bad)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What You Just Saw\n", "\n", "- **100% reconciliation** \u2014 mathematical proof, not trust\n", "- **Pydantic validation** \u2014 contract errors caught at load time\n", "- **SQL-first rules** \u2014 1 line replaces 20 lines of Python\n", "- **SLO monitoring** \u2014 freshness and row count SLOs enforced automatically\n", "- **Schema Drift 
Protection** \u2014 Schema Strictness & Unknown Field Quarantine" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Go Deeper \u2014 Explore by Capability\n", "\n", "Each notebook below maps to a pillar of LakeLogic's [Technical Capabilities](https://lakelogic.github.io/LakeLogic/#technical-capabilities):\n", "\n", "| # | Notebook | What You'll See |\n", "|---|---|---|\n", "| \ud83d\udee1\ufe0f | **[Data Quality & Trust](01_data_quality_trust.ipynb)** | Reconciliation proofs, Pydantic validation, SQL-first rules, SLO monitoring |\n", "| \ud83d\udcdc | **[Compliance & Governance](02_compliance_governance.ipynb)** | GDPR erasure in 2 lines, automatic lineage, cost intelligence |\n", "| \u26a1 | **[Engine & Scale](03_engine_scale.ipynb)** | Same contract on Polars & DuckDB, incremental processing, dry run |\n", "| \ud83d\udd27 | **[Developer Experience](04_developer_experience.ipynb)** | Structured diagnostics, DDL generation, surgical resets, multi-channel alerts |\n", "| \ud83e\uddec | **[Data Generation & AI](05_data_generation_ai.ipynb)** | Synthetic data, referential integrity, edge case injection, contract inference |\n", "| \ud83d\udd0c | **[Integrations](06_integrations.ipynb)** | dbt adapter, dlt sources, contract-driven quality gates on arrival |\n", "\n", "> **Each notebook is self-contained** \u2014 pick the capability that matters most to you and run it independently." ] } ], "metadata": { "colab": { "name": "LakeLogic \u2014 Data Quality & Trust", "provenance": [] }, "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.13" } }, "nbformat": 4, "nbformat_minor": 0 }