---
name: afrexai-data-engineering
description: "Data Engineering"
---

# Data Engineering Command Center

Complete methodology for designing, building, operating, and scaling data pipelines and infrastructure. Zero dependencies: pure agent skill.

---

## Phase 1: Data Architecture Assessment

Before building anything, understand the landscape.

### Architecture Brief

```yaml
project_name: ""
business_context: ""
data_consumers:
  - team: ""
    use_case: ""             # analytics | ML | operational | reporting | reverse-ETL
    latency_requirement: ""  # real-time (<1s) | near-real-time (<5min) | batch (hourly+)
    query_pattern: ""        # ad-hoc | scheduled | API | dashboard
current_state:
  sources: []      # list every system producing data
  storage: []      # where data lives today
  pain_points: []  # what's broken, slow, unreliable
data_volume:
  current_gb_per_day: 0
  growth_rate_percent: 0
  retention_months: 0
constraints:
  budget_monthly_usd: 0
  team_size: 0
  skill_level: ""     # junior | mid | senior | mixed
  compliance: []      # GDPR, HIPAA, SOX, PCI, none
  cloud_provider: ""  # AWS | GCP | Azure | multi | on-prem
```

### Architecture Pattern Decision Matrix

| Signal | Pattern | When to Use |
|--------|---------|-------------|
| All consumers need data hourly+ | **Batch ETL** | Reporting, warehousing, most analytics |
| Some need <5 min latency | **Micro-batch** | Dashboard freshness, near-real-time analytics |
| Events need <1s processing | **Streaming** | Fraud detection, real-time pricing, alerts |
| Need both batch + streaming | **Lambda** | When batch accuracy + real-time speed both matter |
| Want to simplify Lambda | **Kappa** | When you can reprocess from stream replay |
| Data lake + warehouse combined | **Lakehouse** | When you need both cheap storage + fast SQL |
| Sources change independently | **Data Mesh** | Large orgs, domain-owned data, >5 teams |
| ML is primary consumer | **Feature Store** | ML-heavy orgs with feature reuse needs |

### Technology Selection Guide

#### Orchestration

| Tool | Best For | Avoid When |
|------|----------|------------|
| **Airflow** | Complex DAGs, Python-native teams, mature ecosystem | Simple pipelines (<5 tasks) |
| **Dagster** | Software-defined assets, strong typing, dev experience | Legacy team resistant to new paradigms |
| **Prefect** | Dynamic workflows, cloud-native, Python-first | Need on-prem with no cloud dependency |
| **dbt** | SQL transformations, ELT, analytics engineering | Non-SQL transforms, streaming |
| **Temporal** | Long-running workflows, retry-heavy, microservices | Simple ETL, small teams |
| **Cron + scripts** | <3 pipelines, solo engineer, simple schedules | Anything with dependencies or retries |

#### Processing

| Tool | Best For | Avoid When |
|------|----------|------------|
| **Spark** | >100GB, complex transforms, ML pipelines | <10GB (overkill), real-time streaming |
| **DuckDB** | Local analytics, <100GB, SQL on files | Distributed processing, production streaming |
| **Polars** | Single-node, Rust-speed, <50GB, DataFrames | Distributed, need Spark ecosystem |
| **Pandas** | <1GB, quick analysis, prototyping | Production pipelines, anything >5GB |
| **Flink** | True streaming, event-time processing | Batch-only, small team (steep learning curve) |
| **SQL (warehouse)** | ELT in Snowflake/BigQuery/Redshift | Complex ML transforms, binary data |

#### Storage

| Tool | Best For | Avoid When |
|------|----------|------------|
| **Snowflake** | Analytics, separation of compute/storage, multi-cloud | Tight budget, real-time OLTP |
| **BigQuery** | GCP-native, serverless, large-scale analytics | Multi-cloud, need fine-grained cost control |
| **Redshift** | AWS-native, existing AWS ecosystem | Elastic scaling needs, multi-cloud |
| **Databricks** | ML + analytics unified, Spark-native, lakehouse | Pure SQL analytics, small data |
| **PostgreSQL** | OLTP + light analytics, <500GB, budget-conscious | >1TB analytics, real-time dashboards on large data |
| **S3/GCS/ADLS** | Raw data lake, cheap storage, any format | Direct SQL queries (need compute layer) |
| **Delta Lake/Iceberg** | Table format on data lake, ACID on files | Simple file storage, no lakehouse need |

---

## Phase 2: Data Modeling

### Modeling Methodology Decision

| Approach | Best For | Key Concept |
|----------|----------|-------------|
| **Kimball (Dimensional)** | BI/reporting, star schemas | Facts + Dimensions, business-process-centric |
| **Inmon (3NF)** | Enterprise data warehouse, single source of truth | Normalized, subject-area-centric |
| **Data Vault 2.0** | Agile warehousing, auditability, multiple sources | Hubs + Links + Satellites, insert-only |
| **One Big Table (OBT)** | Simple analytics, few joins, dashboard performance | Pre-joined, denormalized, fast queries |
| **Activity Schema** | Event analytics, product analytics | Entity + Activity + Feature columns |

### Dimensional Model Template

```yaml
fact_table:
  name: "fct_[business_process]"
  grain: ""  # one row = one [what]?
  grain_statement: "One row per [transaction/event/snapshot] at [time grain]"
  measures:
    - name: ""
      type: ""         # additive | semi-additive | non-additive
      aggregation: ""  # SUM | AVG | COUNT | MIN | MAX | COUNT DISTINCT
      business_definition: ""
  degenerate_dimensions: []  # IDs stored in fact (order_number, invoice_id)
  foreign_keys: []           # links to dimension tables

dimensions:
  - name: "dim_[entity]"
    type: ""           # Type 1 (overwrite) | Type 2 (history) | Type 3 (previous value)
    natural_key: ""    # business key from source
    surrogate_key: ""  # warehouse-generated key
    attributes:
      - name: ""
        source: ""
        scd_type: ""   # 1 | 2 | 3
    hierarchy: []      # e.g., [country, region, city, store]
```

### SCD Type Decision Guide

| Scenario | SCD Type | Implementation |
|----------|----------|----------------|
| Don't care about history | **Type 1** | UPDATE in place |
| Need full history | **Type 2** | New row + valid_from/valid_to + is_current flag |
| Only need previous value | **Type 3** | Add previous_[column] |
| Track changes with timestamps | **Type 4** | Mini-dimension (history table) |
| Hybrid: some attrs Type 1, some Type 2 | **Type 6** | Combine 1+2+3 in one table |

**Default recommendation:** Type 2 for anything business-critical (customer status, product price, employee department). Type 1 for everything else.

### Naming Conventions

| Object | Convention | Example |
|--------|-----------|---------|
| Raw/staging tables | `raw_[source]_[table]` | `raw_stripe_payments` |
| Staging models | `stg_[source]__[entity]` | `stg_stripe__payments` |
| Intermediate models | `int_[entity]_[verb]` | `int_orders_pivoted` |
| Mart/fact tables | `fct_[business_process]` | `fct_orders` |
| Dimension tables | `dim_[entity]` | `dim_customers` |
| Metrics/aggregates | `mrt_[domain]_[metric]` | `mrt_sales_daily` |
| Snapshots | `snp_[entity]_[grain]` | `snp_inventory_daily` |
| Columns: boolean | `is_[state]` or `has_[thing]` | `is_active`, `has_subscription` |
| Columns: timestamp | `[event]_at` | `created_at`, `shipped_at` |
| Columns: date | `[event]_date` | `order_date` |
| Columns: ID | `[entity]_id` | `customer_id` |
| Columns: amount | `[thing]_amount` | `order_amount` |
| Columns: count | `[thing]_count` | `line_item_count` |

---

## Phase 3: Pipeline Design Patterns

### Universal Pipeline Template

```yaml
pipeline:
  name: ""
  owner: ""
  schedule: ""    # cron expression
  sla_minutes: 0  # max acceptable runtime
  tier: ""        # 1 (critical) | 2 (important) | 3 (nice-to-have)
  extract:
    source_system: ""
    connection: ""
    strategy: ""           # full | incremental | CDC | log-based
    incremental_key: ""    # column for incremental (e.g., updated_at)
    watermark_storage: ""  # where to persist last-extracted position
  transform:
    engine: ""  # SQL | Spark | Python | dbt
    stages:
      - name: "clean"
        operations: []  # dedupe, null handling, type casting, trimming
      - name: "conform"
        operations: []  # standardize codes, currencies, timezones
      - name: "enrich"
        operations: []  # lookups, calculations, derived fields
      - name: "aggregate"
        operations: []  # rollups, pivots, window functions
  load:
    target_system: ""
    strategy: ""  # append | upsert | merge | truncate-reload | partition-swap
    merge_keys: []
    partition_key: ""
    clustering_keys: []
  quality_gates:
    pre_load: []   # checks before writing
    post_load: []  # checks after writing
  error_handling:
    strategy: ""  # fail-fast | dead-letter | retry | skip-and-alert
    max_retries: 3
    retry_delay_seconds: 300
    alert_channels: []
```

### Extraction Strategy Decision Tree

```
Is the source a database?
├── Yes → Does it support CDC?
│   ├── Yes → Use CDC (Debezium, AWS DMS, Fivetran)
│   │         Best for: high-volume, low-latency, minimal source impact
│   └── No → Does it have a reliable updated_at column?
│       ├── Yes → Incremental extraction on updated_at
│       │         ⚠️ Won't catch hard deletes; add periodic full reconciliation
│       └── No → Full extraction
│                Only viable for small tables (<1M rows)
├── Is it an API?
│   ├── Supports webhooks? → Event-driven ingestion
│   ├── Has cursor/pagination? → Incremental with cursor bookmark
│   └── No pagination? → Full pull with rate-limit handling
├── Is it files (S3, SFTP, email)?
│   └── Event-triggered (S3 notification, file watcher)
│       Validate: schema, completeness, filename pattern
└── Is it streaming (Kafka, Kinesis, Pub/Sub)?
    └── Consumer group with offset management
        Key decisions: at-least-once vs exactly-once, consumer lag alerting
```

### Load Strategy Decision

| Strategy | When | Trade-off |
|----------|------|-----------|
| **Append** | Event/log data, immutable facts | Simple but grows forever; partition + retain |
| **Upsert/Merge** | Dimension updates, SCD Type 1 | Handles updates but slower on large tables |
| **Truncate-Reload** | Small tables (<1M), reference data | Simple but window of missing data |
| **Partition Swap** | Large fact tables, daily loads | Atomic, fast, but needs partition alignment |
| **Soft Delete** | Need audit trail of deletions | Adds complexity to every downstream query |

### Idempotency Rules (NON-NEGOTIABLE)

Every pipeline MUST be re-runnable without side effects:

1. **Use MERGE/UPSERT, never blind INSERT** for mutable data
2. **Partition-swap for immutable data**: drop partition + reload
3. **Store watermarks externally**, not in the pipeline code
4. **Dedup at ingestion**: use source natural keys
5. **Test by running twice**: output must be identical both times

---

## Phase 4: Data Quality Framework

### Quality Dimensions

| Dimension | Definition | Example Check |
|-----------|-----------|---------------|
| **Completeness** | No missing values where required | `NOT NULL` on required fields, row count within range |
| **Uniqueness** | No unexpected duplicates | Primary key uniqueness, natural key uniqueness |
| **Validity** | Values within expected domain | Enum checks, range checks, regex patterns |
| **Accuracy** | Data matches real-world truth | Cross-system reconciliation, manual spot checks |
| **Freshness** | Data arrives on time | `MAX(loaded_at) > NOW() - INTERVAL '2 hours'` |
| **Consistency** | Same data agrees across systems | Sum reconciliation between source and target |

### Quality Check Templates

```sql
-- Completeness: Required fields not null
SELECT COUNT(*) AS null_violations
FROM {table}
WHERE {required_column} IS NULL;
-- Threshold: 0

-- Uniqueness: No duplicate primary keys
SELECT {pk_column}, COUNT(*) AS dupe_count
FROM {table}
GROUP BY {pk_column}
HAVING COUNT(*) > 1;
-- Threshold: 0

-- Freshness: Data arrived within SLA
SELECT CASE
  WHEN MAX({timestamp_col}) > CURRENT_TIMESTAMP - INTERVAL '{sla_hours} hours'
  THEN 'PASS' ELSE 'FAIL'
END AS freshness_check
FROM {table};

-- Volume: Row count within expected range
SELECT CASE
  WHEN COUNT(*) BETWEEN {min_expected} AND {max_expected}
  THEN 'PASS' ELSE 'FAIL'
END AS volume_check
FROM {table}
WHERE {partition_col} = '{run_date}';

-- Referential: FK integrity
SELECT COUNT(*) AS orphan_count
FROM {fact_table} f
LEFT JOIN {dim_table} d ON f.{fk} = d.{pk}
WHERE d.{pk} IS NULL;
-- Threshold: 0

-- Distribution: No unexpected skew
SELECT {column}, COUNT(*) AS cnt,
  ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
FROM {table}
GROUP BY {column}
ORDER BY cnt DESC;
-- Alert if any single value > {max_pct}%

-- Cross-system reconciliation
SELECT
  (SELECT SUM(amount) FROM source_system.orders WHERE date
    = '{date}') AS source_total,
  (SELECT SUM(amount) FROM warehouse.fct_orders WHERE order_date = '{date}') AS target_total,
  -- Subqueries are repeated because many dialects cannot reference
  -- a SELECT alias from within the same SELECT list
  ABS(
    (SELECT SUM(amount) FROM source_system.orders WHERE date = '{date}')
    - (SELECT SUM(amount) FROM warehouse.fct_orders WHERE order_date = '{date}')
  ) AS variance;
-- Threshold: variance < 0.01 * source_total (1%)
```

### Data Contract Template

```yaml
contract:
  name: ""
  version: ""
  owner: ""      # team responsible for producing this data
  consumers: []  # teams consuming this data
  sla:
    freshness_hours: 0
    availability_percent: 99.9
    support_hours: ""  # business-hours | 24x7
  schema:
    - column: ""
      type: ""
      nullable: false
      description: ""
      business_definition: ""
      pii: false
      checks:
        - type: ""  # not_null | unique | range | enum | regex | custom
          params: {}
  breaking_change_policy: ""  # notify-30-days | version-bump | never-break
  notification_channel: ""
```

### Quality Severity Levels

| Level | Definition | Response |
|-------|-----------|----------|
| **P0: Critical** | Data corruption, wrong numbers in production dashboards, compliance data wrong | Stop pipeline, alert immediately, rollback if possible |
| **P1: High** | Missing data for key reports, SLA breach, >5% of records affected | Alert team, fix within 4 hours, post-mortem required |
| **P2: Medium** | Non-critical field quality, <1% records affected, no downstream impact | Fix in next sprint, add monitoring to prevent recurrence |
| **P3: Low** | Cosmetic issues, edge cases, non-critical data | Backlog, fix when convenient |

---

## Phase 5: Performance Optimization

### SQL Optimization Checklist

| Problem | Fix | Impact |
|---------|-----|--------|
| Full table scan | Add/use partition pruning | 10-100x faster |
| Large joins | Pre-aggregate before joining | 5-50x faster |
| SELECT * | Select only needed columns | 2-10x faster (columnar stores) |
| Correlated subquery | Rewrite as JOIN or window function | 10-100x faster |
| DISTINCT on large result | Fix upstream duplication instead | 2-5x faster |
| ORDER BY without LIMIT | Add LIMIT or remove if not needed | Prevents memory spills |
| String operations in WHERE | Pre-compute, use lookup table | Enables index usage |
| Multiple passes over same data | Combine with CASE WHEN + GROUP BY | 2-5x faster |
| NOT IN with NULLs | Use NOT EXISTS or LEFT JOIN IS NULL | Correctness + performance |

### Spark Optimization Guide

| Problem | Solution |
|---------|----------|
| Shuffle-heavy joins | Broadcast small table (`broadcast(df)`) if <100MB |
| Data skew | Salt the skewed key: add random prefix, join on salted key, aggregate |
| Small files | Coalesce output: `.coalesce(target_files)` or use adaptive query execution |
| Too many partitions | `spark.sql.shuffle.partitions` = 2-3x cluster cores |
| OOM errors | Increase `spark.executor.memory`, reduce partition size, spill to disk |
| Slow writes | Use Parquet with snappy, partition by date, avoid small writes |
| Repeated computation | `.cache()` or `.persist()` DataFrames used >1 time |
| Complex transformations | Push down predicates, filter early, select early |

### Partitioning Strategy

| Data Type | Partition Key | Why |
|-----------|--------------|-----|
| Transactional/event | Date (daily or monthly) | Most queries filter by time range |
| Multi-tenant | Tenant ID + date | Isolate tenant queries, time-range pruning |
| Geospatial | Region + date | Regional queries are common |
| Log data | Date + hour | High volume needs finer partitions |
| Reference/dimension | Don't partition | Too small, full scan is fine |

**Rules:**

- Target 100MB-1GB per partition (compressed)
- <10,000 total partitions per table
- Never partition on high-cardinality columns (user_id)
- Always include partition key in WHERE clauses

---

## Phase 6: Data Governance & Cataloging

### Data Classification

| Level | Examples | Controls |
|-------|---------|----------|
| **Public** | Product catalog, published stats | No restrictions |
| **Internal** | Aggregated metrics, non-PII analytics | Auth required, audit logging |
| **Confidential** | Customer PII, financial records, HR data | Encryption, column-level access, masking |
| **Restricted** | SSN, payment cards, health records, passwords | Encryption at rest + transit, tokenization, audit every access, retention limits |

### PII Handling Rules

1. **Identify:** Scan all sources for PII columns (name, email, phone, SSN, DOB, address, IP)
2. **Classify:** Tag each with sensitivity level
3. **Minimize:** Only ingest PII you actually need
4. **Protect:**
   - Hash or tokenize in staging (SHA-256 with salt for pseudonymization)
   - Dynamic masking for non-privileged users
   - Column-level encryption for restricted data
5. **Retain:** Auto-delete after retention period
6. **Audit:** Log every query touching PII columns
7. **Right to delete:** Build a deletion pipeline that propagates across all derived tables

### Data Catalog Entry Template

```yaml
dataset:
  name: ""
  description: ""
  owner_team: ""
  steward: ""  # person responsible for quality
  domain: ""   # sales | marketing | finance | product | engineering
  tier: ""     # gold (trusted) | silver (cleaned) | bronze (raw)
  lineage:
    sources: []          # upstream datasets/systems
    transformations: ""  # brief description of key transforms
    downstream: []       # who consumes this
  refresh:
    schedule: ""
    sla_hours: 0
    last_successful_run: ""
  quality:
    tests: []       # list of quality checks
    last_score: 0   # 0-100
    known_issues: []
  access:
    classification: ""  # public | internal | confidential | restricted
    pii_columns: []
    access_request_process: ""  # how to get access
  usage:
    avg_daily_queries: 0
    top_consumers: []
    cost_monthly_usd: 0
```

---

## Phase 7: Pipeline Monitoring & Alerting

### Pipeline Health Dashboard

```yaml
dashboard:
  pipeline_metrics:
    - metric: "Pipeline Success Rate"
      formula: "successful_runs / total_runs * 100"
      target: ">99%"
      alert_threshold: "<95%"
    - metric: "Average Runtime"
      formula: "avg(end_time - start_time) over 7 days"
      target: "<80% of SLA"
    - metric: "Data Freshness"
      formula: "NOW() - MAX(loaded_at)"
      target: "within SLA"
    - metric: "Data Volume Variance"
      formula: "abs(today_rows - avg_7d_rows) /
        avg_7d_rows * 100"
      target: "<20%"
      alert_threshold: ">50%"
    - metric: "Quality Check Pass Rate"
      formula: "passed_checks / total_checks * 100"
      target: "100%"
      alert_threshold: "<95%"
    - metric: "Failed Pipeline Count"
      formula: "count where status = failed in last 24h"
      target: "0"
      alert_threshold: ">0"
    - metric: "Backfill Queue"
      formula: "count of pending backfill requests"
      target: "0"
      alert_threshold: ">5"
    - metric: "Infrastructure Cost"
      formula: "compute + storage + egress"
      target: "<110% of budget"
```

### Alert Severity

| Severity | Condition | Response Time | Example |
|----------|-----------|---------------|---------|
| **P0** | Revenue/compliance impacting | 15 min | Payment pipeline down, regulatory report delayed |
| **P1** | Business-critical dashboard stale | 1 hour | Executive dashboard >4h stale |
| **P2** | Non-critical pipeline failed | 4 hours | Marketing attribution delayed |
| **P3** | Warning/degradation | Next business day | Pipeline runtime at 80% of SLA, minor quality drift |

### Structured Logging Standard

Every pipeline run MUST log:

```json
{
  "pipeline_name": "",
  "run_id": "",
  "started_at": "",
  "completed_at": "",
  "status": "success|failed|partial",
  "stage": "",
  "rows_extracted": 0,
  "rows_transformed": 0,
  "rows_loaded": 0,
  "rows_rejected": 0,
  "quality_checks_passed": 0,
  "quality_checks_failed": 0,
  "duration_seconds": 0,
  "error_message": "",
  "watermark_before": "",
  "watermark_after": ""
}
```

---

## Phase 8: Testing Strategy

### Pipeline Test Pyramid

| Layer | What to Test | How | When |
|-------|-------------|-----|------|
| **Unit** | Individual transforms, business logic | pytest with fixtures, dbt unit tests | Every PR |
| **Integration** | Source connectivity, schema compatibility | Test against staging/dev environment | Daily + PR |
| **Contract** | Schema hasn't changed, data types stable | Schema registry, contract tests | Every pipeline run |
| **Data Quality** | Completeness, uniqueness, freshness, validity | Quality framework checks | Every pipeline run |
| **E2E** | Full pipeline produces correct output | Golden dataset comparison | Weekly + release |
| **Performance** | Runtime within SLA, no regression | Benchmark against historical runs | Weekly |

### dbt Testing Checklist

```yaml
# For every model, define at minimum:
models:
  - name: fct_orders
    tests:
      # dbt_utils.recency is a model-level test, so it is declared here
      # rather than under the ordered_at column
      - dbt_utils.recency:
          datepart: day
          field: ordered_at
          interval: 2
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
      - name: ordered_at
        tests:
          - not_null
```

### Backfill Protocol

When you need to reprocess historical data:

1. **Scope:** Define exact date range and affected tables
2. **Impact assessment:** What downstream models/dashboards will be affected?
3. **Communication:** Notify consumers of temporary data inconsistency
4. **Isolation:** Run backfill in separate compute to avoid impacting current pipelines
5. **Validation:** Record row counts and key metrics before the backfill; compare them after it completes
6. **Execution:** Process in reverse-chronological order (most recent first)
7. **Monitoring:** Watch for resource spikes, duplicate creation
8. **Verification:** Reconcile against source after completion
9. **Documentation:** Log what was backfilled, why, and any anomalies found

---

## Phase 9: Cost Optimization

### Cloud Cost Reduction Strategies

| Strategy | Savings | Effort |
|----------|---------|--------|
| Right-size compute (auto-scaling) | 20-40% | Low |
| Use spot/preemptible instances for batch | 60-80% | Medium |
| Compress data (Parquet + Snappy/Zstd) | 50-80% storage | Low |
| Lifecycle policies (hot → warm → cold → archive) | 40-70% storage | Low |
| Eliminate unused tables/pipelines | 10-30% | Low |
| Optimize query patterns (partition pruning) | 30-60% compute | Medium |
| Reserved capacity for steady-state | 30-50% | Medium |
| Cache expensive queries | 20-50% compute | Medium |

### Cost Allocation Template

```yaml
cost_tracking:
  by_pipeline:
    - pipeline: ""
      compute_monthly_usd: 0
      storage_monthly_usd: 0
      egress_monthly_usd: 0
      total: 0
      cost_per_row: 0      # total / rows_processed
      business_value: ""   # what revenue/decision does this enable?
      roi_justified: true  # is the cost worth it?
  optimization_opportunities:
    - description: ""
      estimated_savings_usd: 0
      effort: ""   # low | medium | high
      priority: 0  # 1 = do now
```

### Cost Red Flags

- Single pipeline >30% of total spend
- Cost per row increasing month-over-month
- Tables with 0 queries in 30 days
- Dev/staging environments running 24/7
- Full table scans on >1TB tables
- Uncompressed data in cloud storage
- Cross-region data transfer

---

## Phase 10: Operational Runbooks

### Pipeline Failure Triage

```
Pipeline failed →
  1. Check error message in logs
     ├── Connection timeout → Check source availability, network, credentials
     ├── Schema mismatch → Source schema changed → update extract + notify
     ├── Data quality check failed → Investigate source data, check for anomalies
     ├── Out of memory → Increase resources or optimize query
     ├── Permission denied → Check IAM roles, token expiry
     ├── Duplicate key violation → Check idempotency, investigate source dupes
     └── Timeout (SLA breach) → Check data volume spike, query plan, cluster health
  2. Determine impact
     ├── What dashboards/reports are affected?
     ├── What's the data freshness SLA?
     └── Who needs to be notified?
  3. Fix
     ├── Transient (network, timeout) → Retry
     ├── Data issue → Fix source data, re-run with quality gate override if safe
     ├── Schema change → Update pipeline, backfill if needed
     └── Infrastructure → Scale up, file ticket with cloud provider
  4. Post-fix
     ├── Verify data correctness
     ├── Update runbook with new failure mode
     └── Add monitoring/alerting to catch earlier next time
```

### Schema Change Management

When a source system changes schema:

1. **Detect:** Schema comparison check in extraction pipeline (hash schema, compare to registered)
2. **Classify:**
   - **Additive** (new column): Usually safe; add to pipeline, backfill if needed
   - **Rename**: Map old → new in transform, update downstream
   - **Type change**: Assess compatibility, may need cast or historical rebuild
   - **Column removed**: Critical; breaks queries and needs immediate attention
3. **Test:** Run pipeline in dry-run mode with new schema
4. **Deploy:** Update transforms, quality checks, documentation
5. **Communicate:** Notify downstream consumers via data contract channel

### Disaster Recovery

| Scenario | RPO | RTO | Recovery Steps |
|----------|-----|-----|----------------|
| Pipeline code lost | 0 (git) | 1h | Redeploy from git, restore orchestrator state |
| Warehouse data corrupted | Varies | 4h | Restore from Time Travel/snapshot, re-run affected pipelines |
| Source system down | N/A | Wait | Queue extractions, catch up with incremental once restored |
| Cloud region outage | 24h | 8h | Failover to DR region if configured, else wait |
| Credential compromise | 0 | 2h | Rotate all credentials, audit access logs, re-run affected pipelines |

---

## Phase 11: Advanced Patterns

### Slowly Changing Dimension Type 2 (SQL Template)

```sql
-- Merge pattern for SCD Type 2
-- Assumes stg_customers holds at most one row per customer_id
MERGE INTO dim_customer AS target
USING (
  SELECT * FROM stg_customers
  -- COALESCE lets the first load work when dim_customer is still empty
  WHERE updated_at > COALESCE(
    (SELECT MAX(valid_from) FROM dim_customer),
    TIMESTAMP '1900-01-01 00:00:00'
  )
) AS source
ON target.customer_natural_key = source.customer_id
  AND target.is_current = TRUE
-- Update: close old record
WHEN MATCHED AND (
  target.customer_name != source.name
  OR target.customer_status != source.status
  -- list all Type 2 tracked columns
  -- (use IS DISTINCT FROM or equivalent if tracked columns are nullable)
) THEN UPDATE SET
  is_current = FALSE,
  valid_to = CURRENT_TIMESTAMP
-- Insert: brand-new customers only (changed customers match above and are closed)
WHEN NOT MATCHED THEN INSERT (
  customer_natural_key, customer_name, customer_status,
  valid_from, valid_to, is_current
) VALUES (
  source.customer_id, source.name, source.status,
  CURRENT_TIMESTAMP, '9999-12-31', TRUE
);

-- Then insert new versions of changed records.
-- After the MERGE, a changed customer is the only kind with no current row,
-- so this does not depend on CURRENT_TIMESTAMP matching across statements.
INSERT INTO dim_customer (
  customer_natural_key, customer_name, customer_status,
  valid_from, valid_to, is_current
)
SELECT customer_id, name, status, CURRENT_TIMESTAMP, '9999-12-31', TRUE
FROM stg_customers s
WHERE NOT EXISTS (
  SELECT 1 FROM dim_customer d
  WHERE d.customer_natural_key = s.customer_id
    AND d.is_current = TRUE
);
```

### CDC with Debezium (Architecture Pattern)

```
Source DB → Debezium
  Connector → Kafka Topic →
    ├── Stream processor (Flink/Spark Streaming) → Target DB
    ├── S3 sink connector → Data Lake (raw)
    └── Elasticsearch sink → Search index
```

Key decisions:

- **Topic per table** or **single topic**: Per table (easier routing, independent scaling)
- **Schema registry**: Always use (Confluent Schema Registry or AWS Glue)
- **Serialization**: Avro (compact + schema evolution) or Protobuf (strict + fast)
- **Offset management**: Connector manages; monitor consumer lag

### Feature Store Pattern

```yaml
feature_store:
  entity: "customer"
  entity_key: "customer_id"
  features:
    - name: "total_orders_30d"
      description: "Total orders in last 30 days"
      type: "INT"
      source: "fct_orders"
      computation: "batch"  # batch | streaming | on-demand
      freshness: "daily"
      ttl_hours: 48
    - name: "avg_order_value_90d"
      description: "Average order value last 90 days"
      type: "FLOAT"
      source: "fct_orders"
      computation: "batch"
      freshness: "daily"
      ttl_hours: 48
    - name: "last_login_minutes_ago"
      description: "Minutes since last login event"
      type: "INT"
      source: "events_stream"
      computation: "streaming"
      freshness: "real-time"
      ttl_hours: 1
  serving:
    online: true   # low-latency feature serving (Redis/DynamoDB)
    offline: true  # batch feature retrieval for training
    point_in_time_correct: true  # prevent feature leakage in ML training
```

### Data Mesh Principles

If operating at scale (>5 data teams):

1. **Domain ownership**: Each business domain owns its data products (not central data team)
2. **Data as a product**: Treat datasets like products, with SLAs, documentation, discoverability
3. **Self-serve platform**: Central team builds the platform, domains build on top
4. **Federated governance**: Standards and interoperability maintained centrally, implementation decentralized

**When NOT to use Data Mesh:**

- <5 data producers/consumers
- Small team (<20 engineers total)
- Single business domain
- Early-stage company (over-engineering)

---

## Quality Scoring Rubric (0-100)

| Dimension | Weight | Scoring |
|-----------|--------|---------|
| **Pipeline Reliability** | 20 | 0=frequent failures, 10=some failures with manual recovery, 20=99.5%+ success rate with auto-retry |
| **Data Quality** | 20 | 0=no checks, 10=basic null/unique checks, 20=comprehensive quality framework with contracts |
| **Performance** | 15 | 0=regularly breaches SLA, 8=meets SLA, 15=well under SLA with optimization |
| **Documentation** | 10 | 0=none, 5=basic README, 10=full catalog entries with lineage and business definitions |
| **Monitoring** | 15 | 0=no alerts, 8=failure alerts only, 15=proactive monitoring with dashboards and anomaly detection |
| **Testing** | 10 | 0=no tests, 5=basic smoke tests, 10=full test pyramid (unit+integration+contract+E2E) |
| **Cost Efficiency** | 10 | 0=no cost tracking, 5=tracked, 10=optimized with ROI justification per pipeline |

**Scoring guide:**

- 0-40: Critical gaps; prioritize pipeline reliability and data quality
- 41-60: Functional but fragile; add monitoring, testing, documentation
- 61-80: Solid; optimize performance, cost, governance
- 81-100: Excellent; maintain, innovate, mentor

---

## Edge Cases & Gotchas

### Timezone Traps

- Store everything in UTC.
  Convert only at presentation layer
- Event timestamps: use event time, not processing time
- Daylight saving: use `TIMESTAMP WITH TIME ZONE`, never `WITHOUT`
- Late-arriving data: watermark strategy + allowed lateness window

### Late-Arriving Data

- Define maximum acceptable lateness per source
- Reprocess affected partitions when late data arrives
- Track late arrival rate as a quality metric
- Consider separate "late data" pipeline that patches in

### Exactly-Once Processing

- True exactly-once is expensive. Most systems need at-least-once + idempotent writes
- Use transaction IDs or natural keys for deduplication
- Kafka: use an idempotent producer with transactions, and consumers reading with `isolation.level=read_committed`
- Database: MERGE/UPSERT on natural key

### Schema Evolution

- **Backward compatible**: New code reads old data (safe to deploy new readers first)
- **Forward compatible**: Old code reads new data (safe to deploy new writers first)
- **Full compatible**: Both directions (safest, most restrictive)
- Use Avro or Protobuf with schema registry for streaming data

### Multi-Tenant Data

- Tenant ID in every table, every query, every log
- Row-level security in warehouse
- Separate compute per tenant (or at least isolation)
- Never join across tenants without explicit business reason
- Tenant-aware backfill (don't rebuild all tenants for one tenant's issue)

### Data Lake Anti-Patterns

- "Data Swamp": ingesting everything with no organization or catalog → only ingest what has a known consumer
- Small files: thousands of <1MB files → compact regularly (target 100MB-1GB)
- No table format: raw Parquet/CSV without Delta/Iceberg → loses ACID, schema evolution, time travel
- No access controls: single bucket, everyone admin → implement IAM per domain/team

---

## Natural Language Commands

Say any of these to activate specific workflows:

1. **"Design a data pipeline for [source] to [target]"** → Full pipeline template with extraction strategy, transforms, load pattern, quality checks
2. **"Model [entity/domain] for analytics"** → Dimensional model with fact/dimension tables, grain, measures, SCD types
3. **"Optimize this query/pipeline"** → Performance analysis with specific recommendations
4. **"Set up data quality for [table/pipeline]"** → Quality framework with checks, contracts, monitoring
5. **"Audit our data infrastructure"** → Full assessment using scoring rubric
6. **"Help with [Spark/Airflow/dbt/Kafka] issue"** → Troubleshooting with technology-specific guidance
7. **"Design a data catalog for our org"** → Catalog template with governance, classification, lineage
8. **"Plan a data migration from [old] to [new]"** → Migration plan with validation, rollback, parallel-run
9. **"Set up monitoring for our pipelines"** → Dashboard template with alerts, logging standards, runbooks
10. **"Review our data costs"** → Cost analysis with optimization strategies and ROI framework
11. **"Handle schema change in [source]"** → Change management protocol with impact assessment
12. **"Backfill [table] for [date range]"** → Backfill protocol with validation and communication plan
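
### Example: Idempotent Backfill Sketch

Command 12 leans on the Phase 8 backfill protocol and the Phase 3 idempotency rules. The sketch below is one possible illustration of how those two fit together, not a prescribed implementation. It assumes an in-memory SQLite table standing in for the warehouse, a hypothetical `fct_orders` layout (`order_id`, `order_date`, `order_amount`), and a plain dictionary standing in for source extraction. The function names `backfill_dates`, `reload_partition`, and `run_backfill` are invented for this example.

```python
import sqlite3
from datetime import date, timedelta


def backfill_dates(start: date, end: date) -> list[date]:
    """Return every date in [start, end], most recent first (protocol step 6)."""
    days = (end - start).days
    return [end - timedelta(days=i) for i in range(days + 1)]


def reload_partition(conn: sqlite3.Connection, day: date, rows: list[tuple]) -> None:
    """Replace one daily partition: delete it, then insert the fresh rows.

    Both statements run in one transaction, so a failure leaves the old
    partition in place (a small-scale stand-in for a partition swap).
    """
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            "DELETE FROM fct_orders WHERE order_date = ?", (day.isoformat(),)
        )
        conn.executemany(
            "INSERT INTO fct_orders (order_id, order_date, order_amount) "
            "VALUES (?, ?, ?)",
            rows,
        )


def run_backfill(conn, source: dict, start: date, end: date) -> tuple:
    """Reload each day in the range and return (row_count, total_amount).

    `source` maps a date to its rows; it is a hypothetical stand-in for
    a real extraction step.
    """
    for day in backfill_dates(start, end):
        reload_partition(conn, day, source.get(day, []))
    return conn.execute(
        "SELECT COUNT(*), COALESCE(SUM(order_amount), 0) FROM fct_orders "
        "WHERE order_date BETWEEN ? AND ?",
        (start.isoformat(), end.isoformat()),
    ).fetchone()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE fct_orders (order_id TEXT, order_date TEXT, order_amount REAL)"
    )
    source = {
        date(2024, 1, 1): [("o1", "2024-01-01", 10.0)],
        date(2024, 1, 2): [("o2", "2024-01-02", 20.0), ("o3", "2024-01-02", 5.0)],
    }
    first = run_backfill(conn, source, date(2024, 1, 1), date(2024, 1, 3))
    second = run_backfill(conn, source, date(2024, 1, 1), date(2024, 1, 3))
    # Idempotency rule 5: running twice must give identical output
    print(first == second)  # prints True
```

Deleting and reloading a whole partition trades some extra write volume for a simple guarantee: a rerun can never create duplicates. On a real warehouse the same idea would more likely be expressed with that platform's own partition overwrite or MERGE support.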