{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Engine Portability & Scale\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LakeLogic/LakeLogic/blob/main/examples/colab/03_engine_scale.ipynb) [![View on GitHub](https://img.shields.io/badge/github-view_source-black?logo=github)](https://github.com/lakelogic/LakeLogic/blob/main/examples/colab/03_engine_scale.ipynb)\n", "\n", "Write once, run anywhere. Same contract, different engines, identical results — plus dimensional modeling, incremental processing, parallel execution, backfill, and external logic hooks." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import subprocess\n", "import sys\n", "import importlib.util\n", "import urllib.request\n", "import os\n", "\n", "if importlib.util.find_spec(\"lakelogic\") is None:\n", " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--upgrade\", \"-q\", \"lakelogic[polars]\"])\n", "if importlib.util.find_spec(\"duckdb\") is None:\n", " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"-q\", \"duckdb\"])\n", "# pyspark backs the Spark engine run in section 1\n", "if importlib.util.find_spec(\"pyspark\") is None:\n", " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"-q\", \"pyspark\"])\n", "if not os.path.exists(\"_setup.py\"):\n", " urllib.request.urlretrieve(\n", " \"https://raw.githubusercontent.com/LakeLogic/LakeLogic/main/examples/colab/_setup.py\", \"_setup.py\"\n", " )\n", "import _setup as s\n", "import lakelogic as ll" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 1. Engine-Agnostic Proof — Zero Code Changes\n", "\n", "**The Problem:** You built your pipeline on Polars/DuckDB. Now it needs to run on Spark in production. Rewriting 2,000 lines of DataFrame logic isn't a weekend project.\n", "\n", "**The Solution:** LakeLogic compiles SQL-first rules to each engine's dialect. Same contract, same results."
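] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before running the demo, here is the core idea at its smallest (a standalone sketch, not LakeLogic code): because quality rules are plain SQL predicates, any engine that speaks SQL can evaluate them, and the verdicts must agree row for row. Stdlib `sqlite3` stands in for a second engine here.\n", "\n", "```python\n", "import re\n", "import sqlite3\n", "\n", "# The two row rules from the contract in the next cell, AND-ed together.\n", "rule_sql = \"email LIKE '%@%.%' AND score BETWEEN 0 AND 100\"\n", "rows = [(1, 'a@x.io', 50), (2, 'bad-email', 50), (3, 'c@y.co', 120)]\n", "\n", "# Engine A: SQLite evaluates the SQL rule verbatim.\n", "con = sqlite3.connect(':memory:')\n", "con.execute('CREATE TABLE t (id INTEGER, email TEXT, score INTEGER)')\n", "con.executemany('INSERT INTO t VALUES (?, ?, ?)', rows)\n", "sql_good = {r[0] for r in con.execute(f'SELECT id FROM t WHERE {rule_sql}')}\n", "\n", "# Engine B: plain Python re-implements the same predicate.\n", "py_good = {i for i, email, score in rows if re.search(r'@.*\\.', email) and 0 <= score <= 100}\n", "\n", "assert sql_good == py_good == {1}\n", "```\n", "\n", "LakeLogic's dialect compilation generalizes this: the contract's SQL is the single source of truth, and each engine is only an evaluator."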
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import shutil\n", "\n", "# ── Clean slate: remove stale files from previous runs ────────────\n", "for _f in [\"03_engine_scale_demo/engine_test.yaml\", \"engine_test_source.parquet\"]:\n", " if os.path.exists(_f):\n", " os.remove(_f)\n", "\n", "contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: engine_test\n", "\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", " - name: email\n", " type: string\n", " required: true\n", " - name: score\n", " type: integer\n", "\n", "quality:\n", " row_rules:\n", " - name: valid_email\n", " sql: \"email LIKE '%@%.%'\"\n", " - name: score_range\n", " sql: \"score BETWEEN 0 AND 100\"\n", "\"\"\",\n", " \"03_engine_scale_demo/engine_test.yaml\",\n", ")\n", "\n", "source_df = ll.DataGenerator(contract).generate(rows=500, invalid_ratio=0.08)\n", "source_file = \"engine_test_source.parquet\"\n", "source_df.write_parquet(source_file)\n", "\n", "# Run on Polars\n", "p1 = ll.DataProcessor(contract, engine=\"polars\")\n", "r1 = p1.run(source_df)\n", "\n", "# Run on DuckDB — same contract, zero changes\n", "p2 = ll.DataProcessor(contract, engine=\"duckdb\")\n", "r2 = p2.run(source_df)\n", "\n", "# Run on Spark — same contract, zero changes\n", "p3 = ll.DataProcessor(contract, engine=\"spark\")\n", "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.appName(\"LakeLogic\").getOrCreate()\n", "spark_df = spark.read.parquet(source_file)\n", "r3 = p3.run(spark_df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The Proof\n", "print(\"Engine Comparison\")\n", "print(\"=\" * 40)\n", "print(f\" Polars : good={r1.good_count}, bad={r1.bad_count}\")\n", "print(f\" DuckDB : good={r2.good_count}, bad={r2.bad_count}\")\n", "print(f\" Spark : good={r3.good_count}, bad={r3.bad_count}\")\n", "print(\n", " f\" 
Match : {r1.good_count == r2.good_count and r1.bad_count == r2.bad_count and r2.good_count == r3.good_count and r2.bad_count == r3.bad_count}\"\n", ")\n", "print(\"\\nSame contract. Same data. Same results. Zero code changes.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 2. Dimensional Modeling — SCD2, Merge, Overwrite\n", "\n", "**The Problem:** Your dimension table needs history tracking. You manually build `MERGE INTO` SQL, manage `effective_from`/`effective_to` dates, and debug `is_current` flags by hand.\n", "\n", "**The Solution:** Declare `materialization.strategy: scd2` in the contract — LakeLogic generates all SCD2 columns, applies merge logic, and manages version tracking. Zero SQL required." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import yaml\n", "\n", "# ── Show what a fully-declared dimensional contract looks like ────────\n", "scd2_yaml = \"\"\"\n", "version: 1.0.0\n", "dataset: dim_customers\n", "info:\n", " title: gold_dim_customers\n", " target_layer: gold\n", "\n", "primary_key: [customer_id]\n", "\n", "model:\n", " fields:\n", " - name: customer_id\n", " type: integer\n", " required: true\n", " - name: name\n", " type: string\n", " - name: email\n", " type: string\n", " - name: tier\n", " type: string\n", "\n", "materialization:\n", " strategy: scd2\n", " scd2:\n", " track_columns: [name, email, tier]\n", " timestamp_field: updated_at\n", " surrogate_key: _sk\n", " effective_from_field: effective_from\n", " effective_to_field: effective_to\n", " current_flag_field: is_current\n", " end_date_default: \"9999-12-31\"\n", " version_column: \"_version\" # ROW_NUMBER per business key\n", " change_reason_column: \"_change_reason\" # \"initial_load\", \"email,tier changes\", etc.\n", "\n", " # Unknown member (late-arriving fact fallback)\n", " unknown_member:\n", " enabled: true\n", " surrogate_key_value: \"-1\"\n", "\"\"\"\n", "\n", "# Parse & validate the 
contract\n", "from lakelogic.core.models import DataContract\n", "\n", "scd2_contract = DataContract(**yaml.safe_load(scd2_yaml))\n", "\n", "print(\"\\u2500\" * 60)\n", "print(\"DIMENSIONAL MODELING: SCD2 Contract\")\n", "print(\"\\u2500\" * 60)\n", "print(f\" Strategy : {scd2_contract.materialization.strategy}\")\n", "print(f\" Primary key : {scd2_contract.primary_key}\")\n", "print(f\" Track columns : {scd2_contract.materialization.scd2['track_columns']}\")\n", "print(f\" Surrogate key : {scd2_contract.materialization.scd2['surrogate_key']}\")\n", "print(f\" Effective from : {scd2_contract.materialization.scd2['effective_from_field']}\")\n", "print(f\" Effective to : {scd2_contract.materialization.scd2['effective_to_field']}\")\n", "print(f\" Current flag : {scd2_contract.materialization.scd2['current_flag_field']}\")\n", "print(f\" Version : {scd2_contract.materialization.scd2['version_column']}\")\n", "print(f\" Change reason : {scd2_contract.materialization.scd2['change_reason_column']}\")\n", "print(f\" Unknown member : {scd2_contract.materialization.scd2['unknown_member']}\")\n", "\n", "\n", "# ── Show all supported strategies ─────────────────────────────────────\n", "strategies = {\n", " \"append\": \"Fact tables — new rows added, never updated\",\n", " \"merge\": \"SCD Type 1 — upsert by natural key, latest value wins\",\n", " \"scd2\": \"SCD Type 2 — full history with effective dates\",\n", " \"overwrite\": \"Periodic snapshot — drop & replace on each run\",\n", "}\n", "\n", "print(\"\\n\" + \"\\u2500\" * 60)\n", "print(\"ALL MATERIALIZATION STRATEGIES\")\n", "print(\"\\u2500\" * 60)\n", "for strat, desc in strategies.items():\n", " marker = \"\\u2713\" if strat == scd2_contract.materialization.strategy else \" \"\n", " print(f\" [{marker}] {strat:10s} — {desc}\")\n", "print(\"\\n\\u2705 All declared in YAML. 
No manual MERGE INTO SQL required.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### The Proof — Let's see SCD2 in action\n", "We'll generate 3 rows, run them through the processor, and see how LakeLogic automatically injects and populates the tracking columns without any SQL." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import shutil\n", "import polars as pl\n", "\n", "# ── Clean slate: remove stale files from previous runs ────────────\n", "for _target in [\"03_engine_scale_demo/parallel_demo/gold_dim_customers\", \"03_engine_scale_demo/dim_customers.yaml\"]:\n", " if os.path.isdir(_target):\n", " shutil.rmtree(_target)\n", " elif os.path.isfile(_target):\n", " os.remove(_target)\n", "\n", "# Generate some initial data\n", "scd2_path = s.write_contract(scd2_yaml, \"03_engine_scale_demo/dim_customers.yaml\")\n", "scd2_source = ll.DataGenerator(scd2_path).generate(rows=3)\n", "print(\"1. INCOMING SOURCE DATA:\\n\")\n", "display(scd2_source)\n", "\n", "# Run the pipeline (LakeLogic automatically handles the SCD2 merge logic)\n", "p_scd2 = ll.DataProcessor(scd2_path, engine=\"polars\")\n", "os.makedirs(\"03_engine_scale_demo/parallel_demo/gold_dim_customers\", exist_ok=True)\n", "good_scd2, bad_scd2 = p_scd2.run(scd2_source, materialize=True, materialize_target=\"parallel_demo/gold_dim_customers\")\n", "\n", "print(\"\\n2. 
AFTER LAKELOGIC SCD2 PROCESSING (Notice the injected tracking columns):\\n\")\n", "try:\n", " materialized_df = pl.read_delta(\"03_engine_scale_demo/parallel_demo/gold_dim_customers\")\n", "except Exception:\n", " from pathlib import Path\n", "\n", " search_path = Path(\"03_engine_scale_demo/parallel_demo/gold_dim_customers\")\n", " parquet_files = [str(p) for p in search_path.rglob(\"*.parquet\")]\n", " if parquet_files:\n", " materialized_df = pl.read_parquet(parquet_files)\n", " else:\n", " materialized_df = good_scd2\n", "display(materialized_df.select([c for c in materialized_df.columns if not c.startswith(\"_lakelogic_\")]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 3. Incremental Processing — `pipeline_log` Watermark\n", "\n", "**The Problem:** Your nightly job reprocesses 10 million rows even though only 500 changed. Compute costs scale with total volume instead of change volume.\n", "\n", "**The Solution:** LakeLogic's `pipeline_log` watermark strategy tracks which files have been processed by their modification time. On the next run, only **new files** are loaded." 
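] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The mechanics fit in a dozen lines of plain Python. This is an illustration only: the JSON log layout and the `new_files` helper are invented for the sketch and are not LakeLogic's internal `pipeline_log` format.\n", "\n", "```python\n", "import json\n", "import os\n", "\n", "def new_files(landing_dir, log_path):\n", "    # Watermark = highest file mtime already processed (0.0 on first run).\n", "    watermark = 0.0\n", "    if os.path.exists(log_path):\n", "        with open(log_path) as f:\n", "            watermark = json.load(f)['last_mtime']\n", "    # Only files strictly newer than the watermark are loaded.\n", "    fresh = [n for n in sorted(os.listdir(landing_dir)) if os.path.getmtime(os.path.join(landing_dir, n)) > watermark]\n", "    if fresh:\n", "        latest = max(os.path.getmtime(os.path.join(landing_dir, n)) for n in fresh)\n", "        with open(log_path, 'w') as f:\n", "            json.dump({'last_mtime': latest}, f)\n", "    return fresh\n", "```\n", "\n", "This is also why the demo sleeps for one second before writing batch 2: the new file's mtime must land strictly after the recorded watermark."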
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import shutil\n", "import polars as pl\n", "\n", "# ── Clean slate for demo ─────────────────────────────────────────────\n", "DEMO_DIR = \"./incremental_demo\"\n", "LANDING = f\"{DEMO_DIR}/landing\"\n", "\n", "if os.path.exists(DEMO_DIR):\n", " shutil.rmtree(DEMO_DIR)\n", "os.makedirs(LANDING, exist_ok=True)\n", "\n", "# ── Contract with source.type = landing, load_mode = incremental ────\n", "inc_contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: orders\n", "info:\n", " title: bronze_orders\n", " target_layer: bronze\n", "\n", "source:\n", " type: landing\n", " path: \"./incremental_demo/landing\"\n", " format: ndjson\n", " load_mode: incremental\n", " watermark_strategy: pipeline_log\n", "\n", "metadata:\n", " run_log_dir: \"./incremental_demo/logs\"\n", "\n", "model:\n", " fields:\n", " - name: order_id\n", " type: integer\n", " required: true\n", " - name: amount\n", " type: float\n", " - name: status\n", " type: string\n", "\n", "quality:\n", " row_rules:\n", " - name: positive_amount\n", " sql: \"amount > 0\"\n", "\"\"\",\n", " \"03_engine_scale_demo/orders_inc.yaml\",\n", ")\n", "\n", "# ── FILE 1: 50 orders land in the landing zone ───────────────────────\n", "batch1 = ll.DataGenerator(inc_contract).generate(rows=50)\n", "batch1.write_ndjson(f\"{LANDING}/orders_batch_1.json\")\n", "print(f\"\\u2705 File 1: wrote {len(batch1)} rows to orders_batch_1.json\")\n", "\n", "# ── RUN 1: Initial load (no prior watermark) ────────────────────────\n", "proc = ll.DataProcessor(inc_contract, engine=\"polars\")\n", "g1, b1 = proc.run_source()\n", "r1 = proc.last_report\n", "\n", "print(f\"\\n{'=' * 50}\")\n", "print(\"RUN 1 (initial load)\")\n", "print(f\"{'=' * 50}\")\n", "print(f\" Files in landing : {len(os.listdir(LANDING))}\")\n", "print(f\" Rows loaded : {r1.get('counts', {}).get('source', '?')}\")\n", "print(f\" Good / Bad : 
{r1.get('counts', {}).get('good', '?')} / {r1.get('counts', {}).get('quarantined', '?')}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "time.sleep(1) # Ensure mtime of File 2 is strictly after Run 1's watermark\n", "\n", "# ── FILE 2: 20 new orders arrive ────────────────────────────────────\n", "batch2 = ll.DataGenerator(inc_contract).generate(rows=20)\n", "batch2.write_ndjson(f\"{LANDING}/orders_batch_2.json\")\n", "print(f\"\\u2705 File 2: wrote {len(batch2)} rows to orders_batch_2.json\")\n", "print(f\" Landing zone now has: {os.listdir(LANDING)}\")\n", "\n", "# ── RUN 2: Only new files processed ─────────────────────────────────\n", "g2, b2 = proc.run_source()\n", "r2 = proc.last_report\n", "\n", "print(f\"\\n{'=' * 50}\")\n", "print(\"RUN 2 (incremental)\")\n", "print(f\"{'=' * 50}\")\n", "print(f\" Files in landing : {len(os.listdir(LANDING))} (70 total rows across 2 files)\")\n", "print(\" Files processed : 1 (only orders_batch_2.json \\u2014 batch_1 already processed)\")\n", "print(f\" Rows loaded : {r2.get('counts', {}).get('source', '?')}\")\n", "print(f\" Good / Bad : {r2.get('counts', {}).get('good', '?')} / {r2.get('counts', {}).get('quarantined', '?')}\")\n", "print(\"\\n\\u2705 pipeline_log watermark: only new files are processed. No reprocessing.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 4. Parallel Processing — Concurrent Multi-Contract Execution\n", "\n", "**The Problem:** You have 8 Bronze contracts with no dependencies between them. Running them sequentially takes 40 minutes.\n", "\n", "**The Solution:** `pipeline.run(parallel=True)` groups contracts into dependency **waves** using topological sort. Contracts within the same wave execute concurrently via threads — layer ordering is preserved automatically." 
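] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The wave grouping itself is ordinary topological sorting (Kahn's algorithm). A self-contained sketch using plain dicts instead of LakeLogic's contract objects:\n", "\n", "```python\n", "def dependency_waves(deps):\n", "    # deps maps each node to the set of nodes it depends on.\n", "    remaining = {node: set(d) for node, d in deps.items()}\n", "    waves = []\n", "    while remaining:\n", "        # Ready = every dependency already placed in an earlier wave.\n", "        ready = sorted(n for n, d in remaining.items() if not d)\n", "        if not ready:\n", "            raise ValueError('dependency cycle detected')\n", "        waves.append(ready)\n", "        for n in ready:\n", "            del remaining[n]\n", "        for d in remaining.values():\n", "            d.difference_update(ready)\n", "    return waves\n", "\n", "waves = dependency_waves({\n", "    'orders': set(), 'customers': set(), 'products': set(),\n", "    'customers_enriched': {'customers'},\n", "    'orders_enriched': {'orders', 'customers_enriched'},\n", "})\n", "# Three waves: the Bronze trio together, then customers_enriched, then orders_enriched.\n", "```\n", "\n", "Contracts within a wave can be handed to a thread pool; each later wave starts only after the previous one finishes."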
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import shutil\n", "import yaml\n", "from lakelogic.core.registry import DomainRegistry\n", "from lakelogic.pipeline import LakehousePipeline\n", "from IPython.display import HTML, display\n", "\n", "# ── Create inline contracts ──────────────────────────────────────────\n", "DAG_DIR = \"./parallel_demo\"\n", "\n", "# ── Clean slate: remove stale files from previous runs ────────────\n", "if os.path.exists(DAG_DIR):\n", " shutil.rmtree(DAG_DIR)\n", "\n", "os.makedirs(f\"{DAG_DIR}/contracts/bronze\", exist_ok=True)\n", "os.makedirs(f\"{DAG_DIR}/contracts/silver\", exist_ok=True)\n", "\n", "# Bronze: orders (independent)\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: orders\n", "info:\n", " title: bronze_orders\n", " target_layer: bronze\n", "source:\n", " type: landing\n", " path: \"./parallel_demo/landing/orders\"\n", " format: ndjson\n", "model:\n", " fields:\n", " - name: order_id\n", " type: integer\n", " required: true\n", " - name: amount\n", " type: float\n", "\"\"\",\n", " f\"{DAG_DIR}/contracts/bronze/orders.yaml\",\n", ")\n", "\n", "# Bronze: customers (independent — runs in parallel with orders)\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: customers\n", "info:\n", " title: bronze_customers\n", " target_layer: bronze\n", "source:\n", " type: landing\n", " path: \"./parallel_demo/landing/customers\"\n", " format: ndjson\n", "model:\n", " fields:\n", " - name: customer_id\n", " type: integer\n", " required: true\n", " - name: name\n", " type: string\n", " - name: email_address\n", " type: string\n", " pii: true\n", "\"\"\",\n", " f\"{DAG_DIR}/contracts/bronze/customers.yaml\",\n", ")\n", "\n", "# Bronze: products (independent — runs in parallel with orders & customers)\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: products\n", "info:\n", " title: bronze_products\n", " 
target_layer: bronze\n", "source:\n", " type: landing\n", " path: \"./parallel_demo/landing/products\"\n", " format: ndjson\n", "model:\n", " fields:\n", " - name: product_id\n", " type: integer\n", " required: true\n", " - name: name\n", " type: string\n", "\"\"\",\n", " f\"{DAG_DIR}/contracts/bronze/products.yaml\",\n", ")\n", "\n", "# Silver: customers_enriched\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: customers_enriched\n", "info:\n", " title: silver_customers_enriched\n", " target_layer: silver\n", "source:\n", " type: table\n", " path: \"./parallel_demo/lakehouse/bronze/bronze_customers\"\n", "model:\n", " fields:\n", " - name: customer_id\n", " type: integer\n", " required: true\n", " - name: name\n", " type: string\n", " - name: email_address\n", " type: string\n", " pii: true\n", "\n", "\"\"\",\n", " f\"{DAG_DIR}/contracts/silver/customers_enriched.yaml\",\n", ")\n", "\n", "# Silver: orders_enriched (reads bronze_orders; depends_on: customers_enriched → runs AFTER them)\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: orders_enriched\n", "info:\n", " title: silver_orders_enriched\n", " target_layer: silver\n", "source:\n", " type: table\n", " path: \"./parallel_demo/lakehouse/bronze/bronze_orders\"\n", "model:\n", " fields:\n", " - name: order_id\n", " type: integer\n", " required: true\n", " - name: amount\n", " type: float\n", "downstream:\n", " - type: dashboard\n", " name: \"Weekly Sales Performance\"\n", " platform: power_bi\n", " url: \"https://app.powerbi.com/...\"\n", " owner: \"marketing-analytics\"\n", "\n", " - type: api\n", " name: \"Order Tracking Service\"\n", " platform: internal\n", " owner: \"backend-team\"\n", "\n", "\"\"\",\n", " f\"{DAG_DIR}/contracts/silver/orders_enriched.yaml\",\n", ")\n", "\n", "# ── Create _system.yaml with dependency declarations ────────────────\n", "system_yaml = {\n", " \"domain\": \"demo\",\n", " \"system\": \"ecommerce\",\n", " # ── External sources (for lineage visualization) 
──────────────────────────\n", " \"external_sources\": [\n", " {\n", " \"name\": \"Shopify API\",\n", " \"source_domain\": \"CRM Vendor\",\n", " \"catalog_path\": \"external_storage_path_or_api\",\n", " \"consumed_by\": [\"orders\", \"customers\"],\n", " },\n", " {\n", " \"name\": \"Products System Database\",\n", " \"source_domain\": \"Products Vendor\",\n", " \"catalog_path\": \"external_storage_path_or_api\",\n", " \"consumed_by\": [\"products\"],\n", " },\n", " ],\n", " \"contracts\": [\n", " {\"layer\": \"bronze\", \"entity\": \"orders\", \"path\": \"contracts/bronze/orders.yaml\", \"enabled\": True},\n", " {\"layer\": \"bronze\", \"entity\": \"customers\", \"path\": \"contracts/bronze/customers.yaml\", \"enabled\": True},\n", " {\"layer\": \"bronze\", \"entity\": \"products\", \"path\": \"contracts/bronze/products.yaml\", \"enabled\": True},\n", " {\n", " \"layer\": \"silver\",\n", " \"entity\": \"customers_enriched\",\n", " \"path\": \"contracts/silver/customers_enriched.yaml\",\n", " \"enabled\": True,\n", " },\n", " {\n", " \"layer\": \"silver\",\n", " \"entity\": \"orders_enriched\",\n", " \"path\": \"contracts/silver/orders_enriched.yaml\",\n", " \"depends_on\": [\"customers_enriched\"],\n", " \"enabled\": True,\n", " },\n", " ],\n", " \"environments\": {\n", " \"local\": {\n", " \"catalog\": \"local\",\n", " \"storage_root\": \"./parallel_demo/lakehouse\",\n", " \"data_root\": \"./parallel_demo/lakehouse\",\n", " \"quarantine_root\": \"./parallel_demo/lakehouse/_quarantine\",\n", " }\n", " },\n", " \"storage\": {\"external_location_root\": \"./parallel_demo/lakehouse\"},\n", "}\n", "\n", "sys_path = f\"{DAG_DIR}/_system.yaml\"\n", "with open(sys_path, \"w\") as f:\n", " yaml.dump(system_yaml, f, default_flow_style=False)\n", "\n", "# ── Build pipeline ──────────────────────────────────────────────────\n", "registry = DomainRegistry.from_yaml(sys_path, environment=\"local\", storage_mode=\"direct\")\n", "pipeline = LakehousePipeline(registry, 
engine=\"polars\")\n", "\n", "# ── Visualise the DAG — shows parallel waves ────────────────────────\n", "display(HTML(pipeline.visualize_dag()))\n", "\n", "# ── Show wave grouping ──────────────────────────────────────────────\n", "bronze_contracts = [c for c in registry.contracts if c.layer == \"bronze\"]\n", "waves = LakehousePipeline._group_by_dependency_level(bronze_contracts)\n", "\n", "print(\"\\n\" + \"\\u2500\" * 60)\n", "print(\"PARALLEL EXECUTION PLAN\")\n", "print(\"\\u2500\" * 60)\n", "print(f\" Bronze layer: {len(bronze_contracts)} contracts\")\n", "for i, wave in enumerate(waves):\n", " entities = [c.entity for c in wave]\n", " print(f\" Wave {i}: [{', '.join(entities)}] \\u2190 {'parallel' if len(entities) > 1 else 'sequential'}\")\n", "\n", "print(\"\\n Silver layer: orders_enriched\")\n", "print(\" \\u2514\\u2500 reads bronze_orders; depends_on: [customers_enriched] \\u2192 runs after both complete\")\n", "\n", "print(\"\\n\\u2705 pipeline.run(parallel=True) executes Wave 0 contracts concurrently.\")\n", "print(\" Layer ordering (bronze \\u2192 silver \\u2192 gold) is always preserved.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 5. Backfill & Reprocessing — Targeted Late-Arriving Data\n", "\n", "**The Problem:** A partner sent corrected data for last Tuesday. You need to reload just those records without blowing away the rest of the week.\n", "\n", "**The Solution:** `run_source(reprocess_from=..., reprocess_to=...)` lets you surgically reload a date range or specific IDs — the incremental watermark is bypassed for that run only."
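] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The targeting itself reduces to an inclusive date-range predicate on the contract's `reprocess_date_column`. ISO-8601 dates compare correctly as plain strings, so a sketch needs no date parsing (illustrative only, not LakeLogic's implementation):\n", "\n", "```python\n", "def select_for_reprocess(rows, date_column, start, end):\n", "    # Inclusive [start, end] window; ISO dates sort lexicographically.\n", "    return [row for row in rows if start <= row[date_column] <= end]\n", "\n", "events = [\n", "    {'event_id': 1, 'event_date': '2025-01-01'},\n", "    {'event_id': 2, 'event_date': '2025-01-03'},\n", "    {'event_id': 3, 'event_date': '2025-01-05'},\n", "]\n", "picked = select_for_reprocess(events, 'event_date', '2025-01-02', '2025-01-04')\n", "assert [row['event_id'] for row in picked] == [2]\n", "```"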
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import shutil\n", "import polars as pl\n", "from datetime import date, timedelta\n", "\n", "# ── Clean slate ─────────────────────────────────────────────────────\n", "BF_DIR = \"./backfill_demo\"\n", "BF_LANDING = f\"{BF_DIR}/landing\"\n", "if os.path.exists(BF_DIR):\n", " shutil.rmtree(BF_DIR)\n", "os.makedirs(BF_LANDING, exist_ok=True)\n", "\n", "# ── Contract with source.type = landing + reprocess column ─────────\n", "backfill_contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: daily_events\n", "info:\n", " title: bronze_daily_events\n", " target_layer: bronze\n", "\n", "source:\n", " type: landing\n", " path: \"./backfill_demo/landing/*.ndjson\"\n", " format: ndjson\n", "\n", "model:\n", " fields:\n", " - name: event_id\n", " type: integer\n", " required: true\n", " - name: event_date\n", " type: string\n", " required: true\n", " - name: payload\n", " type: string\n", "\n", "materialization:\n", " reprocess_date_column: event_date\n", "\n", "quality:\n", " row_rules:\n", " - name: has_payload\n", " sql: \"payload IS NOT NULL\"\n", "\"\"\",\n", " \"03_engine_scale_demo/daily_events.yaml\",\n", ")\n", "\n", "# ── Generate a week of events and write to landing ────────────────\n", "today = date.today()\n", "rows = []\n", "for i in range(7):\n", " day = (today - timedelta(days=6 - i)).isoformat()\n", " for j in range(50):\n", " rows.append({\"event_id\": i * 50 + j, \"event_date\": day, \"payload\": f\"data_{i}_{j}\"})\n", "\n", "full_week = pl.DataFrame(rows)\n", "full_week.write_ndjson(f\"{BF_LANDING}/events_full_week.ndjson\")\n", "print(f\"Full dataset: {len(full_week)} rows across 7 days\")\n", "print(full_week.group_by(\"event_date\").len().sort(\"event_date\"))\n", "\n", "# ── Full load first ──────────────────────────────────────────────────\n", "bf_proc = ll.DataProcessor(backfill_contract, engine=\"polars\")\n", "g_full, b_full = 
bf_proc.run_source()\n", "r_full = bf_proc.last_report\n", "print(f\"\\nFull load: {r_full.get('counts', {}).get('source', '?')} rows\")\n", "\n", "# ── Targeted backfill: reload just 2 days ──────────────────────────\n", "target_start = (today - timedelta(days=3)).isoformat()\n", "target_end = (today - timedelta(days=2)).isoformat()\n", "\n", "g_bp, b_bp = bf_proc.run_source(\n", " reprocess_from=target_start,\n", " reprocess_to=target_end,\n", ")\n", "r_bp = bf_proc.last_report\n", "\n", "print(f\"\\n{'=' * 50}\")\n", "print(f\"BACKFILL: {target_start} to {target_end}\")\n", "print(f\"{'=' * 50}\")\n", "print(f\" Rows reprocessed : {r_bp.get('counts', {}).get('source', '?')}\")\n", "print(f\" Good / Bad : {r_bp.get('counts', {}).get('good', '?')} / {r_bp.get('counts', {}).get('quarantined', '?')}\")\n", "print(\"\\n\\u2705 Only the targeted date range was reprocessed \\u2014 rest of the week untouched.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 6. External Logic — Custom Python Hooks\n", "\n", "**The Problem:** Your Gold-layer transformation requires 200 lines of business logic — joins, pivots, ML scoring — that won't fit in a SQL rule.\n", "\n", "**The Solution:** Declare `external_logic` in the contract. LakeLogic calls your custom Python function, feeds it the validated DataFrame, and then applies its own quality rules and lineage to the result."
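] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dispatching to an external script is essentially dynamic module loading. A sketch of the pattern with stdlib `importlib` (LakeLogic's real loader is internal; this only illustrates the `path` / `entrypoint` / `args` trio from the contract):\n", "\n", "```python\n", "import importlib.util\n", "\n", "def call_external_logic(path, entrypoint, df, **args):\n", "    # Load the script file as a throwaway module, then call its entrypoint.\n", "    spec = importlib.util.spec_from_file_location('_external_logic', path)\n", "    module = importlib.util.module_from_spec(spec)\n", "    spec.loader.exec_module(module)\n", "    return getattr(module, entrypoint)(df, **args)\n", "```\n", "\n", "The contract's `args` mapping becomes keyword arguments, which is why the transform below accepts `fiscal_year` and `include_refunds` as keyword-only parameters."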
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import shutil\n", "import yaml\n", "import polars as pl\n", "\n", "# ── Clean slate ─────────────────────────────────────────────────────\n", "EXT_DIR = \"03_engine_scale_demo/external_logic_demo\"\n", "if os.path.exists(EXT_DIR):\n", " shutil.rmtree(EXT_DIR)\n", "os.makedirs(f\"{EXT_DIR}/transforms\", exist_ok=True)\n", "\n", "# ── Step 1: Write the custom Python transform to disk ────────────\n", "transform_code = \"\"\"\n", "import polars as pl\n", "\n", "def run(df, *, fiscal_year=2026, include_refunds=False, **kwargs):\n", " \\\"\\\"\\\"Gold-layer aggregation - called by LakeLogic.\\\"\\\"\\\"\\n\n", " # Filter out refunds if requested\n", " if not include_refunds:\n", " df = df.filter(pl.col(\"status\") != \"refunded\")\n", " return df.group_by(\"region\").agg(\n", " pl.col(\"amount\").sum().alias(\"total_revenue\"),\n", " pl.col(\"order_id\").count().alias(\"order_count\"),\n", " )\n", "\"\"\"\n", "\n", "transform_path = f\"{EXT_DIR}/transforms/revenue_summary.py\"\n", "with open(transform_path, \"w\", encoding=\"utf-8\") as f:\n", " f.write(transform_code.strip() + \"\\n\")\n", "\n", "print(\"transforms/revenue_summary.py\")\n", "print(\"─\" * 60)\n", "print(transform_code.strip())\n", "print(\"─\" * 60)\n", "\n", "# ── Step 2: Create the contract referencing the script ────────────\n", "ext_yaml = \"\"\"\n", "version: 1.0.0\n", "dataset: gold_revenue_summary\n", "info:\n", " title: gold_revenue_summary\n", " target_layer: gold\n", "\n", "model:\n", " fields:\n", " - name: region\n", " type: string\n", " required: true\n", " - name: total_revenue\n", " type: float\n", " - name: order_count\n", " type: integer\n", "\n", "external_logic:\n", " type: python\n", " path: \"transforms/revenue_summary.py\"\n", " entrypoint: run\n", " args:\n", " fiscal_year: 2026\n", " include_refunds: false\n", "\n", "quality:\n", " row_rules:\n", " - name: 
positive_revenue\n", " sql: \"total_revenue >= 0\"\n", "\"\"\"\n", "\n", "ext_contract_path = s.write_contract(ext_yaml, f\"{EXT_DIR}/gold_revenue_summary.yaml\")\n", "\n", "# ── Step 3: Generate realistic source data ────────────────────────\n", "import random\n", "\n", "random.seed(42)\n", "\n", "regions = [\"EMEA\", \"APAC\", \"Americas\", \"LATAM\"]\n", "statuses = [\"completed\", \"completed\", \"completed\", \"refunded\", \"pending\"]\n", "\n", "source_data = pl.DataFrame(\n", " {\n", " \"order_id\": list(range(1, 201)),\n", " \"region\": [random.choice(regions) for _ in range(200)],\n", " \"amount\": [round(random.uniform(10, 500), 2) for _ in range(200)],\n", " \"status\": [random.choice(statuses) for _ in range(200)],\n", " }\n", ")\n", "\n", "print(\"\\n1. SOURCE DATA (200 orders across 4 regions):\\n\")\n", "display(source_data.head(5))\n", "print(f\" ... {len(source_data)} total rows\")\n", "\n", "# ── Step 4: Run the pipeline — LakeLogic calls your script ───────\n", "proc = ll.DataProcessor(ext_contract_path, engine=\"polars\")\n", "result = proc.run(source_data)\n", "\n", "print(\"\\n2. AFTER EXTERNAL LOGIC (your script aggregated by region):\\n\")\n", "display(result.good)\n", "\n", "print(\"\\n\" + \"─\" * 60)\n", "print(\"What happened:\")\n", "print(\" 1. LakeLogic validated 200 rows against the contract schema\")\n", "print(\" 2. Called transforms/revenue_summary.py → run(df, fiscal_year=2026)\")\n", "print(\" 3. Your script filtered refunds + aggregated by region\")\n", "print(\" 4. LakeLogic applied quality rules (positive_revenue >= 0)\")\n", "print(\" 5. Lineage metadata injected automatically\")\n", "print(\"\\n✅ Your custom logic. 
LakeLogic's quality rules + lineage still apply.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What You Just Saw\n", "\n", "| # | Feature | How |\n", "|---|---------|-----|\n", "| 1 | **Engine portability** | Same contract on Polars, DuckDB, and Spark, identical results |\n", "| 2 | **Dimensional modeling** | `strategy: scd2` — full history tracking declared in YAML |\n", "| 3 | **Incremental processing** | `pipeline_log` watermark — only new files are loaded |\n", "| 4 | **Parallel processing** | `pipeline.run(parallel=True)` — concurrent wave execution |\n", "| 5 | **Backfill & reprocessing** | `reprocess_from`/`reprocess_to` — surgical date-range reload |\n", "| 6 | **External logic** | `external_logic.type: python` — custom transforms with full lineage |" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Go Deeper — Explore by Capability\n", "\n", "Each notebook below maps to a pillar of LakeLogic's [Technical Capabilities](https://lakelogic.github.io/LakeLogic/#technical-capabilities):\n", "\n", "| # | Notebook | What You'll See |\n", "|---|---|---|\n", "| 🛡️ | **[Data Quality & Trust](01_data_quality_trust.ipynb)** | Reconciliation proofs, Pydantic validation, SQL-first rules, SLO monitoring |\n", "| 📜 | **[Compliance & Governance](02_compliance_governance.ipynb)** | GDPR erasure in 2 lines, automatic lineage, cost intelligence |\n", "| ⚡ | **[Engine & Scale](03_engine_scale.ipynb)** | Same contract on Polars, DuckDB & Spark; incremental processing; backfill |\n", "| 🔧 | **[Developer Experience](04_developer_experience.ipynb)** | Structured diagnostics, DDL generation, surgical resets, multi-channel alerts |\n", "| 🧬 | **[Data Generation & AI](05_data_generation_ai.ipynb)** | Synthetic data, referential integrity, edge case injection, contract inference |\n", "| 🔌 | **[Integrations](06_integrations.ipynb)** | dbt adapter, dlt sources, contract-driven quality gates on arrival |\n", "\n", "> **Each notebook is self-contained** — 
pick the capability that matters most to you and run it independently." ] } ], "metadata": { "colab": { "name": "LakeLogic — Engine & Scale", "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }