{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Integrations — dbt, dlt, Streaming, Databases, Cloud & More\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LakeLogic/LakeLogic/blob/main/examples/colab/06_integrations.ipynb) [![View on GitHub](https://img.shields.io/badge/github-view_source-black?logo=github)](https://github.com/lakelogic/LakeLogic/blob/main/examples/colab/06_integrations.ipynb)\n", "\n", "Reuse your dbt schema definitions, plug into 100+ dlt sources, stream live data from WebSockets/Kafka, and extract directly from SQL databases — all with contract-driven quality gates." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import subprocess\n", "import sys\n", "import importlib\n", "import urllib.request\n", "import os\n", "\n", "os.environ[\"LAKELOGIC_SKIP_INCREMENTAL_CHECK\"] = \"1\"\n", "\n", "if importlib.util.find_spec(\"lakelogic\") is None:\n", " subprocess.check_call(\n", " [\n", " sys.executable,\n", " \"-m\",\n", " \"pip\",\n", " \"install\",\n", " \"--upgrade\",\n", " \"-q\",\n", " \"lakelogic[polars]\",\n", " \"connectorx\",\n", " \"SQLAlchemy\",\n", " \"websocket-client\",\n", " ]\n", " )\n", "if not os.path.exists(\"_setup.py\"):\n", " urllib.request.urlretrieve(\n", " \"https://raw.githubusercontent.com/LakeLogic/LakeLogic/main/examples/colab/_setup.py\", \"_setup.py\"\n", " )\n", "import _setup as s\n", "import lakelogic as ll" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 1. dbt Adapter — Reuse Your Schema Definitions\n", "\n", "**The Problem:** You already have 200 models defined in dbt `schema.yml`. Rewriting them as LakeLogic contracts doubles your maintenance burden.\n", "\n", "**The Solution:** `DataProcessor.from_dbt()` reads your dbt schema and creates a LakeLogic contract from it. Zero duplication." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "\n", "# Write a realistic dbt schema.yml\n", "dbt_schema = \"\"\"\n", "version: 2\n", "models:\n", " - name: customers\n", " description: Customer master table\n", " columns:\n", " - name: customer_id\n", " description: Primary key\n", " tests:\n", " - not_null\n", " - unique\n", " - name: email\n", " description: Customer email address\n", " tests:\n", " - not_null\n", " - name: first_name\n", " description: First name\n", " - name: last_name\n", " description: Last name\n", " - name: country\n", " description: ISO country code\n", " tests:\n", " - accepted_values:\n", " values: ['US', 'GB', 'DE', 'FR', 'JP']\n", " - name: created_at\n", " description: Account creation timestamp\n", " tests:\n", " - not_null\n", "\"\"\"\n", "Path(\"dbt_schema.yml\").write_text(dbt_schema)\n", "\n", "# Create a LakeLogic processor from dbt definitions\n", "proc = ll.DataProcessor.from_dbt(\"dbt_schema.yml\", model=\"customers\")\n", "print(\"Contract created from dbt schema:\")\n", "print(f\" Dataset: {proc.contract.dataset}\")\n", "print(f\" Fields: {[f.name for f in proc.contract.model.fields]}\")\n", "print(f\" Rules: {len(proc.contract.quality.row_rules)} row rules\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The Proof — generate data and run through the dbt-derived contract\n", "gen = ll.DataGenerator.from_dbt(\"dbt_schema.yml\", model=\"customers\")\n", "source_df = gen.generate(rows=500, invalid_ratio=0.08)\n", "\n", "good, bad = proc.run(source_df)\n", "s.assert_reconciliation(source_df, good, bad)\n", "print(\"\\ndbt not_null + accepted_values tests → LakeLogic quality rules. Zero rewrite.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 2. 
dlt Adapter — Contract-Driven API Ingestion\n", "\n", "**The Problem:** You ingest from GitHub, Stripe, Shopify and 100+ APIs via [dlt](https://dlthub.com). Data arrives with no schema enforcement — bad records flow straight into your warehouse.\n", "\n", "**The Solution:** Declare the API directly in your contract's `source.type: dlt` block. LakeLogic extracts the data via dlt's REST API engine, then validates every row through your model and quality rules — all in one `proc.run_source()` call." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install dlt (if not already installed)\n", "import subprocess\n", "import sys\n", "\n", "try:\n", " import dlt\n", "except ImportError:\n", " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"-q\", \"dlt\"])\n", " import dlt\n", "print(f\"dlt v{dlt.__version__} ready\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── The contract declares the API source directly ─────────────\n", "# No separate dlt script needed — the contract IS the config.\n", "github_contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: github_issues\n", "info:\n", " title: bronze_github_issues\n", " domain: engineering\n", " target_layer: bronze\n", "\n", "source:\n", " type: dlt\n", " dlt:\n", " base_url: https://api.github.com\n", " credentials: {}\n", " endpoints:\n", " - name: issues\n", " path: repos/dlt-hub/dlt/issues\n", " params:\n", " state: open\n", " per_page: 30\n", "\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", " - name: repository_url\n", " type: string\n", " - name: number\n", " type: integer\n", " required: true\n", " - name: title\n", " type: string\n", " required: true\n", " - name: body\n", " type: string\n", " - name: state\n", " type: string\n", " - name: url\n", " type: string\n", " - name: created_at\n", " type: string\n", " - name: 
updated_at\n", " type: string\n", "\n", "quality:\n", " row_rules:\n", " - name: valid_state\n", " sql: \"state IN ('open', 'closed')\"\n", " - name: has_title\n", " sql: \"title IS NOT NULL AND title != ''\"\n", "\n", "server:\n", " type: local\n", " path: \".\"\n", " schema_policy:\n", " evolution: \"allow\"\n", " unknown_fields: \"drop\"\n", "\n", "\"\"\",\n", " \"06_integrations_demo/github_issues.yaml\",\n", ")\n", "\n", "print(\"Contract written with source.type=dlt\")\n", "print(\" API: https://api.github.com/repos/dlt-hub/dlt/issues\")\n", "print(\" Fields: id, repository_url, number, title, body, state, ...\")\n", "print(\" Rules: valid_state, has_title\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── One call: dlt extraction + LakeLogic validation ────────────\n", "# run_source() detects source.type=dlt and:\n", "# 1. Builds a dlt REST API pipeline from the contract config\n", "# 2. Extracts data from the GitHub API\n", "# 3. Converts to Polars DataFrame\n", "# 4. Runs schema validation + quality rules\n", "# 5. Returns good/bad split with reconciliation guarantee\n", "\n", "proc = ll.DataProcessor(github_contract, engine=\"polars\")\n", "good, bad = proc.run_source()\n", "r = proc.last_report\n", "\n", "counts = r.get(\"counts\", {})\n", "print(\"Contract-driven dlt results:\")\n", "print(f\" Source : {counts.get('source', '?')} issues from GitHub API\")\n", "print(f\" Good : {counts.get('good', '?')} (passed all rules)\")\n", "print(f\" Bad : {counts.get('quarantined', '?')} (quarantined)\")\n", "print(\n", " f\" Match : {counts.get('source', 0)} == {counts.get('good', 0)} + {counts.get('quarantined', 0)} -> {counts.get('source', 0) == counts.get('good', 0) + counts.get('quarantined', 0)}\"\n", ")\n", "print(\"\\nThe contract IS the config. No dlt script. 
No manual DataFrame wrangling.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Preview live GitHub data - GOOD DATA\")\n", "good.limit(3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Preview live GitHub data - BAD DATA\")\n", "bad.limit(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 3. Live Streaming Data — Native Connectors\n", "\n", "**The Problem:** Data arrives continuously from real-time feeds (crypto prices, wiki edits, clickstreams via Kafka). You need to validate every message against your schema before it lands in the lakehouse.\n", "\n", "**The Solution:** LakeLogic ships with native streaming connectors (`SSEConnector`, `WebSocketConnector`, `KafkaConnector`, `WebhookConnector`, and more). Each connector’s `.stream()` method yields JSON events, which you buffer into a DataFrame and pipe directly through your contract. Below we connect to the **Binance public WebSocket stream** to capture live BTC trades, then validate them — the exact same pattern works with any connector."
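,
"\n\nThe same buffer-and-validate loop applies to the other connectors. For example, with Kafka (a sketch only; the `KafkaConnector` constructor arguments below are assumptions, so check the signature in your installed version):\n",
"\n",
"```python\n",
"from lakelogic.engines.streaming_connectors import KafkaConnector\n",
"import polars as pl\n",
"\n",
"# Constructor arguments are illustrative assumptions, not verified API:\n",
"connector = KafkaConnector(bootstrap_servers=\"localhost:9092\", topic=\"clickstream\")\n",
"\n",
"events = []\n",
"for event in connector.stream():  # same .stream() contract as WebSocketConnector\n",
"    events.append(event)\n",
"    if len(events) >= 100:  # micro-batch threshold\n",
"        break\n",
"connector.close()\n",
"\n",
"# `proc` would be a DataProcessor built from a contract matching the event schema\n",
"good, bad = proc.run(pl.DataFrame(events))\n",
"```\n"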
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Fetch live BTC trades using LakeLogic's WebSocketConnector ───\n", "from lakelogic.engines.streaming_connectors import WebSocketConnector\n", "import polars as pl\n", "\n", "connector = WebSocketConnector(url=\"wss://stream.binance.com:9443/ws/btcusdt@trade\")\n", "\n", "# Collect 20 live trades, then close the connection\n", "trades = []\n", "for event in connector.stream():\n", " trades.append(event)\n", " if len(trades) >= 20:\n", " break\n", "connector.close()\n", "\n", "live_df = pl.DataFrame(trades)\n", "print(f\"Captured {live_df.height} live BTC trades via WebSocketConnector\")\n", "display(live_df.head(3))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Define a contract for real-time trade validation ─────────\n", "# Binance WebSocket sends cryptic field names (e, E, s, t, p, q, T, m, M).\n", "# The contract uses a `pre` transformation to rename them to business-friendly names\n", "# BEFORE validation runs.\n", "stream_contract = s.write_contract(\n", " \"\"\"\n", "version: 1.0.0\n", "dataset: btc_trades\n", "info:\n", " title: bronze_btc_trades\n", " domain: market_data\n", " target_layer: bronze\n", "\n", "source:\n", " type: stream\n", " path: wss://stream.binance.com:9443/ws/btcusdt@trade\n", "\n", "transformations:\n", " - phase: pre\n", " rename:\n", " mappings:\n", " t: trade_id\n", " p: price\n", " q: quantity\n", " s: symbol\n", " T: trade_time\n", " m: is_buyer_maker\n", " e: event_type\n", " E: event_time\n", " M: is_best_match\n", "\n", "model:\n", " fields:\n", " - name: trade_id\n", " type: integer\n", " required: true\n", " - name: price\n", " type: string\n", " required: true\n", " - name: quantity\n", " type: string\n", " required: true\n", " - name: symbol\n", " type: string\n", " required: true\n", " - name: trade_time\n", " type: integer\n", " - name: is_buyer_maker\n", " type: 
boolean\n", " - name: event_type\n", " type: string\n", " - name: event_time\n", " type: integer\n", "\n", "quality:\n", " row_rules:\n", " - name: positive_price\n", " sql: \"CAST(price AS DOUBLE) > 0\"\n", " - name: positive_qty\n", " sql: \"CAST(quantity AS DOUBLE) > 0\"\n", " - name: valid_symbol\n", " sql: \"symbol = 'BTCUSDT'\"\n", "\n", "server:\n", " type: local\n", " path: \".\"\n", " schema_policy:\n", " evolution: \"allow\"\n", " unknown_fields: \"drop\"\n", "\"\"\",\n", " \"06_integrations_demo/btc_trades.yaml\",\n", ")\n", "\n", "print(\"Contract written with pre-validation rename transformations!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Validate live trades through the contract ───────────\n", "proc = ll.DataProcessor(stream_contract, engine=\"polars\")\n", "good, bad = proc.run(live_df)\n", "\n", "s.assert_reconciliation(live_df, good, bad)\n", "print(\"\\nLive BTC trades validated in real-time!\")\n", "print(f\" Good : {good.height} trades passed all rules\")\n", "print(f\" Bad : {bad.height} trades quarantined\")\n", "display(good.select([\"trade_id\", \"symbol\", \"price\", \"quantity\", \"is_buyer_maker\"]).head(5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Setting up a Live Database for this Demo\n", "To prove LakeLogic natively executes SQL over the wire, we will quickly create a local SQLite database and populate it with tables. (LakeLogic uses exactly the same engine logic for Postgres, MySQL, SQL Server, etc)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "abs_db_path = os.path.abspath(\"demo.db\").replace(\"\\\\\", \"/\")\n", "import sqlite3\n", "from datetime import datetime\n", "\n", "conn = sqlite3.connect(\"demo.db\")\n", "c = conn.cursor()\n", "\n", "# Seed users table (For Section 3)\n", "c.execute(\"CREATE TABLE IF NOT EXISTS pg_users (id INTEGER, email TEXT, signup_date TEXT)\")\n", "c.execute(\"DELETE FROM pg_users\")\n", "c.executemany(\n", " \"INSERT INTO pg_users VALUES (?, ?, ?)\",\n", " [(1, \"test@example.com\", \"2024-01-01\"), (2, \"invalid_email.com\", \"2024-01-02\")],\n", ")\n", "\n", "# Seed cdc_orders table (For Section 5)\n", "c.execute(\"CREATE TABLE IF NOT EXISTS cdc_orders (id INTEGER, updated_at TEXT)\")\n", "c.execute(\"DELETE FROM cdc_orders\")\n", "c.executemany(\"INSERT INTO cdc_orders VALUES (?, ?)\", [(1, \"2024-04-10T12:00:00Z\"), (2, \"2024-04-12T12:00:00Z\")])\n", "\n", "# Seed massive_orders table (For Section 6)\n", "c.execute(\"CREATE TABLE IF NOT EXISTS massive_orders (id INTEGER, total REAL)\")\n", "c.execute(\"DELETE FROM massive_orders\")\n", "c.executemany(\"INSERT INTO massive_orders VALUES (?, ?)\", [(i, i * 1.5) for i in range(1, 1001)])\n", "\n", "# Seed wide_orders_table (For Section 7)\n", "c.execute(\"CREATE TABLE IF NOT EXISTS wide_orders_table (order_id TEXT, total_amount REAL, huge_json TEXT)\")\n", "c.execute(\"DELETE FROM wide_orders_table\")\n", "c.executemany(\n", " \"INSERT INTO wide_orders_table VALUES (?, ?, ?)\",\n", " [(\"A1\", 100.5, '{\"data\":\"blob\"}'), (\"A2\", 50.0, '{\"data\":\"blob\"}')],\n", ")\n", "\n", "conn.commit()\n", "conn.close()\n", "print(\"demo.db initialized with seed data!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 4. Native Database Ingestion (SQLite / Postgres)\n", "\n", "**The Problem:** You need to mirror a transactional database (e.g. 
`users` or `orders` tables) without maintaining brittle JDBC extraction layers or deploying heavy extraction pipelines.\n", "\n", "**The Solution:** Use Polars' native `read_database_uri()` to extract the data at blazing speed (via ConnectorX or ADBC), then pass the dataframe immediately into `proc.run()`. No scaffolding or temporary files required." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Define the schema boundary for Postgres ─────────────\n", "postgres_contract = s.write_contract(\n", " f\"\"\"\n", "version: 1.0.0\n", "dataset: pg_users\n", "info:\n", " title: bronze_pg_users\n", "\n", "source:\n", " type: database\n", " path: sqlite:///{abs_db_path}\n", "\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", " - name: email\n", " type: string\n", " required: true\n", " - name: signup_date\n", " type: string\n", "\n", "quality:\n", " row_rules:\n", " - name: valid_email\n", " sql: \"email LIKE '%@%'\"\n", "\"\"\",\n", " \"06_integrations_demo/postgres_users.yaml\",\n", ")\n", "\n", "print(\"Contract written for Postgres extraction\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Extract directly from DataProcessor ────\n", "# By calling run_source() with engine='polars', LakeLogic routes automatically to pl.read_database_uri.\n", "proc = ll.DataProcessor(\"06_integrations_demo/postgres_users.yaml\", engine=\"polars\")\n", "res = proc.run_source()\n", "\n", "print(f\"\\nExtracted {res.source_count} rows from the db.\")\n", "print(f\"Good rows: {res.good_count}\")\n", "print(f\"Quarantined rows: {res.bad_count}\")\n", "display(res.good)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 5. Incremental CDC (Change Data Capture)\n", "\n", "**The Problem:** Your database has millions of rows. 
Extracting the full table every minute crushes the database and your pipeline.\n", "\n", "**The Solution:** Set `load_mode: incremental` and a `watermark_field`. LakeLogic stores the max watermark safely in the `.lakelogic` state folder and dynamically injects `WHERE updated_at > last_watermark` into the SQL engine before data is even loaded." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Contract declaring Incremental CDC ─────────────\n", "cdc_contract = s.write_contract(\n", " f\"\"\"\n", "version: 1.0.0\n", "dataset: cdc_orders\n", "info:\n", " title: bronze_cdc_orders\n", " target_layer: bronze\n", "\n", "source:\n", " type: database\n", " load_mode: incremental\n", " watermark_field: updated_at\n", " path: sqlite:///{abs_db_path}\n", "\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", " - name: updated_at\n", " type: timestamp\n", " required: true\n", "\"\"\",\n", " \"06_integrations_demo/cdc_orders.yaml\",\n", ")\n", "\n", "print(\"Contract written with Incremental CDC watermark tracking!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Execute Incremental CDC Load ────\n", "proc = ll.DataProcessor(\"06_integrations_demo/cdc_orders.yaml\", engine=\"polars\")\n", "res = proc.run_source()\n", "\n", "print(f\"\\nTotal processed: {res.source_count}\")\n", "display(res.good.head(3))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 6. Massive Initial Loads / Batch Ingestion\n", "\n", "**The Problem:** You have a 100GB table in SQL Server or MySQL. Doing a `SELECT *` for the initial load will crash the worker node with an Out-of-Memory (OOM) error before it can evaluate any quality rules.\n", "\n", "**The Solution:** Add `options: {fetch_size: N}`. LakeLogic automatically alters the execution path to use a streaming SQL iterator (via SQLAlchemy). 
It pulls 500,000 rows at a time, validates them against your contract, drops quarantined rows, writes the valid rows dynamically to your `target_layer`, and continues. Memory usage stays flat." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Native Engine Dialect Agnosticism & Batching ─────\n", "# LakeLogic automatically detects dialect extensions via URI (MySQL, SQL Server, SQLite).\n", "batch_contract = s.write_contract(\n", " f\"\"\"\n", "version: 1.0.0\n", "dataset: massive_orders\n", "info:\n", " title: bronze_massive_orders\n", "\n", "source:\n", " type: database\n", " path: sqlite:///{abs_db_path}\n", " options:\n", " fetch_size: 500000 # <- Subverts memory limits by iterating chunks natively\n", "\n", "model:\n", " fields:\n", " - name: id\n", " type: integer\n", " required: true\n", " - name: total\n", " type: float\n", "\"\"\",\n", " \"06_integrations_demo/batch_orders.yaml\",\n", ")\n", "\n", "print(\"Contract written with fetch_size configured for batch streams!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Execute Batch Ingestion (Iterative) ────\n", "proc = ll.DataProcessor(\"06_integrations_demo/batch_orders.yaml\", engine=\"polars\")\n", "res = proc.run_source()\n", "\n", "print(f\"\\nTotal processed: {res.source_count}\")\n", "display(res.good.head(3))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 7. Smart Column Projection (Pushdown)\n", "\n", "**The Problem:** The source `orders` table has 150 columns (including heavy JSON blobs), but your analytics pipeline only needs 3 columns. Doing `SELECT *` wastes network bandwidth and memory.\n", "\n", "**The Solution:** You do absolutely nothing! LakeLogic's Native Database engines intelligently read your `model.fields` list and dynamically construct precise `SELECT \"col1\", \"col2\"` queries. It only extracts exactly what is defined in the contract." 
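,
"\n\nIn spirit, the generated query is derived directly from the contract's field list. A toy reconstruction of that derivation (not LakeLogic's actual query builder):\n",
"\n",
"```python\n",
"fields = [\"order_id\", \"total_amount\"]  # from model.fields in the contract\n",
"cols = \", \".join(f'\"{f}\"' for f in fields)  # quote each identifier\n",
"query = f'SELECT {cols} FROM \"wide_orders_table\"'\n",
"# -> SELECT \"order_id\", \"total_amount\" FROM \"wide_orders_table\"\n",
"```\n"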
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Automatic Projection Pushdown ──────\n", "# This contract will automatically generate the query:\n", "# SELECT \"order_id\", \"total_amount\" FROM \"wide_orders_table\"\n", "projection_contract = s.write_contract(\n", " f\"\"\"\n", "version: 1.0.0\n", "dataset: wide_orders_table\n", "info:\n", " title: bronze_narrow_orders\n", "\n", "source:\n", " type: database\n", " path: sqlite:///{abs_db_path}\n", "\n", "model:\n", " fields:\n", " # Only these two fields are extracted over the wire!\n", " - name: order_id\n", " type: string\n", " - name: total_amount\n", " type: float\n", "\"\"\",\n", " \"06_integrations_demo/projection_orders.yaml\",\n", ")\n", "\n", "print(\"Contract written showcasing smart zero-effort projection pushdown!\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Execute Smart Projection Extraction ────\n", "proc = ll.DataProcessor(\"06_integrations_demo/projection_orders.yaml\", engine=\"polars\")\n", "res = proc.run_source()\n", "\n", "print(f\"\\nTotal processed: {res.source_count}\")\n", "display(res.good.head(3))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## 8. Cloud Data Sources (Azure, AWS, GCP)\n", "\n", "**The Problem:** Your data lives in cloud storage — Azure Data Lake (ADLS), Amazon S3, or Google Cloud Storage. You need to read, validate, and materialize it without writing boilerplate credential code.\n", "\n", "**The Solution:** LakeLogic natively resolves `abfss://`, `s3://`, and `gs://` URIs. Its built-in `CloudCredentialResolver` automatically detects credentials from environment variables, service principals, IAM roles, or `az login` — zero manual `storage_options` configuration." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── LakeLogic's built-in cloud credential resolver ──────────\n", "from lakelogic import CloudCredentialResolver\n", "\n", "resolver = CloudCredentialResolver()\n", "\n", "# Auto-detects from env vars, az login, managed identity, or IAM roles\n", "print(\"CloudCredentialResolver supports:\")\n", "print(\" • Azure ADLS/Blob — abfss://container@account.dfs.core.windows.net/\")\n", "print(\" • Amazon S3 — s3://bucket/prefix/\")\n", "print(\" • Google Cloud GCS — gs://bucket/prefix/\")\n", "print()\n", "print(\"Authentication priority (Azure):\")\n", "print(\" 1. Explicit token/key (AZURE_STORAGE_ACCOUNT_KEY, SAS_TOKEN)\")\n", "print(\" 2. Service Principal (AZURE_CLIENT_ID + SECRET + TENANT_ID)\")\n", "print(\" 3. Account key from env var\")\n", "print(\" 4. Azure AD (az login / managed identity / workload identity)\")\n", "print()\n", "print(\"Authentication priority (AWS):\")\n", "print(\" 1. Explicit credentials (AWS_ACCESS_KEY_ID + SECRET)\")\n", "print(\" 2. Environment variables\")\n", "print(\" 3. IAM role (boto3 default credential chain)\")\n", "print()\n", "print(\"Authentication priority (GCP):\")\n", "print(\" 1. GOOGLE_SERVICE_ACCOUNT\")\n", "print(\" 2. GOOGLE_APPLICATION_CREDENTIALS env var\")\n", "print(\" 3. 
Application Default Credentials (gcloud auth)\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Azure Data Lake Storage (ADLS Gen2) ─────────────────\n", "# Just set your source.path to an abfss:// URI and LakeLogic handles the rest.\n", "# Credentials are auto-resolved from `az login` or env vars.\n", "#\n", "# The `partition` block is the key optimization:\n", "# - format: maps to your landing directory structure (strftime tokens)\n", "# - lookback_days: only scans the last N days instead of the entire lake\n", "# - This turns a full glob scan into a precise directory lookup\n", "\n", "azure_contract_yaml = \"\"\"\n", "version: 1.0.0\n", "dataset: customer_events\n", "info:\n", " title: bronze_customer_events\n", " domain: marketing\n", " target_layer: bronze\n", "\n", "source:\n", " path: abfss://landing@mystorageaccount.dfs.core.windows.net/events\n", " format: parquet\n", " load_mode: incremental\n", " watermark_strategy: pipeline_log\n", "\n", " # Partition-aware ingestion — only scan relevant date directories\n", " partition:\n", " format: \"y_%Y/m_%m/d_%d\" # matches: events/y_2026/m_04/d_16/*.parquet\n", " lookback_days: 3 # only scan last 3 days (not the entire lake)\n", " # start_date: \"2026-01-01\" # optional: override for backfills\n", " # end_date: \"2026-01-31\" # optional: override for backfills\n", "\n", "model:\n", " fields:\n", " - name: event_id\n", " type: string\n", " required: true\n", " - name: customer_id\n", " type: string\n", " required: true\n", " - name: event_type\n", " type: string\n", " - name: timestamp\n", " type: timestamp\n", " required: true\n", "\n", "quality:\n", " row_rules:\n", " - name: valid_event\n", " sql: \"event_type IN ('click', 'view', 'purchase', 'signup')\"\n", "\n", "materialization:\n", " strategy: append\n", " format: delta\n", " partition_by: [event_type] # Delta table partitioned for fast downstream queries\n", " target_path: 
abfss://silver@mystorageaccount.dfs.core.windows.net/events\n", "\n", "server:\n", " type: local\n", " path: \".\"\n", "\"\"\"\n", "\n", "print(\"Azure ADLS contract with partition-aware ingestion:\")\n", "print()\n", "print(\" Landing structure:\")\n", "print(\" abfss://landing@myaccount.dfs.core.windows.net/events/\")\n", "print(\" y_2026/\")\n", "print(\" m_04/\")\n", "print(\" d_14/*.parquet\")\n", "print(\" d_15/*.parquet\")\n", "print(\" d_16/*.parquet <-- only these 3 days scanned (lookback_days: 3)\")\n", "print()\n", "print(\" Without partition: scans ALL directories (slow, expensive)\")\n", "print(\" With partition: scans only y_2026/m_04/d_14, d_15, d_16 (fast, cheap)\")\n", "print()\n", "print(\" Output: Delta table at abfss://silver@.../events, partitioned by event_type\")\n", "print()\n", "print(\"# To execute:\")\n", "print('# proc = ll.DataProcessor(contract, engine=\"polars\")')\n", "print(\"# good, bad = proc.run_source() # Reads only last 3 days from ADLS\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Amazon S3 ───────────────────────────────────\n", "# Same pattern — just swap the URI to s3://\n", "\n", "s3_contract_yaml = \"\"\"\n", "version: 1.0.0\n", "dataset: order_events\n", "info:\n", " title: bronze_order_events\n", " domain: commerce\n", " target_layer: bronze\n", "\n", "source:\n", " path: s3://my-data-lake/landing/orders/*.json\n", " format: json\n", " load_mode: incremental\n", " watermark_strategy: source_mtime\n", "\n", "model:\n", " fields:\n", " - name: order_id\n", " type: string\n", " required: true\n", " - name: customer_id\n", " type: string\n", " required: true\n", " - name: total_amount\n", " type: float\n", " - name: currency\n", " type: string\n", " - name: created_at\n", " type: timestamp\n", " required: true\n", "\n", "quality:\n", " row_rules:\n", " - name: positive_amount\n", " sql: \"total_amount > 0\"\n", " - name: valid_currency\n", " sql: \"currency IN 
('USD', 'EUR', 'GBP', 'JPY')\"\n", "\n", "server:\n", " type: local\n", " path: \".\"\n", "\"\"\"\n", "\n", "print(\"AWS S3 contract (would run with real credentials):\")\n", "print(\" source: s3://my-data-lake/landing/orders/*.json\")\n", "print()\n", "print(\"# To execute:\")\n", "print('# contract = s.write_contract(s3_contract_yaml, \"commerce/orders.yaml\")')\n", "print('# proc = ll.DataProcessor(contract, engine=\"polars\")')\n", "print(\"# good, bad = proc.run_source() # Reads from S3 automatically\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Google Cloud Storage (GCS) ─────────────────────\n", "# Same pattern — just swap to gs://\n", "\n", "gcs_contract_yaml = \"\"\"\n", "version: 1.0.0\n", "dataset: user_sessions\n", "info:\n", " title: bronze_user_sessions\n", " domain: analytics\n", " target_layer: bronze\n", "\n", "source:\n", " path: gs://my-analytics-bucket/sessions/*.parquet\n", " format: parquet\n", " load_mode: full\n", "\n", "model:\n", " fields:\n", " - name: session_id\n", " type: string\n", " required: true\n", " - name: user_id\n", " type: string\n", " required: true\n", " - name: page_views\n", " type: integer\n", " - name: duration_seconds\n", " type: integer\n", " - name: started_at\n", " type: timestamp\n", "\n", "quality:\n", " row_rules:\n", " - name: positive_views\n", " sql: \"page_views >= 0\"\n", " - name: reasonable_duration\n", " sql: \"duration_seconds BETWEEN 0 AND 86400\"\n", "\n", "server:\n", " type: local\n", " path: \".\"\n", "\"\"\"\n", "\n", "print(\"GCP Cloud Storage contract (would run with real credentials):\")\n", "print(\" source: gs://my-analytics-bucket/sessions/*.parquet\")\n", "print()\n", "print(\"# To execute:\")\n", "print('# contract = s.write_contract(gcs_contract_yaml, \"analytics/sessions.yaml\")')\n", "print('# proc = ll.DataProcessor(contract, engine=\"polars\")')\n", "print(\"# good, bad = proc.run_source() # Reads from GCS automatically\")" ] 
}, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ── Databricks Secret Scope Integration ─────────────────\n", "# On Databricks, LakeLogic can pull credentials from secret scopes\n", "# (backed by Azure Key Vault, AWS Secrets Manager, etc.)\n", "from lakelogic.engines.cloud_credentials import DatabricksSecretResolver\n", "\n", "print(\"DatabricksSecretResolver usage (Databricks notebooks only):\")\n", "print()\n", "print(\"# Azure (Key Vault-backed scope):\")\n", "print('# resolver = DatabricksSecretResolver.for_cloud(\"azure\", scope=\"lakelogic\")')\n", "print(\"# options = resolver.resolve_storage_options(\")\n", "print('# \"abfss://silver@myaccount.dfs.core.windows.net/orders/\"')\n", "print(\"# )\")\n", "print()\n", "print(\"# AWS (Secrets Manager-backed scope):\")\n", "print('# resolver = DatabricksSecretResolver.for_cloud(\"aws\", scope=\"lakelogic-aws\")')\n", "print('# options = resolver.resolve_storage_options(\"s3://my-bucket/silver/\")')\n", "print()\n", "print(\"# GCP (Secret Manager-backed scope):\")\n", "print('# resolver = DatabricksSecretResolver.for_cloud(\"gcp\", scope=\"lakelogic-gcp\")')\n", "print('# options = resolver.resolve_storage_options(\"gs://my-bucket/silver/\")')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What You Just Saw\n", "\n", "- **dbt adapter** — import `schema.yml` as a LakeLogic contract, zero duplication\n", "- **dlt adapter** — declare the API in the contract, `run_source()` does extraction + validation\n", "- **Native streaming connectors** — `WebSocketConnector` fetches live BTC trades, pre-validation `rename` transformations map cryptic field names to business-friendly columns\n", "- **Native database (Polars)** — `pl.read_database_uri()` for high-speed SQL extraction with contract validation\n", "- **Incremental CDC** — watermark-based change tracking with automatic state management\n", "- **Batch ingestion** — `fetch_size` for memory-safe chunked 
processing of massive tables\n", "- **Smart projection pushdown** — automatic `SELECT \"col\"` extraction based purely on the contract schema\n", "- **Cloud data sources** — native `abfss://`, `s3://`, `gs://` URI support with automatic credential resolution (Azure AD, IAM roles, service principals, Key Vault)\n", "- **Same reconciliation guarantee** — every row accounted for regardless of source" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Go Deeper — Explore by Capability\n", "\n", "Each notebook below maps to a pillar of LakeLogic's [Technical Capabilities](https://lakelogic.github.io/LakeLogic/#technical-capabilities):\n", "\n", "| # | Notebook | What You'll See |\n", "|---|---|---|\n", "| 🛡️ | **[Data Quality & Trust](01_data_quality_trust.ipynb)** | Reconciliation proofs, Pydantic validation, SQL-first rules, SLO monitoring |\n", "| 📜 | **[Compliance & Governance](02_compliance_governance.ipynb)** | GDPR erasure in 2 lines, automatic lineage, cost intelligence |\n", "| ⚡ | **[Engine & Scale](03_engine_scale.ipynb)** | Same contract on Polars & DuckDB, incremental processing, dry run |\n", "| 🔧 | **[Developer Experience](04_developer_experience.ipynb)** | Structured diagnostics, DDL generation, surgical resets, multi-channel alerts |\n", "| 🧬 | **[Data Generation & AI](05_data_generation_ai.ipynb)** | Synthetic data, referential integrity, edge case injection, contract inference |\n", "| 🔌 | **[Integrations](06_integrations.ipynb)** | dbt adapter, dlt sources, contract-driven quality gates on arrival |\n", "\n", "> **Each notebook is self-contained** — pick the capability that matters most to you and run it independently." 
] } ], "metadata": { "colab": { "name": "LakeLogic — Integrations", "provenance": [] }, "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.13" } }, "nbformat": 4, "nbformat_minor": 0 }