{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Data Generation & AI\n",
"\n",
"[Open in Colab](https://colab.research.google.com/github/LakeLogic/LakeLogic/blob/main/examples/colab/05_data_generation_ai.ipynb) [View on GitHub](https://github.com/lakelogic/LakeLogic/blob/main/examples/colab/05_data_generation_ai.ipynb)\n",
"\n",
"Generate synthetic test data with referential integrity across tables, inject edge cases, validate quarantine coverage, and auto-infer contracts from CSVs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import subprocess\n",
"import sys\n",
"import importlib.util\n",
"import urllib.request\n",
"import os\n",
"\n",
"if importlib.util.find_spec(\"lakelogic\") is None:\n",
" subprocess.check_call(\n",
" [sys.executable, \"-m\", \"pip\", \"install\", \"--upgrade\", \"-q\", \"lakelogic[polars,extraction-ocr,nlp]\"]\n",
" )\n",
"if not os.path.exists(\"_setup.py\"):\n",
" urllib.request.urlretrieve(\n",
" \"https://raw.githubusercontent.com/LakeLogic/LakeLogic/main/examples/colab/_setup.py\", \"_setup.py\"\n",
" )\n",
"import _setup as s\n",
"import lakelogic as ll"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 1. DataGenerator Basics — Synthetic Data From a Contract\n",
"\n",
"**The Problem:** You need test data that matches your schema. Writing Faker scripts for every table is tedious, and the scripts drift out of sync with your contracts.\n",
"\n",
"**The Solution:** `DataGenerator` reads your contract and generates realistic data — including controlled invalid rows for quarantine testing."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"contract = s.write_contract(\n",
" \"\"\"\n",
"version: 1.0.0\n",
"dataset: test_users\n",
"model:\n",
" fields:\n",
" - name: user_id\n",
" type: integer\n",
" required: true\n",
" - name: email\n",
" type: string\n",
" required: true\n",
" pii: true\n",
" - name: age\n",
" type: integer\n",
" min: 18\n",
" max: 120\n",
" - name: country\n",
" type: string\n",
" accepted_values: [US, GB, DE, FR, JP]\n",
" - name: status\n",
" type: string\n",
" accepted_values: [active, inactive, suspended]\n",
"quality:\n",
" row_rules:\n",
" - name: valid_email\n",
" sql: \"email LIKE '%@%.%'\"\n",
" - name: valid_age\n",
" sql: \"age BETWEEN 18 AND 120\"\n",
" - name: valid_country\n",
" sql: \"country IN ('US','GB','DE','FR','JP')\"\n",
"\"\"\",\n",
" \"05_data_generation_ai_demo/users.yaml\",\n",
")\n",
"\n",
"gen = ll.DataGenerator(contract)\n",
"df = gen.generate(rows=1000, invalid_ratio=0.10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The Proof\n",
"print(f\"Generated {len(df)} rows with ~10% intentionally invalid\")\n",
"display(df.head(10))"
]
},
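{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same contract that generated the data can validate it. As a quick sketch, reusing the `DataProcessor` and `s.assert_reconciliation` calls demonstrated later in this notebook, the ~10% intentionally invalid rows should land in quarantine:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Round-trip check: validate the generated rows against the same contract\n",
"proc = ll.DataProcessor(contract, engine=\"polars\")\n",
"good, bad = proc.run(df)\n",
"print(f\"Good: {len(good)} | Quarantined: {len(bad)}\")\n",
"s.assert_reconciliation(df, good, bad)"
]
},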
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 2. Custom AI-Steered Scenarios\n",
"\n",
"**The Problem:** Heuristic or purely random data rarely exercises specific business-logic states or regional subsets.\n",
"\n",
"**The Solution:** Pass `ai=True` together with `ai_custom_scenario` to steer the AI generator with natural-language instructions while it stays inside your schema boundaries."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import lakelogic as ll\n",
"\n",
"# ── 1. Configure the AI Provider ──────────────────────────────────────\n",
"# LakeLogic supports: openai, anthropic, azure, google (Gemini), ollama, and anything\n",
"# LiteLLM supports. Set your provider, model, and API key here.\n",
"#\n",
"# Uncomment the provider you want to use:\n",
"\n",
"# -- OpenAI --\n",
"os.environ[\"LAKELOGIC_AI_PROVIDER\"] = \"openai\"\n",
"os.environ[\"LAKELOGIC_AI_MODEL\"] = \"gpt-4o-mini\"\n",
"# os.environ[\"OPENAI_API_KEY\"] = \"sk-...\"\n",
"\n",
"# -- Anthropic --\n",
"# os.environ[\"LAKELOGIC_AI_PROVIDER\"] = \"anthropic\"\n",
"# os.environ[\"LAKELOGIC_AI_MODEL\"] = \"claude-sonnet-4-20250514\"\n",
"# os.environ[\"ANTHROPIC_API_KEY\"] = \"sk-ant-...\"\n",
"\n",
"# -- Google Gemini --\n",
"# os.environ[\"LAKELOGIC_AI_PROVIDER\"] = \"google\"\n",
"# os.environ[\"LAKELOGIC_AI_MODEL\"] = \"gemini-2.5-flash\"\n",
"# os.environ[\"GOOGLE_API_KEY\"] = \"AIz.....\"\n",
"\n",
"# -- Ollama (local, free) --\n",
"# os.environ[\"LAKELOGIC_AI_PROVIDER\"] = \"ollama\"\n",
"# os.environ[\"LAKELOGIC_AI_MODEL\"] = \"llama3\"\n",
"\n",
"AI_PROVIDER = os.getenv(\"LAKELOGIC_AI_PROVIDER\", \"openai\")\n",
"AI_MODEL = os.getenv(\"LAKELOGIC_AI_MODEL\", \"gpt-4o-mini\")\n",
"_KEY_ENV = {\"openai\": \"OPENAI_API_KEY\", \"anthropic\": \"ANTHROPIC_API_KEY\", \"google\": \"GOOGLE_API_KEY\"}\n",
"AI_API_KEY = os.getenv(_KEY_ENV.get(AI_PROVIDER, \"\"), \"\")  # ollama needs no key\n",
"\n",
"# ── 2. Define the data contract ──────────────────────────────────────\n",
"scenario_contract = s.write_contract(\n",
" \"\"\"\n",
"version: 1.0.0\n",
"dataset: ecommerce_users\n",
"model:\n",
" fields:\n",
" - name: user_id\n",
" type: integer\n",
" required: true\n",
" - name: email\n",
" type: string\n",
" required: true\n",
" pii: true\n",
" - name: full_name\n",
" type: string\n",
" required: true\n",
" - name: age\n",
" type: integer\n",
" min: 18\n",
" max: 120\n",
" - name: country\n",
" type: string\n",
" accepted_values: [US, GB, DE, FR, JP, BR, IN]\n",
" - name: tier\n",
" type: string\n",
" accepted_values: [free, pro, enterprise]\n",
"quality:\n",
" row_rules:\n",
" - name: valid_email\n",
" sql: \"email LIKE '%@%.%'\"\n",
" - name: valid_age\n",
" sql: \"age BETWEEN 18 AND 120\"\n",
"\"\"\",\n",
" \"05_data_generation_ai_demo/scenario_users.yaml\",\n",
")\n",
"\n",
"# ── 3. Generate with a custom scenario ───────────────────────────────\n",
"# The scenario string is injected directly into the LLM prompt.\n",
"# The AI will generate realistic sample pools AND edge cases that\n",
"# respect your natural-language instructions.\n",
"\n",
"gen = ll.DataGenerator(scenario_contract)\n",
"\n",
"scenario_df = gen.generate(\n",
" rows=20,\n",
" invalid_ratio=0.20,\n",
" ai=True,\n",
" ai_provider=AI_PROVIDER,\n",
" ai_model=AI_MODEL,\n",
" ai_api_key=AI_API_KEY,\n",
" ai_custom_scenario=(\n",
" \"Generate users who are French (FR) or Japanese (JP) only. \"\n",
" \"All valid users should be enterprise-tier and over 60 years old. \"\n",
" \"Use realistic French and Japanese names for the full_name field. \"\n",
" \"For invalid edge cases, inject SQL injection strings into email \"\n",
" \"and negative ages.\"\n",
" ),\n",
")\n",
"\n",
"print(f\"Generated {len(scenario_df)} rows with custom AI scenario\")\n",
"display(scenario_df.head(10))\n",
"\n",
"# ── 4. Validate through the pipeline ─────────────────────────────────\n",
"proc = ll.DataProcessor(scenario_contract, engine=\"polars\")\n",
"good, bad = proc.run(scenario_df)\n",
"\n",
"print(f\"\\nGood: {len(good)} | Quarantined: {len(bad)}\")\n",
"s.assert_reconciliation(scenario_df, good, bad)\n",
"if len(bad) > 0:\n",
" print(\"\\nQuarantined rows (AI-generated edge cases):\")\n",
" display(bad.head(5))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 3. Streaming Simulation — Time-Windowed Batch Generation\n",
"\n",
"**The Problem:** Your pipeline must handle incremental data arriving in time windows.\n",
"You need test data that simulates realistic ingestion patterns — not just static dumps.\n",
"\n",
"**The Solution:** `DataGenerator.generate_stream()` produces batches with monotonically\n",
"increasing timestamps, each confined to a configurable time window.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# -- Streaming Simulation: time-windowed batch generation ----------\n",
"import lakelogic as ll\n",
"import polars as pl\n",
"\n",
"stream_contract = s.write_contract(\n",
" \"\"\"\n",
"version: 1.0.0\n",
"dataset: streaming_events\n",
"\n",
"model:\n",
" fields:\n",
" - name: event_id\n",
" type: integer\n",
" - name: event_ts\n",
" type: timestamp\n",
" - name: user_id\n",
" type: integer\n",
" - name: action\n",
" type: string\n",
" accepted_values: [page_view, click, purchase, signup]\n",
"\"\"\",\n",
" \"05_data_generation_ai_demo/streaming_events.yaml\",\n",
")\n",
"\n",
"gen = ll.DataGenerator(stream_contract)\n",
"\n",
"# Simulate 3 batches arriving every 15 minutes, 5 rows each\n",
"all_batches = []\n",
"for window_start, window_end, batch_df in gen.generate_stream(batches=3, interval_minutes=15, rows_per_batch=5):\n",
" print(f\"Window: {window_start} -> {window_end} | {len(batch_df)} rows\")\n",
" all_batches.append(batch_df)\n",
"\n",
"df_stream = pl.concat(all_batches)\n",
"print(f\"\\nTotal: {len(df_stream)} events across 3 windows\")\n",
"display(df_stream.select([\"event_id\", \"event_ts\", \"action\"]).head(10))\n",
"print(\"\\n\\u2705 Streaming simulation complete: Faker only, $0 API cost.\")"
]
},
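{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each window can also be validated as it arrives. A minimal sketch, replaying the batches collected in `all_batches` through the same `DataProcessor` pattern used elsewhere in this notebook:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Replay each collected batch through the contract, one window at a time\n",
"proc = ll.DataProcessor(stream_contract, engine=\"polars\")\n",
"for i, batch_df in enumerate(all_batches, start=1):\n",
"    good, bad = proc.run(batch_df)\n",
"    print(f\"Batch {i}: good={len(good)} | quarantined={len(bad)}\")"
]
},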
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 4. Referential Integrity — FK/PK Consistency Across Tables\n",
"\n",
"**The Problem:** You generate test customers and test orders separately. Half your order rows reference `customer_id` values that don't exist in the customers table. Your join tests fail for the wrong reasons.\n",
"\n",
"**The Solution:** `DataGenerator.generate_related()` detects FK/PK relationships between contracts, generates parent tables first, then passes parent PKs into child tables so every foreign key is valid."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define two related contracts: customers (parent) and orders (child)\n",
"customers_path = s.write_contract(\n",
" \"\"\"\n",
"version: 1.0.0\n",
"dataset: customers\n",
"model:\n",
" fields:\n",
" - name: customer_id\n",
" type: integer\n",
" required: true\n",
" - name: name\n",
" type: string\n",
" required: true\n",
" - name: email\n",
" type: string\n",
" required: true\n",
" pii: true\n",
" - name: tier\n",
" type: string\n",
" accepted_values: [free, pro, enterprise]\n",
"quality:\n",
" row_rules:\n",
" - name: valid_email\n",
" sql: \"email LIKE '%@%.%'\"\n",
" dataset_rules:\n",
" - unique: customer_id\n",
"\"\"\",\n",
" \"05_data_generation_ai_demo/ri_customers.yaml\",\n",
")\n",
"\n",
"orders_path = s.write_contract(\n",
" \"\"\"\n",
"version: 1.0.0\n",
"dataset: orders\n",
"model:\n",
" fields:\n",
" - name: order_id\n",
" type: integer\n",
" required: true\n",
" - name: customer_id\n",
" type: integer\n",
" required: true\n",
" foreign_key:\n",
" contract: customers\n",
" column: customer_id\n",
" - name: amount\n",
" type: float\n",
" required: true\n",
" - name: status\n",
" type: string\n",
" accepted_values: [pending, shipped, delivered]\n",
"quality:\n",
" row_rules:\n",
" - name: positive_amount\n",
" sql: \"amount > 0\"\n",
" - referential_integrity:\n",
" field: customer_id\n",
" contract: customers\n",
" column: customer_id\n",
" severity: critical\n",
" dataset_rules:\n",
" - unique: order_id\n",
"\"\"\",\n",
" \"05_data_generation_ai_demo/ri_orders.yaml\",\n",
")\n",
"\n",
"# Generate both tables with referential integrity\n",
"related = ll.DataGenerator.generate_related(\n",
" contracts={\n",
" \"customers\": customers_path,\n",
" \"orders\": orders_path,\n",
" },\n",
" rows={\"customers\": 50, \"orders\": 200},\n",
" invalid_ratio=0.05,\n",
")\n",
"\n",
"customers_df = related[\"customers\"]\n",
"orders_df = related[\"orders\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The Proof — every order.customer_id exists in customers.customer_id\n",
"import polars as pl\n",
"\n",
"parent_ids = set(customers_df[\"customer_id\"].to_list())\n",
"child_ids = set(orders_df[\"customer_id\"].to_list())\n",
"orphans = child_ids - parent_ids\n",
"\n",
"print(f\"Customers: {len(customers_df)} rows, {len(parent_ids)} unique IDs\")\n",
"print(f\"Orders: {len(orders_df)} rows\")\n",
"print(f\"Orphan FKs: {len(orphans)}\")\n",
"print()\n",
"\n",
"# Show the FK distribution\n",
"fk_counts = orders_df.group_by(\"customer_id\").len().sort(\"len\", descending=True)\n",
"print(\"Orders per customer (top 5):\")\n",
"display(fk_counts.head(5))\n",
"print(\"\\n50 customers, 200 orders — every FK references a real parent row.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Validate both tables through their contracts\n",
"proc_c = ll.DataProcessor(customers_path, engine=\"polars\")\n",
"good_c, bad_c = proc_c.run(customers_df)\n",
"\n",
"proc_o = ll.DataProcessor(orders_path, engine=\"polars\")\n",
"good_o, bad_o = proc_o.run(orders_df)\n",
"\n",
"print(\"Customers:\")\n",
"s.assert_reconciliation(customers_df, good_c, bad_c)\n",
"print(\"\\nOrders:\")\n",
"s.assert_reconciliation(orders_df, good_o, bad_o)\n",
"print(\"\\nBoth tables validated. Referential integrity preserved end-to-end.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 5. `infer_contract` from Schema — No Data Needed\n",
"\n",
"**The Problem:** You know the table schema (from Spark, a DDL script, or a spec doc) but don't have sample data yet. Writing the contract YAML by hand is tedious and error-prone.\n",
"\n",
"**The Solution:** Pass a **DDL string**, **Spark StructType**, **list of tuples**, or **dict** directly to `infer_contract` — it generates the full contract with correct types and PII detection from column names alone."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from lakelogic.core.bootstrap import infer_contract\n",
"\n",
"# ── From a Spark DDL string (the most common format) ─────────────\n",
"draft = infer_contract(\n",
" \"order_id BIGINT, customer_email STRING, amount DECIMAL(10,2), status STRING, created_at TIMESTAMP\",\n",
" title=\"Orders\",\n",
" domain=\"commerce\",\n",
")\n",
"draft.show()\n",
"print()\n",
"print(\"PII auto-detected: customer_email flagged as email\")\n",
"print(\"Types mapped: BIGINT -> integer, DECIMAL -> double, TIMESTAMP -> timestamp\")"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"version: 1.0.0\n",
"\n",
"info:\n",
" title: Patient Records\n",
" version: 1.0.0\n",
" description: 'Auto-inferred contract. Source: DataFrame.'\n",
" target_layer: bronze\n",
" generated_at: '2026-04-16T22:07:59Z'\n",
" domain: healthcare\n",
"\n",
"model:\n",
" fields:\n",
"\n",
" - name: patient_id\n",
" type: string\n",
"\n",
" - name: ssn\n",
" type: string\n",
" pii: true\n",
" classification: ssn\n",
"\n",
" - name: diagnosis_code\n",
" type: string\n",
"\n",
" - name: admission_date\n",
" type: date\n",
"\n",
" - name: discharge_date\n",
" type: date\n",
"\n",
" - name: total_cost\n",
" type: double\n",
"\n",
"\n",
"PII auto-detected: ssn flagged as SSN\n",
"Temporal pair: admission_date <= discharge_date would be suggested with data\n"
]
}
],
"source": [
"# ── From a list of (name, type) tuples ────────────────────────\n",
"# Great for programmatic contract creation\n",
"draft_tuples = infer_contract(\n",
" [\n",
" (\"patient_id\", \"string\"),\n",
" (\"ssn\", \"string\"),\n",
" (\"diagnosis_code\", \"string\"),\n",
" (\"admission_date\", \"date\"),\n",
" (\"discharge_date\", \"date\"),\n",
" (\"total_cost\", \"double\"),\n",
" ],\n",
" title=\"Patient Records\",\n",
" domain=\"healthcare\",\n",
")\n",
"draft_tuples.show()\n",
"print()\n",
"print(\"PII auto-detected: ssn flagged as SSN\")\n",
"print(\"Temporal pair: admission_date <= discharge_date would be suggested with data\")"
]
},
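{
"cell_type": "markdown",
"metadata": {},
"source": [
"A dict of column name to type works too. A minimal sketch, assuming the dict form follows the same inference path as the DDL and tuple forms above (`Products` is an illustrative title):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── From a dict of column name → type ─────────────────────────\n",
"draft_dict = infer_contract(\n",
"    {\"product_id\": \"integer\", \"name\": \"string\", \"price\": \"double\", \"in_stock\": \"boolean\"},\n",
"    title=\"Products\",\n",
")\n",
"draft_dict.show()"
]
},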
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[32m2026-04-16 23:08:12.236\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3144\u001b[0m - \u001b[1m📋 Generating data for: _from_schema\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.237\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3145\u001b[0m - \u001b[1m Records : 9 valid + 1 invalid = 10 total\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.237\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3161\u001b[0m - \u001b[1m Source : Faker + heuristic generation (no AI or file seeds)\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.237\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3176\u001b[0m - \u001b[1m Edge cases : Heuristic-only (no AI edge cases available)\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.241\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3210\u001b[0m - \u001b[1m Row generation complete: 10 records built\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.241\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3232\u001b[0m - \u001b[1m Test cases : 4 across 3 categories\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.241\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3234\u001b[0m - \u001b[1m RANGE_VIOLATION 2 injections\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.243\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3234\u001b[0m - \u001b[1m EMPTY_STRING 1 injections\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.243\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3234\u001b[0m - \u001b[1m NOT_NULL_VIOLATION 1 injections\u001b[0m\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"DDL string → DataGenerator → 10 rows (with 10% intentionally invalid):\n"
]
},
{
"data": {
"text/plain": [
"shape: (10, 7)\n",
"┌──────────┬────────────────┬─────────────┬─────────┬────────────────┬─────────────┬───────────────┐\n",
"│ order_id ┆ customer_email ┆ amount ┆ status ┆ created_at ┆ _is_invalid ┆ _test_case_ty │\n",
"│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ pes │\n",
"│ i64 ┆ str ┆ f64 ┆ str ┆ str ┆ bool ┆ --- │\n",
"│ ┆ ┆ ┆ ┆ ┆ ┆ str │\n",
"╞══════════╪════════════════╪═════════════╪═════════╪════════════════╪═════════════╪═══════════════╡\n",
"│ 4366 ┆ garciamichelle ┆ null ┆ pending ┆ 2026-03-01T21: ┆ false ┆ null │\n",
"│ ┆ @example.net ┆ ┆ ┆ 40:17.241098 ┆ ┆ │\n",
"│ -413 ┆ randolphmarc@e ┆ -546.519057 ┆ ┆ null ┆ true ┆ EMPTY_STRING, │\n",
"│ ┆ xample.com ┆ ┆ ┆ ┆ ┆ NOT_NULL_VIOL │\n",
"│ ┆ ┆ ┆ ┆ ┆ ┆ ATION,RANGE_V │\n",
"│ ┆ ┆ ┆ ┆ ┆ ┆ IOLATION │\n",
"│ 4289 ┆ qscott@example ┆ 14.86 ┆ active ┆ 2026-04-11T19: ┆ false ┆ null │\n",
"│ ┆ .org ┆ ┆ ┆ 53:34.240558 ┆ ┆ │\n",
"│ 9692 ┆ jalexander@exa ┆ 15.63 ┆ active ┆ 2026-02-01T01: ┆ false ┆ null │\n",
"│ ┆ mple.net ┆ ┆ ┆ 40:41.241098 ┆ ┆ │\n",
"│ 5328 ┆ jasondaniels@e ┆ 422.54 ┆ active ┆ 2026-02-11T16: ┆ false ┆ null │\n",
"│ ┆ xample.com ┆ ┆ ┆ 39:47.240558 ┆ ┆ │\n",
"│ 8347 ┆ davisbrittany@ ┆ 28.48 ┆ active ┆ 2026-04-15T21: ┆ false ┆ null │\n",
"│ ┆ example.net ┆ ┆ ┆ 09:16.241098 ┆ ┆ │\n",
"│ 4703 ┆ jason02@exampl ┆ 36.26 ┆ active ┆ 2026-03-07T14: ┆ false ┆ null │\n",
"│ ┆ e.net ┆ ┆ ┆ 37:49.240558 ┆ ┆ │\n",
"│ 4213 ┆ baileyjames@ex ┆ 158.85 ┆ pending ┆ 2026-01-22T05: ┆ false ┆ null │\n",
"│ ┆ ample.com ┆ ┆ ┆ 05:08.240558 ┆ ┆ │\n",
"│ 7635 ┆ cristinabeard@ ┆ null ┆ active ┆ 2026-02-01T20: ┆ false ┆ null │\n",
"│ ┆ example.com ┆ ┆ ┆ 21:40.237826 ┆ ┆ │\n",
"│ 758 ┆ baileyjade@exa ┆ 146.52 ┆ active ┆ 2026-03-13T16: ┆ false ┆ null │\n",
"│ ┆ mple.com ┆ ┆ ┆ 48:52.241098 ┆ ┆ │\n",
"└──────────┴────────────────┴─────────────┴─────────┴────────────────┴─────────────┴───────────────┘"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[32m2026-04-16 23:08:12.251\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3144\u001b[0m - \u001b[1m📋 Generating data for: _from_schema\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.254\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3145\u001b[0m - \u001b[1m Records : 5 valid + 0 invalid = 5 total\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.254\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3161\u001b[0m - \u001b[1m Source : Faker + heuristic generation (no AI or file seeds)\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.254\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3210\u001b[0m - \u001b[1m Row generation complete: 5 records built\u001b[0m\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Tuple list → DataGenerator → 5 rows:\n"
]
},
{
"data": {
"text/plain": [
"shape: (5, 6)\n",
"┌────────────┬─────────────┬────────────────┬────────────────┬────────────┬─────────────┐\n",
"│ patient_id ┆ ssn ┆ diagnosis_code ┆ admission_date ┆ total_cost ┆ _is_invalid │\n",
"│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │\n",
"│ str ┆ str ┆ str ┆ str ┆ f64 ┆ bool │\n",
"╞════════════╪═════════════╪════════════════╪════════════════╪════════════╪═════════════╡\n",
"│ PAT-375551 ┆ 320-80-7413 ┆ kxe01.4 ┆ 2026-04-03 ┆ 187.0766 ┆ false │\n",
"│ PAT-368224 ┆ 607-47-9921 ┆ vNs66.7 ┆ 2026-02-05 ┆ 703.6594 ┆ false │\n",
"│ PAT-991090 ┆ 012-59-1862 ┆ WMS15.5 ┆ 2026-04-04 ┆ 964.066 ┆ false │\n",
"│ PAT-376199 ┆ 125-70-6577 ┆ zBJ71.0 ┆ 2026-02-04 ┆ 666.723 ┆ false │\n",
"│ PAT-897335 ┆ 172-32-5296 ┆ jvu79.0 ┆ 2026-03-19 ┆ 269.1227 ┆ false │\n",
"└────────────┴─────────────┴────────────────┴────────────────┴────────────┴─────────────┘"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[32m2026-04-16 23:08:12.260\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3144\u001b[0m - \u001b[1m📋 Generating data for: _from_schema\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.260\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3145\u001b[0m - \u001b[1m Records : 5 valid + 0 invalid = 5 total\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.260\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3161\u001b[0m - \u001b[1m Source : Faker + heuristic generation (no AI or file seeds)\u001b[0m\n",
"\u001b[32m2026-04-16 23:08:12.262\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mlakelogic.core.generator\u001b[0m:\u001b[36mgenerate\u001b[0m:\u001b[36m3210\u001b[0m - \u001b[1m Row generation complete: 5 records built\u001b[0m\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Dict → DataGenerator → 5 rows:\n"
]
},
{
"data": {
"text/plain": [
"shape: (5, 5)\n",
"┌────────────┬──────────────────┬───────┬──────────┬─────────────┐\n",
"│ product_id ┆ name ┆ price ┆ in_stock ┆ _is_invalid │\n",
"│ --- ┆ --- ┆ --- ┆ --- ┆ --- │\n",
"│ i64 ┆ str ┆ f64 ┆ bool ┆ bool │\n",
"╞════════════╪══════════════════╪═══════╪══════════╪═════════════╡\n",
"│ 4704 ┆ Leslie Patterson ┆ 64.34 ┆ false ┆ false │\n",
"│ 3627 ┆ Hannah Gay ┆ 87.93 ┆ true ┆ false │\n",
"│ 4925 ┆ Sarah Santos ┆ 24.21 ┆ false ┆ false │\n",
"│ 4402 ┆ Darryl Gardner ┆ 20.11 ┆ true ┆ false │\n",
"│ 8839 ┆ Austin Mccoy ┆ null ┆ true ┆ false │\n",
"└────────────┴──────────────────┴───────┴──────────┴─────────────┘"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# ── DataGenerator from schema directly ──────────────────────\n",
"# No contract YAML needed. No CSV needed. Just the schema.\n",
"import lakelogic as ll\n",
"\n",
"# DDL string → synthetic data in 2 lines\n",
"gen = ll.DataGenerator(\"order_id BIGINT, customer_email STRING, amount DOUBLE, status STRING, created_at TIMESTAMP\")\n",
"df = gen.generate(rows=10, invalid_ratio=0.10)\n",
"print(\"DDL string → DataGenerator → 10 rows (with 10% intentionally invalid):\")\n",
"display(df)\n",
"\n",
"print()\n",
"\n",
"# List of tuples → synthetic data\n",
"gen2 = ll.DataGenerator(\n",
" [\n",
" (\"patient_id\", \"string\"),\n",
" (\"ssn\", \"string\"),\n",
" (\"diagnosis_code\", \"string\"),\n",
" (\"admission_date\", \"date\"),\n",
" (\"total_cost\", \"double\"),\n",
" ]\n",
")\n",
"df2 = gen2.generate(rows=5)\n",
"print(\"Tuple list → DataGenerator → 5 rows:\")\n",
"display(df2)\n",
"\n",
"print()\n",
"\n",
"# Dict → synthetic data\n",
"gen3 = ll.DataGenerator({\"product_id\": \"integer\", \"name\": \"string\", \"price\": \"decimal\", \"in_stock\": \"boolean\"})\n",
"df3 = gen3.generate(rows=5)\n",
"print(\"Dict → DataGenerator → 5 rows:\")\n",
"display(df3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 6. `infer_contract` — Contract From a CSV in 30 Seconds\n",
"\n",
"**The Problem:** You have 50 CSVs and no contracts. Writing YAML by hand for each one takes days.\n",
"\n",
"**The Solution:** Point `infer_contract` at a file. It detects types and PII fields, and suggests quality rules."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from lakelogic.core.bootstrap import infer_contract\n",
"import polars as pl\n",
"\n",
"# Create a sample CSV\n",
"sample = pl.DataFrame(\n",
" {\n",
" \"order_id\": list(range(1, 101)),\n",
" \"customer_email\": [f\"user{i}@example.com\" for i in range(1, 101)],\n",
" \"amount\": [round(i * 9.99, 2) for i in range(1, 101)],\n",
" \"country\": [\"US\", \"GB\", \"DE\", \"FR\", \"JP\"] * 20,\n",
" \"created_at\": [\"2026-01-15\"] * 100,\n",
" }\n",
")\n",
"sample.write_csv(\"sample_orders.csv\")\n",
"\n",
"# Infer a contract from the CSV\n",
"draft = infer_contract(\"sample_orders.csv\", title=\"Inferred Orders\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The Proof\n",
"draft.show()\n",
"print(\"\\nContract inferred in seconds. PII detected. Types resolved. Ready to customise.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 7. Unstructured Processing — Contract-Driven Extraction\n",
"\n",
"**The Problem:** You have PDFs, scanned images, or free-text. Regex breaks on format changes. Custom parsers drift from your schema.\n",
"\n",
"**The Solution:** Declare *what* to extract in `model.fields` and *how* in `extraction:`. LakeLogic picks the right library, extracts, validates, and materialises — one call.\n",
"\n",
"| Provider | Extra | Input | Use Case |\n",
"|----------|-------|-------|----------|\n",
"| `local` (pdfplumber) | `lakelogic[extraction-ocr]` | PDF | Table + text, $0, no API key |\n",
"| `spacy` | `lakelogic[nlp]` | Free text | NER + classification, $0, local |\n",
"| `rapidocr` | `lakelogic[extraction-ocr]` | Scanned image | ONNX OCR, pure Python, no torch |\n",
"| `openai` / `anthropic` | `lakelogic[ai]` | Any | LLM prompting with structured output |"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Generate demo assets — invoice PDF and support tickets\n",
"import os\n",
"import shutil\n",
"import tempfile\n",
"import subprocess\n",
"import sys\n",
"import polars as pl\n",
"\n",
"DEMO_DIR = os.path.join(tempfile.gettempdir(), \"lakelogic_extraction_demo\")\n",
"shutil.rmtree(DEMO_DIR, ignore_errors=True)\n",
"os.makedirs(DEMO_DIR, exist_ok=True)\n",
"\n",
"try:\n",
" from fpdf import FPDF\n",
"except ImportError:\n",
" subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"-q\", \"fpdf2\"])\n",
" from fpdf import FPDF\n",
"\n",
"# Build a realistic invoice PDF with a table of line items\n",
"pdf = FPDF()\n",
"pdf.add_page()\n",
"pdf.set_font(\"Helvetica\", \"B\", 20)\n",
"pdf.cell(0, 15, \"INVOICE\", new_x=\"LMARGIN\", new_y=\"NEXT\", align=\"C\")\n",
"pdf.set_font(\"Helvetica\", \"\", 11)\n",
"pdf.cell(0, 8, \"Acme Corp | 500 Market St, San Francisco, CA 94105\", new_x=\"LMARGIN\", new_y=\"NEXT\", align=\"C\")\n",
"pdf.ln(4)\n",
"pdf.set_font(\"Helvetica\", \"B\", 11)\n",
"pdf.cell(95, 8, \"Invoice #: INV-2026-0042\")\n",
"pdf.cell(95, 8, \"Date: April 15, 2026\", new_x=\"LMARGIN\", new_y=\"NEXT\", align=\"R\")\n",
"pdf.cell(95, 8, \"Bill To: Globex Corporation\")\n",
"pdf.cell(95, 8, \"Due: May 15, 2026\", new_x=\"LMARGIN\", new_y=\"NEXT\", align=\"R\")\n",
"pdf.ln(4)\n",
"pdf.set_fill_color(240, 240, 240)\n",
"pdf.set_font(\"Helvetica\", \"B\", 10)\n",
"for col, w in [(\"Description\", 90), (\"Hours\", 30), (\"Rate\", 35), (\"Amount\", 35)]:\n",
" is_last = col == \"Amount\"\n",
" pdf.cell(w, 8, col, border=1, fill=True, align=\"C\", **(dict(new_x=\"LMARGIN\", new_y=\"NEXT\") if is_last else {}))\n",
"pdf.set_font(\"Helvetica\", \"\", 10)\n",
"LINE_ITEMS = [\n",
" (\"Data Platform Architecture\", 40, 200, 8000),\n",
" (\"Pipeline Development\", 60, 175, 10500),\n",
" (\"Quality Assurance & Testing\", 20, 150, 3000),\n",
"]\n",
"for desc, hrs, rate, amt in LINE_ITEMS:\n",
" pdf.cell(90, 7, desc, border=1)\n",
" pdf.cell(30, 7, str(hrs), border=1, align=\"C\")\n",
" pdf.cell(35, 7, f\"${rate:.2f}\", border=1, align=\"C\")\n",
" pdf.cell(35, 7, f\"${amt:,.2f}\", border=1, align=\"C\", new_x=\"LMARGIN\", new_y=\"NEXT\")\n",
"pdf.set_font(\"Helvetica\", \"B\", 12)\n",
"pdf.cell(155, 10, \"Total:\", align=\"R\")\n",
"pdf.cell(35, 10, \"$21,500.00\", align=\"C\", new_x=\"LMARGIN\", new_y=\"NEXT\")\n",
"\n",
"PDF_PATH = os.path.join(DEMO_DIR, \"demo_invoice.pdf\")\n",
"pdf.output(PDF_PATH)\n",
"\n",
"# Five support tickets for classification\n",
"tickets = pl.DataFrame(\n",
" {\n",
" \"ticket_id\": [1001, 1002, 1003, 1004, 1005],\n",
" \"ticket_body\": [\n",
" \"I was charged $2,500 for a subscription I cancelled. Billing error ongoing since March.\",\n",
" \"Order #4521 shipped to London but I live in Manchester. Please redirect via FedEx.\",\n",
" \"New MacBook Pro has a cracked screen. Returns team arranged replacement immediately.\",\n",
" \"Enterprise license for 500 seats expires next week. Discuss renewal and 200 more seats.\",\n",
" \"API returning 500 errors since 2pm. Blocking our entire production pipeline. Fix now.\",\n",
" ],\n",
" }\n",
")\n",
"\n",
"\n",
"print(f\"PDF invoice : {PDF_PATH}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# -- Flavour 1: PDF Invoice → pdfplumber --------------------------\n",
"# The contract defines model fields, extraction provider, and quality rules.\n",
"# extract_file() handles all parsing — no manual logic in the notebook.\n",
"\n",
"from lakelogic.engines.llm import extract_file\n",
"from lakelogic.core.models import ExtractionConfig\n",
"\n",
"pdf_contract = s.write_contract(\n",
" \"\"\"\n",
"version: 1.0.0\n",
"dataset: invoice_line_items\n",
"\n",
"model:\n",
" fields:\n",
" # -- Metadata (extracted from page text via regex) ----------\n",
" - name: invoice_number\n",
" type: string\n",
" required: true\n",
" extraction_task: metadata\n",
" extraction_examples: ['Invoice #:\\\\s*(\\\\S+)']\n",
" - name: vendor\n",
" type: string\n",
" extraction_task: metadata\n",
" extraction_examples: ['INVOICE\\\\n(.+?)\\\\n']\n",
" - name: date\n",
" type: string\n",
" extraction_task: metadata\n",
" extraction_examples: ['Date:\\\\s*(.+?)\\\\n']\n",
" - name: bill_to\n",
" type: string\n",
" extraction_task: metadata\n",
" extraction_examples: ['Bill To:\\\\s*(.+?)\\\\s+Due:']\n",
" - name: due_date\n",
" type: string\n",
" extraction_task: metadata\n",
" extraction_examples: ['Due:\\\\s*(.+?)(?:\\\\n|$)']\n",
"\n",
" # -- Table rows (matched by column header name) ------------\n",
" - name: description\n",
" type: string\n",
" required: true\n",
" - name: hours\n",
" type: integer\n",
" - name: rate\n",
" type: string\n",
" - name: amount\n",
" type: string\n",
"\n",
"extraction:\n",
" provider: pdfplumber\n",
"\n",
"quality:\n",
" row_rules:\n",
" - name: has_description\n",
" sql: \"description IS NOT NULL AND description != ''\"\n",
" - name: has_invoice_number\n",
" sql: \"invoice_number IS NOT NULL\"\n",
"\"\"\",\n",
" \"05_data_generation_ai_demo/invoice_line_items.yaml\",\n",
")\n",
"\n",
"# -- Extract -------------------------------------------------------\n",
"import yaml\n",
"\n",
"contract_dict = yaml.safe_load(open(pdf_contract))\n",
"ext_config = ExtractionConfig(\n",
" provider=contract_dict[\"extraction\"][\"provider\"],\n",
" output_schema=contract_dict[\"model\"][\"fields\"],\n",
")\n",
"rows = extract_file(PDF_PATH, ext_config)\n",
"\n",
"# -- Materialize --------------------------------------------------\n",
"import polars as pl\n",
"\n",
"df_invoice = pl.DataFrame(rows)\n",
"\n",
"\n",
"display(df_invoice)\n",
"print(f\"\\n\\u2705 PDF \\u2192 {len(rows)} line items + metadata. pdfplumber. $0 cost.\")"
]
},
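{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `extraction_examples` entries on the metadata fields above are ordinary Python regular expressions applied to the page text. A minimal, self-contained sketch of how one of them matches — the sample string below is illustrative, not the actual pdfplumber output:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"\n",
"# Illustrative page text in the rough shape pdfplumber returns for this invoice\n",
"sample = \"INVOICE\\nAcme Corp | 500 Market St, San Francisco, CA 94105\\nInvoice #: INV-2026-0042 Date: April 15, 2026\"\n",
"\n",
"match = re.search(r\"Invoice #:\\s*(\\S+)\", sample)\n",
"print(match.group(1) if match else None)  # INV-2026-0042"
]
},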
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ── Side-by-side: Raw PDF text vs Extracted Table ─────────────────\n",
"import pdfplumber\n",
"\n",
"with pdfplumber.open(PDF_PATH) as doc:\n",
" raw_text = doc.pages[0].extract_text()\n",
"\n",
"print(\"RAW PDF TEXT\".center(60, \"\\u2500\"))\n",
"print(raw_text)\n",
"print()\n",
"print(\"EXTRACTED TABLE\".center(60, \"\\u2500\"))\n",
"display(df_invoice.drop([c for c in df_invoice.columns if c.startswith(\"_\")]))\n",
"print(f\"\\n\\u2500 Source: {PDF_PATH}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# -- Flavour 2: Support Tickets -> spaCy NER + Classification ----\n",
"# spaCy extracts entities + classifies text locally at production speed.\n",
"\n",
"from lakelogic.engines.llm import extract_row\n",
"\n",
"nlp_contract = s.write_contract(\n",
" \"\"\"\n",
"version: 1.0.0\n",
"dataset: enriched_tickets\n",
"\n",
"model:\n",
" fields:\n",
" - name: persons\n",
" type: string\n",
" extraction_task: ner\n",
" - name: organizations\n",
" type: string\n",
" extraction_task: ner\n",
" extraction_examples: [ORG]\n",
" - name: category\n",
" type: string\n",
" extraction_task: classification\n",
" accepted_values: [billing, shipping, product, enterprise, outage]\n",
" - name: sentiment\n",
" type: string\n",
" extraction_task: sentiment\n",
"\n",
"extraction:\n",
" provider: spacy\n",
" model: en_core_web_md # medium model -- better NER than sm\n",
" text_column: ticket_body\n",
"\n",
"quality:\n",
" row_rules:\n",
" - name: has_sentiment\n",
" sql: \"sentiment IS NOT NULL\"\n",
"\"\"\",\n",
" \"05_data_generation_ai_demo/enriched_tickets.yaml\",\n",
")\n",
"\n",
"# -- Extract each ticket -----------------------------------------------\n",
"contract_dict = yaml.safe_load(open(nlp_contract))\n",
"ext_config = ExtractionConfig(\n",
" provider=contract_dict[\"extraction\"][\"provider\"],\n",
" text_column=contract_dict[\"extraction\"].get(\"text_column\", \"text\"),\n",
" output_schema=contract_dict[\"model\"][\"fields\"],\n",
")\n",
"\n",
"enriched = [extract_row(row, ext_config) for row in tickets.to_dicts()]\n",
"\n",
"# -- Materialize -------------------------------------------------------\n",
"df_tickets = pl.DataFrame(enriched)\n",
"\n",
"display_cols = [\"ticket_id\", \"persons\", \"organizations\", \"category\", \"sentiment\"]\n",
"\n",
"print(\"RAW ticket TEXT\".center(60, \"\\u2500\"))\n",
"print(tickets)\n",
"print()\n",
"print(\"EXTRACTED TABLE\".center(60, \"\\u2500\"))\n",
"\n",
"display(df_tickets.select([c for c in display_cols if c in df_tickets.columns]))\n",
"print(f\"\\n\\u2705 {len(enriched)} tickets enriched. spaCy. $0 cost.\")"
]
},
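{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `accepted_values` list on the `category` field doubles as a validation target: any prediction outside it should be caught by quality rules. A tiny stdlib sketch of that check, using hypothetical predictions rather than the actual spaCy output:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"accepted = {\"billing\", \"shipping\", \"product\", \"enterprise\", \"outage\"}\n",
"predictions = [\"billing\", \"shipping\", \"refunds\"]  # \"refunds\" is outside the accepted set\n",
"bad = [p for p in predictions if p not in accepted]\n",
"print(bad)  # ['refunds']"
]
},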
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 8. Automated Run Logs — Structured Pipeline Observability\n",
"\n",
"**The Problem:** Pipelines fail silently. Row counts drift. Quarantine tables fill up. But you only find out when a dashboard is empty.\n",
"\n",
"**The Solution:** Every pipeline run automatically emits a structured, comprehensive run log. These logs can be written out to a Delta table, making your entire data operations history immediately queryable."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import lakelogic as ll\n",
"from lakelogic.core.run_log import write_run_log\n",
"import polars as pl\n",
"import duckdb\n",
"import os\n",
"import tempfile\n",
"\n",
"# ── 1. Configure the pipeline to write actual run logs to DuckDB ──────\n",
"LOG_DIR = os.path.join(tempfile.gettempdir(), \"lakelogic_logs\")\n",
"os.makedirs(LOG_DIR, exist_ok=True)\n",
"DB_PATH = os.path.join(LOG_DIR, \"run_logs.duckdb\").replace(\"\\\\\", \"/\")\n",
"\n",
"# Remove stale DB so we start fresh each demo run\n",
"if os.path.exists(DB_PATH):\n",
" os.remove(DB_PATH)\n",
"\n",
"log_contract_path = s.write_contract(\n",
" f\"\"\"\n",
"version: 1.0.0\n",
"dataset: automated_log_demo\n",
"metadata:\n",
" run_log_table: pipeline_run_logs\n",
" run_log_backend: duckdb\n",
" run_log_database: \"{DB_PATH}\"\n",
"model:\n",
" fields:\n",
" - name: user_id\n",
" type: integer\n",
" - name: age\n",
" type: integer\n",
"quality:\n",
" row_rules:\n",
" - name: valid_age\n",
" sql: \"age >= 18\"\n",
"\"\"\",\n",
" \"05_data_generation_ai_demo/automated_log_demo.yaml\",\n",
")\n",
"\n",
"# Load as a proper DataContract object (write_run_log needs .metadata)\n",
"log_contract = ll.DataContract.from_yaml(log_contract_path)\n",
"\n",
"# ── 2. Run the pipeline a few times with varying data quality ─────────\n",
"proc = ll.DataProcessor(log_contract)\n",
"\n",
"\n",
"def simulate_run(data):\n",
" proc.run(pl.DataFrame(data))\n",
" # Status is normally set by the pipeline runner after materialize;\n",
" # in standalone mode we stamp it manually.\n",
" report = proc.last_report\n",
" quarantined = (report.get(\"counts\") or {}).get(\"quarantined\", 0)\n",
" report[\"status\"] = \"warning\" if quarantined else \"success\"\n",
" write_run_log(report, log_contract)\n",
"\n",
"\n",
"# Run 1: Perfect data\n",
"simulate_run({\"user_id\": [1, 2], \"age\": [25, 30]})\n",
"\n",
"# Run 2: One invalid row (age < 18)\n",
"simulate_run({\"user_id\": [3, 4], \"age\": [15, 40]})\n",
"\n",
"# Run 3: All invalid rows\n",
"simulate_run({\"user_id\": [5, 6], \"age\": [10, 12]})\n",
"\n",
"# ── 3. Query the actual Run Logs telemetry table ──────────────────────\n",
"con = duckdb.connect(DB_PATH, read_only=True)\n",
"query = \"\"\"\n",
" SELECT \n",
" run_id,\n",
" contract,\n",
" dataset, \n",
" counts_source, \n",
" counts_good, \n",
" counts_quarantined, \n",
" quarantine_ratio,\n",
" status,\n",
" start_time, \n",
" end_time,\n",
" run_duration_seconds\n",
" FROM pipeline_run_logs\n",
"\"\"\"\n",
"logs_df = con.execute(query).pl()\n",
"con.close()\n",
"\n",
"print(\"AUTOMATED RUN LOGS (Queried from actual DuckDB backend):\")\n",
"display(logs_df)\n",
"\n",
"print(f\"\\n\\u2705 {len(logs_df)} runs captured. BI tools can connect directly to: {DB_PATH}\")"
]
},
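{
"cell_type": "markdown",
"metadata": {},
"source": [
"Assuming `quarantine_ratio` is simply quarantined rows divided by source rows (the convention used in this demo), a quick stdlib sanity check with hypothetical counts mirroring the three simulated runs:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical (source_rows, quarantined) pairs mirroring runs 1-3 above\n",
"counts = [(2, 0), (2, 1), (2, 2)]\n",
"ratios = [q / s for s, q in counts]\n",
"print(ratios)  # [0.0, 0.5, 1.0]"
]
},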
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## What You Just Saw\n",
"\n",
"| # | Feature | How |\n",
"|---|---------|-----|\n",
"| 1 | **Synthetic Data** | `DataGenerator` with `invalid_ratio` for controlled bad rows |\n",
"| 2 | **AI-Steered Generation** | Natural language prompts for targeted scenarios |\n",
"| 3 | **Streaming Simulation** | `generate_stream()` for time-windowed batches |\n",
"| 4 | **Referential Integrity** | `generate_related()` for FK/PK consistency |\n",
"| 5 | **Contract from Schema** | `infer_contract()` from DDL strings, tuples, or dicts — no data needed |\n",
"| 6 | **Contract from CSV** | `infer_contract()` auto-detects types, PII, and quality rules |\n",
"| 7 | **Unstructured Processing** | PDF/NLP extraction with contract validation |\n",
"| 8 | **Run Logs** | Structured JSON observability for every pipeline run |"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## Go Deeper — Explore by Capability\n",
"\n",
"Each notebook below maps to a pillar of LakeLogic's [Technical Capabilities](https://lakelogic.github.io/LakeLogic/#technical-capabilities):\n",
"\n",
"| # | Notebook | What You'll See |\n",
"|---|---|---|\n",
"| 🛡️ | **[Data Quality & Trust](01_data_quality_trust.ipynb)** | Reconciliation proofs, Pydantic validation, SQL-first rules, SLO monitoring |\n",
"| 📜 | **[Compliance & Governance](02_compliance_governance.ipynb)** | GDPR erasure in 2 lines, automatic lineage, cost intelligence |\n",
"| ⚡ | **[Engine & Scale](03_engine_scale.ipynb)** | Same contract on Polars & DuckDB, incremental processing, dry run |\n",
"| 🔧 | **[Developer Experience](04_developer_experience.ipynb)** | Structured diagnostics, DDL generation, surgical resets, multi-channel alerts |\n",
"| 🧬 | **[Data Generation & AI](05_data_generation_ai.ipynb)** | Synthetic data, referential integrity, edge case injection, contract inference |\n",
"| 🔌 | **[Integrations](06_integrations.ipynb)** | dbt adapter, dlt sources, contract-driven quality gates on arrival |\n",
"\n",
"> **Each notebook is self-contained** — pick the capability that matters most to you and run it independently."
]
}
],
"metadata": {
"colab": {
"name": "LakeLogic — Data Generation & AI",
"provenance": []
},
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.13"
}
},
"nbformat": 4,
"nbformat_minor": 0
}