{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Developer Experience\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LakeLogic/LakeLogic/blob/main/examples/colab/04_developer_experience.ipynb) [![View on GitHub](https://img.shields.io/badge/github-view_source-black?logo=github)](https://github.com/lakelogic/LakeLogic/blob/main/examples/colab/04_developer_experience.ipynb)\n", "\n", "Structured diagnostics, DDL generation, DAG visualization, dry run previews, surgical resets, and multi-channel alerts — all from the contract." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import subprocess\n", "import sys\n", "import importlib\n", "import urllib.request\n", "import os\n", "\n", "if importlib.util.find_spec(\"lakelogic\") is None:\n", " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--upgrade\", \"-q\", \"lakelogic[polars]\"])\n", "if not os.path.exists(\"_setup.py\"):\n", " urllib.request.urlretrieve(\n", " \"https://raw.githubusercontent.com/LakeLogic/LakeLogic/main/examples/colab/_setup.py\", \"_setup.py\"\n", " )\n", "import _setup as s\n", "import lakelogic as ll" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 1. Structured Diagnostics — Powered by `loguru`\n", "\n", "**The Problem:** Your pipeline failed. The log says `ERROR: validation failed`. No contract name, no run ID, no timestamp with timezone. You grep through 50 log files.\n", "\n", "**The Solution:** LakeLogic uses `loguru` for structured logging. Every line includes precise timestamps, severity levels, the exact function path, and execution tags — drastically cutting troubleshooting time." 
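, "\n", "\n", "The log shape described above can be reproduced with `loguru` alone. A minimal sketch using loguru's public API rather than LakeLogic's internal configuration (the exact format string below is an assumption, not LakeLogic's template):\n", "\n", "```python\n", "import sys\n", "from loguru import logger\n", "\n", "logger.remove()  # drop loguru's default handler\n", "logger.add(\n", "    sys.stderr,\n", "    # timestamp | level | module:function:line | message\n", "    format='{time:YYYY-MM-DDTHH:mm:ss.SSSZZ} | {level: <8} | {name}:{function}:{line} | {message}',\n", "    level='INFO',\n", ")\n", "\n", "def process_batch():\n", "    logger.info('Source count: 50 | Good: 45 | Quarantine: 5')\n", "\n", "process_batch()\n", "```\n", "\n", "Each emitted line then carries the timestamp, severity, and `module:function:line` path, which is exactly what makes grepping a run ID or function name practical.\n"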
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# LakeLogic uses loguru out of the box — no logging config needed.\n", "# Just run a processor and observe the structured log output.\n", "\n", "contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: diagnostics_demo\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", " - name: value\n", " type: string\n", "quality:\n", " row_rules:\n", " - name: has_value\n", " sql: \"value IS NOT NULL AND value != ''\"\n", "\"\"\",\n", " \"04_developer_experience_demo/diag.yaml\",\n", ")\n", "\n", "proc = ll.DataProcessor(contract, engine=\"polars\")\n", "source_df = ll.DataGenerator(contract).generate(rows=50, invalid_ratio=0.1)\n", "good, bad = proc.run(source_df)\n", "\n", "# ── Observe: every log line has timestamp | level | module:function:line ──\n", "print(f\"\\nResult: good={len(good)}, bad={len(bad)}\")\n", "print(\"\\n\\u2705 Every log line above includes:\")\n", "print(\" • ISO timestamp with timezone\")\n", "print(\" • Severity level (INFO, WARNING, ERROR)\")\n", "print(\" • Module path: lakelogic.core.processor:run:788\")\n", "print(\" • Run metrics: Source count, Good/Quarantine split, ratio\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 2. DDL-Only Mode — Schema Migration Without Running a Pipeline\n", "\n", "**The Problem:** You need to create the target table schema before the first pipeline run, but you don't want to process any data yet.\n", "\n", "**The Solution:** `DataProcessor.generate_ddl()` generates CREATE TABLE DDL directly from the contract — perfect for CI/CD migrations." 
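, "\n", "\n", "Conceptually, DDL generation is a type mapping folded over the contract's field list. A hand-rolled illustration of the idea, not LakeLogic's actual implementation (the type names and helper are assumptions):\n", "\n", "```python\n", "# Hypothetical sketch: map contract fields to a CREATE TABLE statement.\n", "TYPE_MAP = {'integer': 'BIGINT', 'string': 'VARCHAR', 'float': 'DOUBLE'}\n", "\n", "def contract_to_ddl(table_name, fields):\n", "    cols = []\n", "    for field in fields:\n", "        nullability = ' NOT NULL' if field.get('required') else ''\n", "        cols.append(field['name'] + ' ' + TYPE_MAP[field['type']] + nullability)\n", "    return 'CREATE TABLE IF NOT EXISTS ' + table_name + ' (' + ', '.join(cols) + ')'\n", "\n", "fields = [\n", "    {'name': 'event_id', 'type': 'integer', 'required': True},\n", "    {'name': 'user_id', 'type': 'string', 'required': True},\n", "    {'name': 'payload', 'type': 'string'},\n", "]\n", "print(contract_to_ddl('bronze_user_events', fields))\n", "```\n", "\n", "Because the DDL is derived from the contract rather than written by hand, a schema migration in CI/CD is just this mapping replayed against the current contract version.\n"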
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import shutil\n", "import yaml\n", "import polars as pl\n", "from lakelogic.core.registry import DomainRegistry\n", "from lakelogic.pipeline import LakehousePipeline\n", "\n", "# ── Clean slate ──────────────────────────────────────────────────────\n", "DDL_DIR = \"./ddl_demo\"\n", "if os.path.exists(DDL_DIR):\n", " shutil.rmtree(DDL_DIR)\n", "os.makedirs(f\"{DDL_DIR}/contracts/bronze\", exist_ok=True)\n", "os.makedirs(f\"{DDL_DIR}/landing/events\", exist_ok=True)\n", "\n", "# ── Contract ─────────────────────────────────────────────────────────\n", "ddl_contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: user_events\n", "info:\n", " title: bronze_user_events\n", " target_layer: bronze\n", "source:\n", " type: landing\n", " path: \"./ddl_demo/landing/events\"\n", " format: ndjson\n", "model:\n", " fields:\n", " - name: event_id\n", " type: integer\n", " required: true\n", " - name: user_id\n", " type: string\n", " required: true\n", " - name: event_type\n", " type: string\n", " - name: payload\n", " type: string\n", " - name: created_at\n", " type: string\n", " required: true\n", "\"\"\",\n", " f\"{DDL_DIR}/contracts/bronze/user_events_v1.0.yaml\",\n", ")\n", "\n", "# ── Preview: show what DDL LakeLogic generates per backend ────────\n", "proc = ll.DataProcessor(ddl_contract, engine=\"polars\")\n", "print(\"DuckDB DDL:\")\n", "print(proc.generate_ddl(backend=\"duckdb\"))\n", "print(\"\\nSpark DDL:\")\n", "print(proc.generate_ddl(backend=\"spark\"))\n", "\n", "# ── _system.yaml registry ───────────────────────────────────────────\n", "system_yaml = {\n", " \"domain\": \"demo\",\n", " \"system\": \"app\",\n", " \"contracts\": [\n", " {\n", " \"layer\": \"bronze\",\n", " \"entity\": \"user_events\",\n", " \"path\": \"contracts/bronze/user_events_v1.0.yaml\",\n", " \"enabled\": True,\n", " },\n", " ],\n", " \"environments\": {\n", " 
\"local\": {\n", " \"catalog\": \"local\",\n", " \"storage_root\": \"./ddl_demo/lakehouse\",\n", " \"data_root\": \"./ddl_demo/lakehouse\",\n", " \"quarantine_root\": \"./ddl_demo/lakehouse/_quarantine\",\n", " }\n", " },\n", " \"storage\": {\n", " \"external_location_root\": \"./ddl_demo/lakehouse\",\n", " },\n", "}\n", "\n", "system_path = f\"{DDL_DIR}/_system.yaml\"\n", "with open(system_path, \"w\") as f:\n", " yaml.dump(system_yaml, f, default_flow_style=False)\n", "\n", "# ── Initialize Pipeline & run DDL-only ────────────────────────────\n", "registry = DomainRegistry.from_yaml(system_path, environment=\"local\", storage_mode=\"direct\")\n", "ddl_pipeline = LakehousePipeline(registry, engine=\"polars\")\n", "\n", "summary = ddl_pipeline.run(\n", " target_layers=\"bronze\",\n", " reset_layers=\"\",\n", " reload_layers=\"\",\n", " dry_run=False,\n", " entity_filter=\"\",\n", " environment=\"local\",\n", " parallel=False,\n", " max_workers=1,\n", " ddl_only=True, # ← CREATE TABLES ONLY, NO DATA PROCESSING\n", " created_by=\"ddl_migration_demo\",\n", ")\n", "\n", "print(\"\\n\" + \"=\" * 60)\n", "print(\"DDL-ONLY RUN SUMMARY\")\n", "print(\"=\" * 60)\n", "print(f\" Pipeline run : {summary.run_id}\")\n", "print(\" DDL only : True\")\n", "print()\n", "for r in summary.results:\n", " table_name = r.get(\"table_name\", r.get(\"contract\", \"?\"))\n", " layer = r.get(\"layer\", \"?\")\n", " status = r.get(\"status\", \"?\")\n", " print(f\" [{layer:6s}] {table_name:30s} → {status}\")\n", "print(\"=\" * 60)\n", "print(\"\\n✅ Tables created from contract — no data was processed.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 3. DAG Dependency Viewer — Execution Order at a Glance\n", "\n", "**The Problem:** You have 15 contracts with dependencies. 
Running them in the wrong order corrupts downstream tables.\n", "\n", "**The Solution:** `LakehousePipeline.visualize_dag()` renders the full dependency graph from your `_system.yaml` registry — showing bronze → silver → gold flow and execution order." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import yaml\n", "from lakelogic.core.registry import DomainRegistry\n", "from lakelogic.pipeline import LakehousePipeline\n", "from IPython.display import HTML, display\n", "\n", "# ── Create inline contracts ──────────────────────────────────────────\n", "DAG_DIR = \"./dag_demo\"\n", "os.makedirs(f\"{DAG_DIR}/contracts/bronze\", exist_ok=True)\n", "os.makedirs(f\"{DAG_DIR}/contracts/silver\", exist_ok=True)\n", "\n", "# Bronze: orders (raw landing)\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: orders\n", "info:\n", " title: bronze_demo_orders\n", " target_layer: bronze\n", "source:\n", " type: landing\n", " path: \"./dag_demo/landing/orders\"\n", " format: ndjson\n", "model:\n", " fields:\n", " - name: order_id\n", " type: integer\n", " required: true\n", " - name: amount\n", " type: float\n", "\"\"\",\n", " f\"{DAG_DIR}/contracts/bronze/bronze_demo_orders_v1.0.yaml\",\n", ")\n", "\n", "# Bronze: customers (raw landing)\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: customers\n", "info:\n", " title: bronze_demo_customers\n", " target_layer: bronze\n", "source:\n", " type: landing\n", " path: \"./dag_demo/landing/customers\"\n", " format: ndjson\n", "model:\n", " fields:\n", " - name: customer_id\n", " type: integer\n", " required: true\n", " - name: name\n", " type: string\n", " pii: true\n", "\"\"\",\n", " f\"{DAG_DIR}/contracts/bronze/bronze_demo_customers_v1.0.yaml\",\n", ")\n", "\n", "# Silver: customers_cleaned (depends on bronze customers + customers)\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: customers_cleaned\n", "info:\n", " 
title: silver_demo_customers_cleaned\n", " target_layer: silver\n", "source:\n", " type: table\n", " path: \"./dag_demo/lakehouse/bronze/bronze_demo_customers\"\n", "model:\n", " fields:\n", " - name: customer_id\n", " type: integer\n", " required: true\n", " - name: name\n", " type: string\n", " pii: true\n", "\n", "\"\"\",\n", " f\"{DAG_DIR}/contracts/silver/silver_demo_customers_v1.0.yaml\",\n", ")\n", "\n", "# Silver: orders_cleaned (depends on bronze orders + customers)\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: orders_cleaned\n", "info:\n", " title: silver_demo_orders_cleaned\n", " target_layer: silver\n", "source:\n", " type: table\n", " path: \"./dag_demo/lakehouse/bronze/bronze_demo_orders\"\n", "model:\n", " fields:\n", " - name: order_id\n", " type: integer\n", " required: true\n", " - name: amount\n", " type: float\n", "\n", "downstream:\n", " - type: dashboard\n", " name: \"Weekly Sales Performance\"\n", " platform: power_bi\n", " url: \"https://app.powerbi.com/...\"\n", " owner: \"marketing-analytics\"\n", "\n", "\"\"\",\n", " f\"{DAG_DIR}/contracts/silver/silver_demo_orders_v1.0.yaml\",\n", ")\n", "\n", "print(\"\\u2705 Inline contracts created\")\n", "\n", "# ── Create inline _system.yaml registry ──────────────────────────────\n", "system_yaml = {\n", " \"domain\": \"demo\",\n", " \"system\": \"ecommerce\",\n", " \"external_sources\": [\n", " {\n", " \"name\": \"ecommerce demo API\",\n", " \"source_domain\": \"ecommerce Vendor\",\n", " \"catalog_path\": \"external_storage_path_or_api\",\n", " \"consumed_by\": [\"orders\", \"customers\"],\n", " }\n", " ],\n", " \"contracts\": [\n", " {\n", " \"layer\": \"bronze\",\n", " \"entity\": \"orders\",\n", " \"path\": \"contracts/bronze/bronze_demo_orders_v1.0.yaml\",\n", " \"enabled\": True,\n", " },\n", " {\n", " \"layer\": \"bronze\",\n", " \"entity\": \"customers\",\n", " \"path\": \"contracts/bronze/bronze_demo_customers_v1.0.yaml\",\n", " \"enabled\": True,\n", " },\n", " 
{\n", " \"layer\": \"silver\",\n", " \"entity\": \"customers_cleaned\",\n", " \"path\": \"contracts/silver/silver_demo_customers_v1.0.yaml\",\n", " \"enabled\": True,\n", " },\n", " {\n", " \"layer\": \"silver\",\n", " \"entity\": \"orders_cleaned\",\n", " \"path\": \"contracts/silver/silver_demo_orders_v1.0.yaml\",\n", " \"depends_on\": [\"customers_cleaned\"],\n", " \"enabled\": True,\n", " },\n", " ],\n", " \"environments\": {\n", " \"local\": {\n", " \"catalog\": \"local\",\n", " \"storage_root\": \"./dag_demo/lakehouse\",\n", " \"data_root\": \"./dag_demo/lakehouse\",\n", " \"quarantine_root\": \"./dag_demo/lakehouse/_quarantine\",\n", " }\n", " },\n", " \"storage\": {\n", " \"external_location_root\": \"./dag_demo/lakehouse\",\n", " \"log_path\": \"./dag_demo/lakehouse/_logs\",\n", " },\n", " \"materialization\": {\n", " \"bronze\": {\n", " \"strategy\": \"append\",\n", " \"format\": \"delta\",\n", " },\n", " \"silver\": {\n", " \"strategy\": \"merge\",\n", " \"format\": \"delta\",\n", " },\n", " },\n", "}\n", "\n", "system_path = f\"{DAG_DIR}/_system.yaml\"\n", "with open(system_path, \"w\") as f:\n", " yaml.dump(system_yaml, f, default_flow_style=False)\n", "print(f\"\\u2705 _system.yaml written to {system_path}\")\n", "\n", "# ── Build pipeline and visualize ─────────────────────────────────────\n", "registry = DomainRegistry.from_yaml(system_path, environment=\"local\", storage_mode=\"direct\")\n", "pipeline = LakehousePipeline(registry, engine=\"polars\")\n", "\n", "display(HTML(pipeline.visualize_dag()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 4. Dry Run Mode — Preview Before You Commit\n", "\n", "**The Problem:** You changed a transformation and want to see the execution plan before it touches production data.\n", "\n", "**The Solution:** Run with `dry_run=True`. The pipeline walks every contract in topological order and logs what it **would** execute — but **skips all processing and writes**." 
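, "\n", "\n", "The topological walk a dry run performs can be sketched with the standard library's `graphlib`. This is a hypothetical dependency graph, not the demo registry's internals:\n", "\n", "```python\n", "from graphlib import TopologicalSorter\n", "\n", "# Hypothetical contract graph: entity -> upstream dependencies.\n", "deps = {\n", "    'orders': [],\n", "    'customers': [],\n", "    'customers_cleaned': ['customers'],\n", "    'orders_cleaned': ['orders', 'customers_cleaned'],\n", "}\n", "\n", "# A dry run visits contracts in dependency order and only logs intent,\n", "# so ordering bugs surface before any table is written.\n", "for entity in TopologicalSorter(deps).static_order():\n", "    print('DRY RUN: would process', entity)\n", "```\n"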
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Dry Run: uses the same inline pipeline from Section 3 ───────────\n", "summary = pipeline.run(\n", " target_layers=\"bronze,silver\",\n", " reset_layers=\"\",\n", " reload_layers=\"\",\n", " dry_run=True, # <--- PREVIEW ONLY\n", " entity_filter=\"\",\n", " environment=\"local\",\n", " parallel=False,\n", " max_workers=4,\n", " ddl_only=False,\n", " created_by=\"developer_experience_demo\",\n", " reprocess_from=None,\n", " reprocess_to=None,\n", " reprocess_column=None,\n", " reprocess_values=None,\n", " retry_attempts=3,\n", " retry_base_wait_seconds=2,\n", " entity_timeout_minutes=60,\n", " max_consecutive_failures=2,\n", ")\n", "\n", "print(\"\\n\" + \"=\" * 60)\n", "print(\"DRY RUN SUMMARY\")\n", "print(\"=\" * 60)\n", "print(f\" Pipeline run : {summary.run_id}\")\n", "print(f\" Environment : {summary.environment}\")\n", "print(f\" Dry run : {summary.dry_run}\")\n", "print()\n", "for r in summary.results:\n", " contract = r.get(\"contract\", \"?\")\n", " layer = r.get(\"layer\", \"?\")\n", " status = r.get(\"status\", \"?\")\n", " print(f\" [{layer:6s}] {contract:25s} -> {status}\")\n", "print(\"=\" * 60)\n", "print(\"\\n✅ Every contract was evaluated but nothing was processed or written.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 5. Surgical Reset & Reload — Silver Only, Bronze Untouched\n", "\n", "**The Problem:** Your Silver transformation logic had a bug. You need to re-run Silver without reprocessing Bronze (which took 4 hours).\n", "\n", "**The Solution:** Reset and reload just the Silver layer. Bronze output stays untouched — simply re-feed Bronze data into the fixed Silver processor." 
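, "\n", "\n", "The layer-selection behind a surgical reset can be pictured as a small filter over the registry. A hypothetical sketch of the idea, not LakeLogic's implementation (the `plan_run` helper and its field names are invented for illustration):\n", "\n", "```python\n", "# Hypothetical sketch of how target/reset/reload layer flags select work.\n", "def plan_run(contracts, target_layers, reset_layers, reload_layers):\n", "    targets = set(filter(None, target_layers.split(',')))\n", "    resets = set(filter(None, reset_layers.split(',')))\n", "    reloads = set(filter(None, reload_layers.split(',')))\n", "    plan = []\n", "    for contract in contracts:\n", "        if contract['layer'] not in targets:\n", "            continue  # layer not targeted: its tables stay untouched\n", "        plan.append({\n", "            'entity': contract['entity'],\n", "            'wipe_target': contract['layer'] in resets,        # reset_layers\n", "            'ignore_watermark': contract['layer'] in reloads,  # reload_layers\n", "        })\n", "    return plan\n", "\n", "contracts = [\n", "    {'layer': 'bronze', 'entity': 'orders'},\n", "    {'layer': 'silver', 'entity': 'orders_cleaned'},\n", "]\n", "\n", "# Target, reset, and reload only silver: bronze never enters the plan.\n", "print(plan_run(contracts, 'silver', 'silver', 'silver'))\n", "```\n", "\n", "The key point is the early `continue`: a layer outside `target_layers` is never wiped, reloaded, or even visited, which is why a 4-hour Bronze build survives a Silver-only fix.\n"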
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import shutil\n", "import yaml\n", "from lakelogic.core.registry import DomainRegistry\n", "from lakelogic.pipeline import LakehousePipeline\n", "\n", "# ── Clean Slate ──────────────────────────────────────────────────────\n", "SUR_DIR = \"./sur_demo\"\n", "if os.path.exists(SUR_DIR):\n", " shutil.rmtree(SUR_DIR, ignore_errors=True)\n", "os.makedirs(f\"{SUR_DIR}/contracts/bronze\", exist_ok=True)\n", "os.makedirs(f\"{SUR_DIR}/contracts/silver\", exist_ok=True)\n", "\n", "# ── Bronze Contract: orders ──────────────────────────────────────────\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: orders\n", "info:\n", " title: bronze_sur_orders\n", " table_name: bronze_sur_orders\n", " target_layer: bronze\n", "source:\n", " type: landing\n", " path: \"./sur_demo/landing/orders\"\n", " format: ndjson\n", "model:\n", " fields:\n", " - name: order_id\n", " type: integer\n", " required: true\n", " - name: amount\n", " type: float\n", "\"\"\",\n", " f\"{SUR_DIR}/contracts/bronze/bronze_sur_orders_v1.0.yaml\",\n", ")\n", "\n", "# ── Silver Contract: orders_cleaned ──────────────────────────────────\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: orders_cleaned\n", "info:\n", " title: silver_sur_orders_cleaned\n", " table_name: silver_sur_orders_cleaned\n", " target_layer: silver\n", "source:\n", " type: table\n", " path: \"./sur_demo/lakehouse/bronze_sur_orders\"\n", "model:\n", " fields:\n", " - name: order_id\n", " type: integer\n", " required: true\n", " - name: amount\n", " type: float\n", "\"\"\",\n", " f\"{SUR_DIR}/contracts/silver/silver_sur_orders_cleaned_v1.0.yaml\",\n", ")\n", "\n", "print(\"✅ Contracts created\")\n", "\n", "# ── _system.yaml ─────────────────────────────────────────────────────\n", "system_yaml = {\n", " \"domain\": \"demo\",\n", " \"system\": \"surgical\",\n", " \"storage\": {\n", " 
\"external_location_root\": \"./sur_demo/lakehouse\",\n", " \"log_path\": \"./sur_demo/lakehouse/_logs\",\n", " },\n", " \"materialization\": {\n", " \"bronze\": {\n", " \"strategy\": \"append\",\n", " \"format\": \"delta\",\n", " },\n", " \"silver\": {\n", " \"strategy\": \"merge\",\n", " \"format\": \"delta\",\n", " },\n", " },\n", " \"contracts\": [\n", " {\n", " \"layer\": \"bronze\",\n", " \"entity\": \"orders\",\n", " \"path\": \"contracts/bronze/bronze_sur_orders_v1.0.yaml\",\n", " \"enabled\": True,\n", " },\n", " {\n", " \"layer\": \"silver\",\n", " \"entity\": \"orders_cleaned\",\n", " \"path\": \"contracts/silver/silver_sur_orders_cleaned_v1.0.yaml\",\n", " \"depends_on\": [\"orders\"],\n", " \"enabled\": True,\n", " },\n", " ],\n", " \"environments\": {\n", " \"local\": {\n", " \"catalog\": \"local\",\n", " \"storage_root\": \"./sur_demo/lakehouse\",\n", " \"data_root\": \"./sur_demo/lakehouse\",\n", " \"quarantine_root\": \"./sur_demo/lakehouse/_quarantine\",\n", " }\n", " },\n", "}\n", "\n", "system_path = f\"{SUR_DIR}/_system.yaml\"\n", "with open(system_path, \"w\") as f:\n", " yaml.dump(system_yaml, f, default_flow_style=False)\n", "print(f\"✅ _system.yaml written to {system_path}\")\n", "\n", "# ── Build Pipeline ───────────────────────────────────────────────────\n", "registry = DomainRegistry.from_yaml(system_path, environment=\"local\", storage_mode=\"direct\")\n", "sur_pipeline = LakehousePipeline(registry, engine=\"polars\")\n", "\n", "# ── Generate Landing Data ────────────────────────────────────────────\n", "os.makedirs(\"./sur_demo/landing/orders\", exist_ok=True)\n", "pl.DataFrame({\"order_id\": [1, 2, 3], \"amount\": [10.5, 20.0, 35.0]}).write_ndjson(\n", " \"./sur_demo/landing/orders/data.ndjson\"\n", ")\n", "print(\"✅ Landing data generated\")\n", "\n", "# ── RUN 1: Full Pipeline (Bronze + Silver) ───────────────────────────\n", "print(\"\\n── RUN 1: Full Pipeline (Bronze → Silver) ──\")\n", "summary_1 = sur_pipeline.run(\n", " 
target_layers=\"bronze,silver\",\n", " reset_layers=\"\",\n", " reload_layers=\"\",\n", " dry_run=False,\n", " entity_filter=\"\",\n", " environment=\"local\",\n", " parallel=False,\n", " max_workers=1,\n", " ddl_only=False,\n", " created_by=\"surgical_reset_demo\",\n", " retry_attempts=1,\n", " retry_base_wait_seconds=2,\n", " entity_timeout_minutes=60,\n", " max_consecutive_failures=2,\n", ")\n", "print(f\"✅ Run 1 done: {[r['contract'] for r in summary_1.results if r['status'] == 'success']}\")\n", "\n", "# ── RUN 2: Surgical Reset & Reload (Silver Only) ────────────────────\n", "# Scenario: you fixed a transformation bug. You need Silver re-processed\n", "# from the existing Bronze output — WITHOUT rerunning Bronze.\n", "print(\"\\n── RUN 2: Surgical Reset & Reload (Silver Only) ──\")\n", "summary_2 = sur_pipeline.run(\n", " target_layers=\"silver\", # ← TARGET ONLY SILVER\n", " reset_layers=\"silver\", # ← WIPE SILVER TARGETS\n", " reload_layers=\"silver\", # ← IGNORE WATERMARKS\n", " dry_run=False,\n", " entity_filter=\"\",\n", " environment=\"local\",\n", " parallel=False,\n", " max_workers=1,\n", " ddl_only=False,\n", " created_by=\"surgical_reset_demo\",\n", " retry_attempts=1,\n", " retry_base_wait_seconds=2,\n", " entity_timeout_minutes=60,\n", " max_consecutive_failures=2,\n", ")\n", "print(f\"✅ Run 2 done: {[r['contract'] for r in summary_2.results if r['status'] == 'success']}\")\n", "\n", "print(\"\\n✅ Silver was wiped and re-processed from Bronze output.\")\n", "print(\"✅ Bronze layer was completely untouched.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 6. Multi-Channel Alerts — Slack, Teams, Email, Webhooks\n", "\n", "**The Problem:** A quality breach fires at 2am. Nobody sees the email until 9am. Seven hours of bad data in production.\n", "\n", "**The Solution:** LakeLogic’s `proc.notify()` dispatches alerts to Slack, Teams, email, or any webhook — powered by Apprise with Jinja2 template support. 
The notification config lives in the contract.\n", "\n", "### Real Slack Alerts from LakeLogic\n", "\n", "| SLO Breach Alert | Quarantine Alert |\n", "|:---:|:---:|\n", "| ![SLO Breach](assets/slack_slo_alert.png) | ![Quarantine Alert](assets/slack_quarantine_alert.png) |\n", "\n", "> **These are real notifications** fired by LakeLogic during a pipeline run. The contract defines which channels receive which event types." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Local Testing (Dry Run) \n", "\n", "LakeLogic includes a `type: \"console\"` notification adapter that renders payloads directly to stdout. This is perfect for local development and notebook testing, allowing you to preview how templates render without configuring external credentials." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# == Notifications in Action (Console / Local Testing) ==\n", "# LakeLogic includes a `console` notification adapter that simply prints\n", "# the rendered notification payload to standard output.\n", "# This is perfect for verifying notification payloads locally.\n", "\n", "import os\n", "import shutil\n", "import yaml\n", "import polars as pl\n", "from lakelogic.core.registry import DomainRegistry\n", "from lakelogic.pipeline import LakehousePipeline\n", "\n", "NOTIFY_DIR = \"./notify_demo\"\n", "if os.path.exists(NOTIFY_DIR):\n", " shutil.rmtree(NOTIFY_DIR, ignore_errors=True)\n", "\n", "os.makedirs(f\"{NOTIFY_DIR}/contracts/bronze\", exist_ok=True)\n", "\n", "# 1. 
Bronze Contract with a quality rule that will quarantine some rows\n", "s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: notification_test\n", "info:\n", " title: bronze_notify_test\n", " table_name: bronze_notify_test\n", " target_layer: bronze\n", "source:\n", " type: landing\n", " path: \"./notify_demo/landing/data\"\n", " format: ndjson\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", "quality:\n", " row_rules:\n", " - name: positive_id\n", " sql: \"id > 1\"\n", "quarantine:\n", " enabled: true\n", "\"\"\",\n", " f\"{NOTIFY_DIR}/contracts/bronze/bronze_notify_test_v1.0.yaml\",\n", ")\n", "\n", "# 2. System registry with console notifications configured\n", "system_yaml = {\n", " \"domain\": \"demo\",\n", " \"system\": \"notifications\",\n", " \"storage\": {\n", " \"external_location_root\": \"./notify_demo/lakehouse\",\n", " },\n", " \"materialization\": {\n", " \"bronze\": {\"strategy\": \"append\", \"format\": \"delta\"},\n", " },\n", " \"notifications\": [{\"type\": \"console\", \"on_events\": [\"quarantine\", \"failure\"]}],\n", " \"contracts\": [\n", " {\n", " \"layer\": \"bronze\",\n", " \"entity\": \"notification_test\",\n", " \"path\": \"contracts/bronze/bronze_notify_test_v1.0.yaml\",\n", " \"enabled\": True,\n", " },\n", " ],\n", " \"environments\": {\n", " \"local\": {\n", " \"catalog\": \"local\",\n", " \"storage_root\": \"./notify_demo/lakehouse\",\n", " }\n", " },\n", "}\n", "\n", "system_path = f\"{NOTIFY_DIR}/_system.yaml\"\n", "with open(system_path, \"w\") as f:\n", " yaml.dump(system_yaml, f, default_flow_style=False)\n", "\n", "# 3. Generate landing data (id=1 will be quarantined by the positive_id rule)\n", "os.makedirs(f\"{NOTIFY_DIR}/landing/data\", exist_ok=True)\n", "pl.DataFrame({\"id\": [1, 2, 3]}).write_ndjson(f\"{NOTIFY_DIR}/landing/data/data.ndjson\")\n", "\n", "# 4. 
Run Pipeline -- quarantine fires the console notification\n", "registry = DomainRegistry.from_yaml(system_path, environment=\"local\", storage_mode=\"direct\")\n", "pipeline = LakehousePipeline(registry, engine=\"polars\")\n", "\n", "print(\"\\n-- Running Pipeline with Console Notifications --\")\n", "summary = pipeline.run(\n", " target_layers=\"bronze\",\n", " environment=\"local\",\n", " dry_run=False,\n", ")\n", "print(\"\\n[OK] Note the [LAKELOGIC NOTIFICATION] blocks printed to the console above!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Contract with notification config ────────────────────────────────\n", "# In production, these URLs would be real webhook endpoints.\n", "# For this demo, we show what the notification system produces.\n", "\n", "alert_yaml = \"\"\"\n", "version: 1.0.0\n", "dataset: alert_demo\n", "\n", "ownership:\n", " contacts:\n", " - name: Oncall Engineer\n", " role: owner\n", " email: oncall@company.com\n", "\n", "model:\n", " fields:\n", " - name: metric_id\n", " type: integer\n", " required: true\n", " - name: value\n", " type: float\n", "\n", "quality:\n", " row_rules:\n", " - name: positive_value\n", " sql: \"value > 0\"\n", "\n", "quarantine:\n", " enabled: true\n", " notifications:\n", " - type: slack\n", " target: https://hooks.slack.com/services/T00/B00/demo\n", " on_events: [failure, slo_breach, quarantine]\n", " - type: teams\n", " target: https://outlook.webhook.office.com/demo\n", " on_events: [failure]\n", " - type: webhook\n", " target: https://api.pagerduty.com/v2/enqueue\n", " on_events: [slo_breach]\n", "\"\"\"\n", "\n", "print(\"\\u2500\" * 60)\n", "print(\"NOTIFICATION CONFIG (from contract)\")\n", "print(\"\\u2500\" * 60)\n", "print(alert_yaml.strip())\n", "\n", "# ── Simulate what the notification payload looks like ───────────────\n", "import json\n", "from datetime import datetime, timezone\n", "\n", "payload = {\n", " \"event\": 
\"dataset_quality_check\",\n", " \"subject\": \"[LOCAL] demo/ecommerce: Dataset Quality Check Alert\",\n", " \"message\": \"alert_demo: quarantine rate 12% exceeds SLO threshold of 5%\",\n", " \"run_id\": \"abc-1234-def-5678\",\n", " \"timestamp\": datetime.now(timezone.utc).isoformat(),\n", " \"contract\": \"alert_demo v1.0.0\",\n", " \"engine\": \"polars\",\n", " \"channels\": [\"slack\", \"teams\", \"pagerduty\"],\n", "}\n", "\n", "print(\"\\n\" + \"\\u2500\" * 60)\n", "print(\"NOTIFICATION PAYLOAD (what Slack/Teams/Webhooks receive)\")\n", "print(\"\\u2500\" * 60)\n", "print(json.dumps(payload, indent=2))\n", "\n", "print(\"\\n\\u2705 One contract config → Slack + Teams + PagerDuty simultaneously.\")\n", "print(\" In production, proc.notify('failure', 'message') dispatches to all channels.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What You Just Saw\n", "\n", "- **Structured diagnostics** — `loguru` powers every log line with timestamps, severity, and function paths\n", "- **DDL generation** — `proc.generate_ddl()` creates CREATE TABLE DDL for any backend from the contract\n", "- **DAG viewer** — `pipeline.visualize_dag()` renders the full dependency graph from `_system.yaml`\n", "- **Dry run mode** — `pipeline.run(dry_run=True)` previews execution without processing or writing\n", "- **Surgical reset** — reload Silver without re-ingesting Bronze\n", "- **Multi-channel alerts** — Slack, Teams, email, webhooks from one contract config block" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Go Deeper — Explore by Capability\n", "\n", "Each notebook below maps to a pillar of LakeLogic's [Technical Capabilities](https://lakelogic.github.io/LakeLogic/#technical-capabilities):\n", "\n", "| # | Notebook | What You'll See |\n", "|---|---|---|\n", "| 🛡️ | **[Data Quality & Trust](01_data_quality_trust.ipynb)** | Reconciliation proofs, Pydantic validation, SQL-first rules, SLO monitoring |\n", "| 📜 | **[Compliance & 
Governance](02_compliance_governance.ipynb)** | GDPR erasure in 2 lines, automatic lineage, cost intelligence |\n", "| ⚡ | **[Engine & Scale](03_engine_scale.ipynb)** | Same contract on Polars & DuckDB, incremental processing, backfill |\n", "| 🔧 | **[Developer Experience](04_developer_experience.ipynb)** | Diagnostics, DDL, DAG viewer, dry run, surgical resets, alerts |\n", "| 🧬 | **[Data Generation & AI](05_data_generation_ai.ipynb)** | Synthetic data, referential integrity, edge case injection, contract inference |\n", "| 🔌 | **[Integrations](06_integrations.ipynb)** | dbt adapter, dlt sources, contract-driven quality gates on arrival |\n", "\n", "> **Each notebook is self-contained** — pick the capability that matters most to you and run it independently." ] } ], "metadata": { "colab": { "name": "LakeLogic — Developer Experience", "provenance": [] }, "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.13" } }, "nbformat": 4, "nbformat_minor": 0 }