--- name: dbt-expert version: 1.0.0 description: Expert-level dbt (data build tool), models, tests, documentation, incremental models, macros, and Jinja templating category: data author: PCL Team license: Apache-2.0 tags: - dbt - analytics-engineering - sql - data-transformation - jinja - testing allowed-tools: - Read - Write - Edit - Bash - Glob - Grep requirements: dbt-core: ">=1.7.0" --- # dbt Expert You are an expert in dbt (data build tool) with deep knowledge of data modeling, testing, documentation, incremental models, macros, Jinja templating, and analytics engineering best practices. You design maintainable, tested, and documented data transformation pipelines. ## Core Expertise ### Project Structure and Configuration **dbt_project.yml:** ```yaml name: 'analytics' version: '1.0.0' config-version: 2 profile: 'analytics' model-paths: ["models"] analysis-paths: ["analyses"] test-paths: ["tests"] seed-paths: ["seeds"] macro-paths: ["macros"] snapshot-paths: ["snapshots"] clean-targets: - "target" - "dbt_packages" models: analytics: # Staging models (source system copies) staging: +materialized: view +schema: staging +tags: ["staging"] # Intermediate models (business logic) intermediate: +materialized: ephemeral +schema: intermediate +tags: ["intermediate"] # Mart models (final tables for BI) marts: +materialized: table +schema: marts +tags: ["marts"] finance: +schema: finance marketing: +schema: marketing # Model-specific configs models: staging: +persist_docs: relation: true columns: true vars: # Global variables start_date: '2024-01-01' exclude_test_data: true on-run-start: - "{{ log('Starting dbt run...', info=true) }}" on-run-end: - "{{ log('dbt run completed!', info=true) }}" ``` **profiles.yml:** ```yaml analytics: target: dev outputs: dev: type: postgres host: localhost port: 5432 user: "{{ env_var('DBT_USER') }}" password: "{{ env_var('DBT_PASSWORD') }}" dbname: analytics_dev schema: dbt_{{ env_var('USER') }} threads: 4 keepalives_idle: 0 prod: type: postgres host: prod-db.company.com port: 5432 user: "{{ env_var('DBT_PROD_USER') }}" password: "{{ env_var('DBT_PROD_PASSWORD') }}" dbname: analytics_prod schema: analytics threads: 8 keepalives_idle: 0 snowflake: type: snowflake account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}" user: "{{ env_var('SNOWFLAKE_USER') }}" password: "{{ env_var('SNOWFLAKE_PASSWORD') }}" role: transformer database: analytics warehouse: transforming schema: dbt_{{ env_var('USER') }} threads: 8 ``` ### Sources and Staging Models **sources.yml:** ```yaml version: 2 sources: - name: raw_postgres description: Raw data from production PostgreSQL database database: production schema: public tables: - name: users description: User account information columns: - name: id description: Primary key tests: - unique - not_null - name: email description: User email address tests: - unique - not_null - name: created_at description: Account creation timestamp tests: - not_null # Freshness checks freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} # Loaded at timestamp loaded_at_field: _synced_at - name: orders description: Order transactions columns: - name: id tests: - unique - not_null - name: user_id description: Foreign key to users tests: - not_null - relationships: to: source('raw_postgres', 'users') field: id - name: total_amount tests: - not_null - name: status tests: - accepted_values: values: ['pending', 'completed', 'cancelled', 'refunded'] - name: raw_s3 description: Raw data files from S3 meta: external_location: 's3://company-data/raw/' tables: - name: events description: Event tracking data external: location: 's3://company-data/raw/events/' file_format: parquet ``` **Staging Models:** ```sql -- models/staging/stg_users.sql {{ config( materialized='view', tags=['daily'] ) }} with source as ( select * from {{ source('raw_postgres', 'users') }} ), renamed as ( select -- Primary key id as user_id, -- Attributes email, first_name, last_name, {{ dbt_utils.generate_surrogate_key(['email']) }} as user_key, -- Flags is_active, is_deleted, -- Timestamps created_at, updated_at, deleted_at, -- Metadata _synced_at as dbt_loaded_at from source where not is_deleted or deleted_at is null ) select * from renamed -- models/staging/stg_orders.sql {{ config( materialized='view' ) }} with source as ( select * from {{ source('raw_postgres', 'orders') }} ), renamed as ( select -- Primary key id as order_id, -- Foreign keys user_id, -- Metrics total_amount, tax_amount, shipping_amount, total_amount - tax_amount - shipping_amount as subtotal, -- Dimensions status, payment_method, -- Timestamps created_at as order_created_at, updated_at as order_updated_at, completed_at from source ) select * from renamed ``` ### Intermediate and Mart Models **Intermediate Models:** ```sql -- models/intermediate/int_order_items_joined.sql {{ config( materialized='ephemeral' ) }} with orders as ( select * from {{ ref('stg_orders') }} ), order_items as ( select * from {{ ref('stg_order_items') }} ), products as ( select * from {{ ref('stg_products') }} ), joined as ( select orders.order_id, orders.user_id, orders.order_created_at, order_items.order_item_id, order_items.quantity, order_items.unit_price, products.product_id, products.product_name, products.category, order_items.quantity * order_items.unit_price as line_total from orders inner join order_items on orders.order_id = order_items.order_id inner join products on order_items.product_id = products.product_id ) select * from joined ``` **Mart Models:** ```sql -- models/marts/fct_orders.sql {{ config( materialized='table', tags=['fact'] ) }} with orders as ( select * from {{ ref('stg_orders') }} ), order_items as ( select order_id, count(*) as item_count, sum(quantity) as total_quantity, sum(line_total) as items_subtotal from {{ ref('int_order_items_joined') }} group by order_id ), final as ( select -- Primary key orders.order_id, -- Foreign keys orders.user_id, -- Metrics orders.total_amount, orders.subtotal, orders.tax_amount, orders.shipping_amount, order_items.item_count, order_items.total_quantity, -- Dimensions orders.status, orders.payment_method, -- Timestamps orders.order_created_at, orders.completed_at, -- Metadata current_timestamp() as dbt_updated_at from orders left join order_items on orders.order_id = order_items.order_id ) select * from final -- models/marts/dim_customers.sql {{ config( materialized='table', tags=['dimension'] ) }} with users as ( select * from {{ ref('stg_users') }} ), orders as ( select * from {{ ref('fct_orders') }} ), customer_orders as ( select user_id, count(*) as lifetime_orders, sum(total_amount) as lifetime_value, avg(total_amount) as avg_order_value, min(order_created_at) as first_order_at, max(order_created_at) as last_order_at, max(completed_at) as last_completed_at from orders where status = 'completed' group by user_id ), final as ( select -- Primary key users.user_id, users.user_key, -- Attributes users.email, users.first_name, users.last_name, users.first_name || ' ' || users.last_name as full_name, -- Customer metrics coalesce(customer_orders.lifetime_orders, 0) as lifetime_orders, coalesce(customer_orders.lifetime_value, 0) as lifetime_value, customer_orders.avg_order_value, -- Segmentation case when customer_orders.lifetime_value >= 10000 then 'VIP' when customer_orders.lifetime_value >= 5000 then 'High Value' when customer_orders.lifetime_value >= 1000 then 'Medium Value' when customer_orders.lifetime_value > 0 then 'Low Value' else 'No Orders' end as customer_segment, -- Timestamps users.created_at as user_created_at, customer_orders.first_order_at, customer_orders.last_order_at, -- Metadata current_timestamp() as dbt_updated_at from users left join customer_orders on users.user_id = customer_orders.user_id where users.is_active ) select * from final ``` ### Incremental Models **Incremental Loading:** ```sql -- models/marts/fct_events.sql {{ config( materialized='incremental', unique_key='event_id', on_schema_change='fail', incremental_strategy='merge' ) }} with events as ( select * from {{ ref('stg_events') }} {% if is_incremental() %} -- Only load new events where event_timestamp > (select max(event_timestamp) from {{ this }}) {% endif %} ), enriched as ( select event_id, user_id, event_type, event_timestamp, {{ dbt_utils.generate_surrogate_key(['user_id', 'event_timestamp']) }} as event_key, properties, current_timestamp() as dbt_loaded_at from events ) select * from enriched -- Incremental with delete + insert {{ config( materialized='incremental', unique_key='date', incremental_strategy='delete+insert' ) }} with daily_metrics as ( select date_trunc('day', order_created_at) as date, count(*) as order_count, sum(total_amount) as revenue from {{ ref('fct_orders') }} {% if is_incremental() %} where date_trunc('day', order_created_at) >= date_trunc('day', current_date - interval '7 days') {% endif %} group by 1 ) select * from daily_metrics ``` ### Tests **Schema Tests:** ```yaml # models/marts/schema.yml version: 2 models: - name: fct_orders description: Order transactions fact table columns: - name: order_id description: Unique order identifier tests: - unique - not_null - name: user_id description: Customer identifier tests: - not_null - relationships: to: ref('dim_customers') field: user_id - name: total_amount description: Order total amount tests: - not_null - dbt_utils.accepted_range: min_value: 0 max_value: 1000000 - name: status tests: - accepted_values: values: ['pending', 'completed', 'cancelled', 'refunded'] - name: dim_customers description: Customer dimension table tests: # Table-level test - dbt_utils.unique_combination_of_columns: combination_of_columns: - user_id - email ``` **Custom Tests:** ```sql -- tests/assert_positive_revenue.sql -- This test fails if any daily revenue is negative select date, sum(total_amount) as revenue from {{ ref('fct_orders') }} where status = 'completed' group by date having sum(total_amount) < 0 -- tests/assert_order_counts_match.sql -- Check that order counts match between tables with orders_table as ( select count(*) as order_count from {{ ref('fct_orders') }} ), events_table as ( select count(distinct order_id) as order_count from {{ ref('fct_events') }} where event_type = 'order_completed' ) select * from orders_table cross join events_table where orders_table.order_count != events_table.order_count ``` **Data Tests:** ```sql -- tests/generic/test_valid_percentage.sql {% test valid_percentage(model, column_name) %} select * from {{ model }} where {{ column_name }} < 0 or {{ column_name }} > 1 {% endtest %} -- Usage in schema.yml # - name: conversion_rate # tests: # - valid_percentage ``` ### Macros **Reusable Macros:** ```sql -- macros/cents_to_dollars.sql {% macro cents_to_dollars(column_name, scale=2) %} ({{ column_name }} / 100.0)::numeric(16, {{ scale }}) {% endmacro %} -- Usage: {{ cents_to_dollars('price_cents') }} -- macros/generate_alias_name.sql {% macro generate_alias_name(custom_alias_name=none, node=none) -%} {%- if custom_alias_name is none -%} {{ node.name }} {%- else -%} {{ custom_alias_name | trim }} {%- endif -%} {%- endmacro %} -- macros/date_spine.sql {% macro date_spine(start_date, end_date) %} with date_spine as ( {{ dbt_utils.date_spine( datepart="day", start_date="cast('" ~ start_date ~ "' as date)", end_date="cast('" ~ end_date ~ "' as date)" ) }} ) select date_day from date_spine {% endmacro %} -- macros/grant_select.sql {% macro grant_select(schema, role) %} {% set sql %} grant select on all tables in schema {{ schema }} to {{ role }}; {% endset %} {% do run_query(sql) %} {% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %} {% endmacro %} -- Usage in on-run-end hook -- {{ grant_select('analytics', 'analyst') }} ``` **Advanced Macros:** ```sql -- macros/pivot_metrics.sql {% macro pivot_metrics(column, metric, values) %} {% for value in values %} sum(case when {{ column }} = '{{ value }}' then {{ metric }} else 0 end) as {{ value | replace(' ', '_') | lower }} {%- if not loop.last -%},{%- endif %} {% endfor %} {% endmacro %} -- Usage: -- select -- date, -- {{ pivot_metrics('status', 'total_amount', ['pending', 'completed', 'cancelled']) }} -- from orders -- group by date -- macros/generate_schema_name.sql {% macro generate_schema_name(custom_schema_name, node) -%} {%- set default_schema = target.schema -%} {%- if target.name == 'prod' and custom_schema_name is not none -%} {{ custom_schema_name | trim }} {%- else -%} {{ default_schema }}_{{ custom_schema_name | trim }} {%- endif -%} {%- endmacro %} ``` ### Snapshots (SCD Type 2) **Timestamp Strategy:** ```sql -- snapshots/orders_snapshot.sql {% snapshot orders_snapshot %} {{ config( target_schema='snapshots', unique_key='order_id', strategy='timestamp', updated_at='updated_at', invalidate_hard_deletes=True ) }} select * from {{ source('raw_postgres', 'orders') }} {% endsnapshot %} ``` **Check Strategy:** ```sql -- snapshots/customers_snapshot.sql {% snapshot customers_snapshot %} {{ config( target_schema='snapshots', unique_key='customer_id', strategy='check', check_cols=['email', 'status', 'plan_type'], invalidate_hard_deletes=True ) }} select * from {{ source('raw_postgres', 'customers') }} {% endsnapshot %} ``` ### Documentation **Model Documentation:** ```yaml # models/marts/schema.yml version: 2 models: - name: fct_orders description: | # Order Transactions Fact Table This table contains one row per order with associated metrics and dimensions. ## Grain One row per order ## Freshness Updated hourly via incremental load ## Usage Primary table for order analysis and reporting columns: - name: order_id description: Unique order identifier (PK) tests: - unique - not_null - name: total_amount description: | Total order amount including tax and shipping. Formula: `subtotal + tax_amount + shipping_amount` - name: customer_segment description: Customer value segment meta: dimension: type: category label: Customer Segment ``` **Custom Documentation:** ```markdown {% docs __overview__ %} # Analytics dbt Project This dbt project transforms raw data from our production systems into analytics-ready models for BI and data science use cases. ## Data Sources - PostgreSQL (production database) - S3 (event tracking) - Snowflake (external data) ## Model Layers 1. **Staging**: Light transformations, renaming 2. **Intermediate**: Business logic, joins 3. **Marts**: Final tables for consumption {% enddocs %} ``` ### Packages and Dependencies **packages.yml:** ```yaml packages: - package: dbt-labs/dbt_utils version: 1.1.1 - package: calogica/dbt_expectations version: 0.10.0 - package: dbt-labs/codegen version: 0.12.1 - git: "https://github.com/dbt-labs/dbt-audit-helper.git" revision: 0.9.0 ``` **Using Packages:** ```sql -- Using dbt_utils select {{ dbt_utils.generate_surrogate_key(['user_id', 'order_id']) }} as order_key, {{ dbt_utils.safe_divide('revenue', 'orders') }} as avg_order_value, {{ dbt_utils.star(from=ref('stg_orders'), except=['_synced_at']) }} from {{ ref('stg_orders') }} -- Using dbt_expectations tests: - dbt_expectations.expect_column_values_to_be_between: min_value: 0 max_value: 100 ``` ## Best Practices ### 1. Project Organization - Follow medallion architecture: staging -> intermediate -> marts - Use clear naming conventions (stg_, int_, fct_, dim_) - Keep models focused and single-purpose - Document all models and columns - Use consistent column naming across models ### 2. Model Configuration - Use appropriate materializations (view, table, incremental, ephemeral) - Implement incremental models for large fact tables - Add tests to all primary keys and foreign keys - Use schemas to organize models by business domain - Set appropriate freshness checks on sources ### 3. Performance - Materialize large intermediate models as tables - Use ephemeral for simple transformations - Implement incremental loading for event data - Create appropriate indexes in post-hooks - Monitor model run times ### 4. Testing - Test uniqueness and not_null on all primary keys - Test relationships between fact and dimension tables - Add custom tests for business logic - Test data quality expectations - Run tests in CI/CD pipeline ### 5. Documentation - Document model purpose and grain - Add column descriptions - Include examples and usage notes - Generate and publish documentation - Keep documentation up to date ## Anti-Patterns ### 1. Complex CTEs ```sql -- Bad: Many nested CTEs with cte1 as (...), cte2 as (...), cte3 as (...) -- 20 more CTEs select * from cte23 -- Good: Break into intermediate models select * from {{ ref('int_cleaned_data') }} ``` ### 2. Not Using refs ```sql -- Bad: Direct table reference select * from analytics.staging.stg_orders -- Good: Use ref select * from {{ ref('stg_orders') }} ``` ### 3. No Tests ```sql -- Bad: No tests -- Good: Always test PKs and FKs columns: - name: id tests: [unique, not_null] ``` ### 4. Hardcoded Values ```sql -- Bad: Hardcoded date where created_at >= '2024-01-01' -- Good: Use variables where created_at >= '{{ var("start_date") }}' ``` ## Resources - [dbt Documentation](https://docs.getdbt.com/) - [dbt Best Practices](https://docs.getdbt.com/guides/best-practices) - [dbt Discourse Community](https://discourse.getdbt.com/) - [dbt Package Hub](https://hub.getdbt.com/) - [dbt Learn](https://learn.getdbt.com/) - [Analytics Engineering Guide](https://www.getdbt.com/analytics-engineering/) - [dbt Style Guide](https://github.com/dbt-labs/corp/blob/main/dbt_style_guide.md)