
dbt Models, Tests, and CI/CD

Build production-grade dbt projects with incremental models, custom tests, macros, packages, and CI/CD pipelines

Incremental Models

Incremental models are dbt's strategy for efficiently processing large tables. Instead of rebuilding the entire table on every run, an incremental model processes only new or updated rows, which can reduce run times from hours to minutes for tables with billions of rows. Inside the model body, the is_incremental() macro conditionally filters the source data so that only fresh rows are selected on incremental runs.

-- models/marts/fct_page_views.sql
-- Incremental model: only process new events
-- (partition_by here uses BigQuery's dict syntax; other adapters configure partitioning differently)

{{
    config(
        materialized='incremental',
        unique_key='event_id',
        incremental_strategy='merge',
        on_schema_change='append_new_columns',
        partition_by={
            'field': 'event_date',
            'data_type': 'date',
            'granularity': 'day'
        }
    )
}}

WITH source AS (
    SELECT
        event_id,
        user_id,
        page_url,
        referrer_url,
        device_type,
        browser,
        country,
        CAST(event_timestamp AS DATE) AS event_date,
        event_timestamp,
        session_id,
        duration_seconds
    FROM {{ ref('stg_page_views') }}

    {% if is_incremental() %}
    -- Only process events newer than the latest in the target table
    WHERE event_timestamp > (
        SELECT MAX(event_timestamp) FROM {{ this }}
    )
    {% endif %}
)

SELECT * FROM source
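
On the first run, or when invoked with --full-refresh, is_incremental() evaluates to false, so the filter is skipped and the table is rebuilt from scratch. A full refresh is also how you pick up changes to historical rows that the incremental filter would otherwise miss:

# First run or full rebuild: recreate the table from all source data
dbt run --select fct_page_views --full-refresh

# Subsequent runs: only rows newer than MAX(event_timestamp) are processed
dbt run --select fct_page_views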

dbt Tests

dbt provides two types of tests: generic tests (declared in YAML) and singular tests (custom SQL queries). Tests run after models and validate that data meets quality expectations. Failed tests can block deployment in CI/CD pipelines.

# models/marts/_marts_models.yml
version: 2

models:
  - name: fct_orders
    description: "Fact table containing all completed orders with enriched dimensions"
    columns:
      - name: order_id
        description: "Unique order identifier"
        data_tests:
          - unique
          - not_null

      - name: customer_id
        description: "Foreign key to dim_customer"
        data_tests:
          - not_null
          - relationships:
              to: ref('dim_customer')
              field: customer_id

      - name: total_amount
        description: "Total order amount in USD"
        data_tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000

      - name: order_date
        description: "Date the order was placed"
        data_tests:
          # Allow up to 1% NULLs rather than pairing with a strict not_null test
          - dbt_utils.not_null_proportion:
              at_least: 0.99

      - name: status
        description: "Order status"
        data_tests:
          - accepted_values:
              values: ['CONFIRMED', 'SHIPPED', 'DELIVERED', 'RETURNED']

  - name: dim_customer
    description: "Customer dimension with current attributes"
    columns:
      - name: customer_id
        data_tests:
          - unique
          - not_null
      - name: email
        data_tests:
          - unique
          - not_null
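
Both kinds of tests run with dbt test, and node selectors scope the run:

# Run every test attached to one model
dbt test --select fct_orders

# Run only generic (YAML) or only singular (SQL) tests
dbt test --select test_type:generic
dbt test --select test_type:singular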

Singular Tests

-- tests/assert_orders_revenue_positive.sql
-- This test fails if any rows are returned

SELECT
    order_id,
    total_amount
FROM {{ ref('fct_orders') }}
WHERE total_amount < 0

-- tests/assert_no_orphaned_orders.sql
-- Orders must have a matching customer
SELECT
    o.order_id,
    o.customer_id
FROM {{ ref('fct_orders') }} o
LEFT JOIN {{ ref('dim_customer') }} c
    ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL

-- tests/assert_daily_revenue_reasonable.sql
-- Daily revenue should not drop more than 80% vs previous day
WITH daily AS (
    SELECT
        order_date,
        SUM(total_amount) AS revenue
    FROM {{ ref('fct_orders') }}
    GROUP BY order_date
),
with_prev AS (
    SELECT
        order_date,
        revenue,
        LAG(revenue) OVER (ORDER BY order_date) AS prev_revenue
    FROM daily
)
SELECT *
FROM with_prev
WHERE prev_revenue IS NOT NULL
  AND revenue < prev_revenue * 0.2  -- More than 80% drop

dbt Macros

Macros are reusable Jinja functions that generate SQL. They eliminate duplication across models and let you create powerful abstractions:

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    ROUND({{ column_name }} / 100.0, 2)
{% endmacro %}

-- Usage in a model:
-- SELECT {{ cents_to_dollars('price_cents') }} AS price_usd

-- macros/generate_date_spine.sql
-- Snowflake syntax: GENERATOR emits a fixed number of rows to count against
{% macro generate_date_spine(start_date, end_date) %}
    WITH date_spine AS (
        SELECT
            DATEADD(day, seq, '{{ start_date }}'::date) AS date_day
        FROM (
            SELECT ROW_NUMBER() OVER (ORDER BY NULL) - 1 AS seq
            FROM TABLE(GENERATOR(ROWCOUNT => 10000))
        ) AS seq_numbers
    )
    SELECT * FROM date_spine
    WHERE date_day <= '{{ end_date }}'::date
{% endmacro %}

-- macros/safe_divide.sql
{% macro safe_divide(numerator, denominator, default=0) %}
    CASE
        WHEN {{ denominator }} = 0 OR {{ denominator }} IS NULL THEN {{ default }}
        ELSE {{ numerator }}::FLOAT / {{ denominator }}
    END
{% endmacro %}
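
safe_divide is used the same way as cents_to_dollars; the column names below are illustrative:

-- Usage in a model:
-- SELECT {{ safe_divide('clicks', 'impressions') }} AS click_through_rate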

dbt Packages

# packages.yml — Install community packages
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]

  - package: dbt-labs/codegen
    version: [">=0.12.0", "<1.0.0"]

  - package: calogica/dbt_expectations
    version: [">=0.10.0", "<1.0.0"]

  - package: dbt-labs/audit_helper
    version: [">=0.10.0", "<1.0.0"]

# Install with: dbt deps

Sources and Freshness

# models/staging/_sources.yml
version: 2

sources:
  - name: app_database
    description: "Main application PostgreSQL database (synced by Fivetran)"
    database: raw
    schema: app_public
    loader: fivetran

    freshness:
      warn_after: {count: 6, period: hour}
      error_after: {count: 24, period: hour}
    loaded_at_field: _fivetran_synced

    tables:
      - name: orders
        description: "Customer orders from the e-commerce app"
        columns:
          - name: id
            description: "Primary key"
            data_tests:
              - unique
              - not_null

      - name: customers
        description: "Customer profiles"
        columns:
          - name: id
            data_tests:
              - unique
              - not_null

# Check freshness: dbt source freshness
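
Staging models then reference these tables with the source() function instead of hard-coded names, which keeps the lineage graph complete and ties freshness checks to real usage. A minimal staging model might look like this (the column mapping is illustrative):

-- models/staging/stg_orders.sql
SELECT
    id AS order_id,
    customer_id,
    total_amount,
    status
FROM {{ source('app_database', 'orders') }}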

CI/CD Pipeline for dbt

# .github/workflows/dbt-ci.yml
name: dbt CI
on:
  pull_request:
    branches: [main]
    paths:
      - 'models/**'
      - 'tests/**'
      - 'macros/**'

jobs:
  dbt-ci:
    runs-on: ubuntu-latest
    # Every dbt step needs warehouse credentials, so set them once at job level
    # (assumes profiles.yml in the repo reads them via env_var())
    env:
      DBT_USER: ${{ secrets.DBT_CI_USER }}
      DBT_PASSWORD: ${{ secrets.DBT_CI_PASSWORD }}
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dbt
        run: pip install dbt-snowflake

      - name: Install packages
        run: dbt deps

      # prod-manifest/ must hold manifest.json from the latest production run,
      # fetched before this step (see the note below)
      - name: Run changed models
        run: dbt run --select state:modified+ --defer --state prod-manifest/

      - name: Test changed models
        run: dbt test --select state:modified+ --defer --state prod-manifest/

      - name: Check source freshness
        run: dbt source freshness
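
Where does prod-manifest/ come from? One common pattern is to have the scheduled production job upload target/manifest.json after each run and fetch it in CI (with GitHub Actions artifacts, downloading across workflows needs a run id and token; a storage bucket works just as well). The upload side might look like:

# In the production deployment workflow, after dbt runs:
      - name: Upload production manifest
        uses: actions/upload-artifact@v4
        with:
          name: prod-manifest
          path: target/manifest.json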

Exposures: Documenting Downstream Consumers

Exposures let you document which dashboards, ML models, or applications consume your dbt models. This creates a complete lineage graph from raw sources through transformations to final consumers, enabling impact analysis when you need to change a model.

# models/exposures.yml
version: 2

exposures:
  - name: weekly_revenue_dashboard
    description: "Executive dashboard showing weekly revenue by region"
    type: dashboard
    maturity: high
    url: https://looker.company.com/dashboards/42
    owner:
      name: Analytics Team
      email: analytics@company.com
    depends_on:
      - ref('fct_orders')
      - ref('dim_customer')
      - ref('dim_product')

  - name: churn_prediction_model
    description: "ML model predicting customer churn probability"
    type: ml
    maturity: medium
    owner:
      name: Data Science Team
      email: data-science@company.com
    depends_on:
      - ref('fct_orders')
      - ref('dim_customer')
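
With exposures in place, dbt's graph selectors make impact analysis a one-liner:

# List everything the dashboard depends on
dbt ls --select +exposure:weekly_revenue_dashboard

# Rebuild and test exactly that subgraph
dbt build --select +exposure:weekly_revenue_dashboard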

Incremental Model Strategies

dbt supports several incremental strategies that determine how new data is merged into existing tables:

  • Append: Simply INSERT new rows. Fastest but allows duplicates. Good for event streams where duplicates are acceptable or handled downstream.
  • Merge: UPSERT based on unique_key — insert new rows and update existing ones. Most common strategy for dimension and fact tables.
  • Delete+Insert: Delete matching rows then insert. Useful when MERGE is not available or when you need to handle hard deletes from source.
  • Insert Overwrite: Replace entire partitions. Fast for partitioned tables where you want to reprocess specific date ranges (see the sketch after the Delete+Insert example below).
-- Delete+Insert incremental strategy (supported on Snowflake, Redshift, and Postgres)
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='delete+insert'
    )
}}

SELECT
    order_id,
    customer_id,
    total_amount,
    order_date,
    status,
    updated_at
FROM {{ ref('stg_orders') }}

{% if is_incremental() %}
WHERE order_date >= (SELECT MAX(order_date) - INTERVAL '3 days' FROM {{ this }})
{% endif %}

-- The 3-day lookback handles late-arriving data and updates
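
For contrast, an insert_overwrite model swaps whole partitions instead of matching individual keys. This sketch assumes a BigQuery-style partitioned table:

-- Insert Overwrite: replace whole day partitions (BigQuery-style syntax)
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            'field': 'order_date',
            'data_type': 'date',
            'granularity': 'day'
        }
    )
}}

SELECT
    order_id,
    customer_id,
    total_amount,
    order_date
FROM {{ ref('stg_orders') }}

{% if is_incremental() %}
-- Every partition that appears in this result set is fully replaced
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)
{% endif %}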

Testing Best Practices

A comprehensive testing strategy catches issues at different levels. Combine generic tests (YAML), singular tests (SQL), and unit tests to create defense in depth. Run tests as part of CI/CD to prevent bad changes from reaching production.

  • Every primary key should have unique and not_null tests — this catches duplicate records immediately
  • Every foreign key should have a relationships test — ensures referential integrity between fact and dimension tables
  • Use accepted_values for status/category columns — catches unexpected enum values from source systems
  • Add custom singular tests for business rules — "revenue should never be negative", "order date should not be in the future"
  • Use dbt_expectations for distribution tests — catch anomalies in data volume, null rates, and value distributions (see the sketch below)
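
A sketch of what those distribution tests look like, reusing fct_orders from above (the thresholds are illustrative):

# models/marts/_marts_models.yml — dbt_expectations additions
models:
  - name: fct_orders
    data_tests:
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: 1000
    columns:
      - name: total_amount
        data_tests:
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 10
              max_value: 5000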

Key Takeaways

  • Incremental models process only new/changed rows, reducing run times for large tables from hours to minutes
  • Generic tests (YAML) cover common validations; singular tests (SQL) handle custom business logic assertions
  • Macros eliminate SQL duplication and create reusable abstractions across your project
  • Source freshness monitoring ensures upstream data is arriving on time before transformations run
  • CI/CD pipelines validate dbt changes on every pull request — run and test modified models before merging
  • Use dbt packages (dbt_utils, dbt_expectations) to leverage community-built macros and tests
  • Exposures document downstream consumers, completing the lineage from raw data to dashboards and ML models
