
Data Quality and Testing

Implement comprehensive data quality frameworks with automated testing, anomaly detection, and data observability

Why Data Quality Matters

Data quality is the degree to which data is accurate, complete, consistent, timely, and fit for its intended purpose. Poor data quality is one of the most common complaints from data consumers — when dashboards show wrong numbers, ML models produce bad predictions, or reports contradict each other, trust in the data platform evaporates. Rebuilding that trust takes far longer than building quality into the system from the start.

Data quality is not a one-time activity but an ongoing discipline. Data engineers must build quality checks into every pipeline, monitor data continuously, and establish clear contracts with data producers and consumers. The cost of catching a data quality issue increases exponentially the further downstream it travels — catching a problem at ingestion costs minutes; catching it after a CEO makes a decision based on wrong data costs credibility and revenue.

Dimensions of Data Quality

  • Accuracy: Does the data correctly represent real-world values? Is the revenue figure actually what was charged?
  • Completeness: Are all expected records and fields present? Are there unexpected NULLs or missing rows?
  • Consistency: Does the same metric produce the same result across different tables and dashboards?
  • Timeliness/Freshness: Is the data up-to-date? When was it last refreshed?
  • Uniqueness: Are there duplicate records that could inflate metrics?
  • Validity: Does the data conform to expected formats, ranges, and business rules?
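
Each dimension maps to a measurable check. As a quick illustration, a single profiling query (written here against a hypothetical orders table) can score several dimensions in one pass:

-- Illustrative profiling query; each metric maps to one quality dimension
SELECT
    COUNT(*) AS total_rows,                                          -- volume baseline
    COUNT(*) - COUNT(customer_id) AS missing_customer_ids,           -- completeness
    COUNT(*) - COUNT(DISTINCT order_id) AS duplicate_order_ids,      -- uniqueness
    MAX(loaded_at) AS last_refresh,                                  -- timeliness
    SUM(CASE WHEN amount < 0 THEN 1 ELSE 0 END) AS negative_amounts  -- validity
FROM orders;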

Testing with Great Expectations

Great Expectations is one of the most widely used open-source data quality frameworks. It lets you define "expectations" (assertions) about your data and validate them automatically in your pipeline.

import great_expectations as gx

class DataQualityError(Exception):
    """Raised when a batch fails its quality checks."""

# Create a data context
context = gx.get_context()

# Connect to your data (GX 1.x fluent API; assumes a pandas
# DataFrame named orders_df is loaded earlier in the pipeline)
data_source = context.data_sources.add_pandas("orders_source")
data_asset = data_source.add_dataframe_asset(name="orders")
batch_definition = data_asset.add_batch_definition_whole_dataframe("orders_batch")

# Build an expectation suite
suite = context.suites.add(gx.ExpectationSuite(name="orders_quality_suite"))

# Define expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=100000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["pending", "confirmed", "shipped", "delivered", "cancelled"]
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="order_date",
        min_value="2020-01-01",
        max_value="2026-12-31"
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000,   # At least 1000 orders per batch
        max_value=1000000
    )
)

# Validate the DataFrame against the suite
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="orders_validation", data=batch_definition, suite=suite
    )
)
results = validation_definition.run(batch_parameters={"dataframe": orders_df})

if not results.success:
    failed = [r for r in results.results if not r.success]
    for f in failed:
        print(f"FAILED: {f.expectation_config.type}")
    raise DataQualityError("Data quality checks failed")

SQL-Based Data Quality Tests

-- Comprehensive data quality checks as SQL queries
-- Each query returns rows that VIOLATE the rule (expect zero rows)

-- 1. Freshness: Data should be updated within the last 2 hours
SELECT 'stale_data' AS check_name,
    MAX(loaded_at) AS last_load,
    CURRENT_TIMESTAMP AS checked_at,
    DATEDIFF('hour', MAX(loaded_at), CURRENT_TIMESTAMP) AS hours_stale
FROM analytics.fct_orders
HAVING DATEDIFF('hour', MAX(loaded_at), CURRENT_TIMESTAMP) > 2;

-- 2. Volume: Today's row count should be within 50% of yesterday's
WITH daily_counts AS (
    SELECT
        order_date,
        COUNT(*) AS row_count,
        LAG(COUNT(*)) OVER (ORDER BY order_date) AS prev_count
    FROM analytics.fct_orders
    GROUP BY order_date
)
SELECT 'volume_anomaly' AS check_name, *
FROM daily_counts
WHERE order_date = CURRENT_DATE
  AND (row_count < prev_count * 0.5 OR row_count > prev_count * 1.5);

-- 3. Null rate: Critical columns should have < 1% nulls
SELECT
    'high_null_rate' AS check_name,
    COUNT(*) AS total_rows,
    SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) AS null_customer_ids,
    ROUND(SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END)::FLOAT
          / COUNT(*) * 100, 2) AS null_pct
FROM analytics.fct_orders
-- Repeat the expression: most dialects do not allow SELECT aliases in HAVING
HAVING SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END)::FLOAT
       / COUNT(*) > 0.01;

-- 4. Referential integrity: All foreign keys should have matching dimension rows
SELECT 'orphaned_orders' AS check_name,
    COUNT(*) AS orphan_count
FROM analytics.fct_orders f
LEFT JOIN analytics.dim_customer c ON f.customer_id = c.customer_id
WHERE c.customer_id IS NULL
HAVING COUNT(*) > 0;

-- 5. Business rule: Order amounts should match line item totals
SELECT 'amount_mismatch' AS check_name,
    o.order_id,
    o.total_amount AS order_total,
    SUM(li.quantity * li.unit_price) AS calculated_total,
    ABS(o.total_amount - SUM(li.quantity * li.unit_price)) AS difference
FROM analytics.fct_orders o
JOIN analytics.fct_order_items li ON o.order_id = li.order_id
GROUP BY o.order_id, o.total_amount
HAVING ABS(o.total_amount - SUM(li.quantity * li.unit_price)) > 0.01;
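
These checks only add value if something runs them on a schedule and fails loudly. A minimal harness, sketched below against the standard Python DB-API (the connection object, constant names, and SQL strings are placeholders for the queries above), executes each violation query and fails the pipeline if any returns rows:

# Minimal sketch: run each violation query; any returned row is a failure.
# conn is any DB-API connection (psycopg2, snowflake-connector, etc.);
# the SQL string constants stand in for the violation queries shown above.
def run_sql_checks(conn, checks: dict[str, str]) -> list[str]:
    """Return the names of checks whose queries returned violating rows."""
    failures = []
    with conn.cursor() as cur:
        for name, sql in checks.items():
            cur.execute(sql)
            if cur.fetchone() is not None:  # any row back means a violation
                failures.append(name)
    return failures

failures = run_sql_checks(conn, {
    "stale_data": FRESHNESS_SQL,
    "volume_anomaly": VOLUME_SQL,
    "high_null_rate": NULL_RATE_SQL,
})
if failures:
    raise RuntimeError(f"Data quality checks failed: {failures}")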

Data Observability

Data observability extends monitoring to provide deep visibility into the health of your data pipelines and datasets. It answers questions like: Is my data fresh? Has the schema changed? Are there anomalies in volume or distribution? Popular tools include Monte Carlo, Soda, and Elementary.

# soda_checks.yml — Soda Core data quality checks
checks for analytics.fct_orders:
  # Freshness
  - freshness(loaded_at) < 2h

  # Row count
  - row_count > 0
  - change percent for row_count between -50 and +50

  # Schema
  - schema:
      fail:
        when required column missing:
          - order_id
          - customer_id
          - total_amount
          - order_date

  # Null checks
  - missing_count(order_id) = 0
  - missing_count(customer_id) = 0
  - missing_percent(total_amount) < 1%

  # Validity
  - invalid_count(status) = 0:
      valid values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
  - min(total_amount) >= 0
  - max(total_amount) < 100000

  # Duplicates
  - duplicate_count(order_id) = 0

  # Distribution (anomaly detection)
  - anomaly detection for row_count
  - anomaly detection for avg(total_amount)
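
To evaluate these checks, Soda Core ships a CLI: a command like soda scan -d analytics -c configuration.yml soda_checks.yml (assuming a configured data source named analytics) runs every check in the file and exits non-zero when checks fail, so a scan can gate a pipeline step directly.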

Building a Data Quality Framework

Implementation Strategy

  • Layer 1 — Ingestion: Validate schema, data types, and basic constraints when data enters the platform. Reject or quarantine invalid records immediately.
  • Layer 2 — Transformation: Run dbt tests after each transformation step (see the dbt sketch after this list). Check uniqueness, not-null, referential integrity, and accepted values.
  • Layer 3 — Serving: Monitor freshness, volume trends, and distribution anomalies on final tables. Alert data consumers about known issues.
  • Layer 4 — Consumer: Provide data contracts and SLAs. Document expected freshness, completeness, and accuracy for each dataset.
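
For Layer 2, dbt expresses these tests declaratively in the model's schema YAML. A minimal sketch (the file path is illustrative; the model and column names mirror the examples above):

# models/marts/fct_orders.yml (illustrative): dbt tests for Layer 2
version: 2
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: customer_id
        tests:
          - not_null
          - relationships:          # referential integrity
              to: ref('dim_customer')
              field: customer_id
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']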

Data Contracts

# data_contracts/fct_orders.yml
# Formal agreement between producers and consumers
contract:
  name: fct_orders
  owner: data-engineering-team
  description: "Fact table for all completed customer orders"

  sla:
    freshness: 2 hours          # Data no more than 2 hours old
    availability: 99.9%         # Uptime guarantee
    completeness: 99%           # At least 99% of expected rows present

  schema:
    - name: order_id
      type: bigint
      constraints: [not_null, unique]
    - name: customer_id
      type: bigint
      constraints: [not_null]
    - name: total_amount
      type: decimal(12,2)
      constraints: [not_null, positive]
    - name: order_date
      type: date
      constraints: [not_null, not_future]

  consumers:
    - team: analytics
      use_case: "Revenue dashboards"
    - team: data-science
      use_case: "Churn prediction model"

Automated Anomaly Detection

Manual threshold-based checks catch known issues, but anomaly detection catches unknown unknowns — unexpected changes in data patterns that you did not anticipate. Statistical methods can automatically detect when today's data looks significantly different from historical patterns.

import numpy as np

def detect_volume_anomaly(
    current_count: int,
    historical_counts: list[int],
    z_threshold: float = 3.0
) -> dict:
    """Detect anomalies in row counts using z-score method."""
    if len(historical_counts) < 7:
        return {"is_anomaly": False, "reason": "Not enough history"}

    mean = np.mean(historical_counts)
    std = np.std(historical_counts)

    if std == 0:
        return {"is_anomaly": current_count != mean,
                "reason": "Zero variance — count changed"}

    z_score = (current_count - mean) / std

    return {
        "is_anomaly": abs(z_score) > z_threshold,
        "z_score": round(z_score, 2),
        "current": current_count,
        "expected_range": (
            int(mean - z_threshold * std),
            int(mean + z_threshold * std)
        ),
        "reason": (
            f"Count {current_count} is {abs(z_score):.1f} standard deviations "
            f"from the mean of {mean:.0f}"
        ) if abs(z_score) > z_threshold else "Within normal range"
    }

# Example usage in a pipeline
historical = [10200, 10150, 10300, 10180, 10250, 10190, 10220]
today = 5100  # Suspicious drop!

result = detect_volume_anomaly(today, historical)
if result["is_anomaly"]:
    send_alert(f"Volume anomaly detected: {result['reason']}")

Incident Response for Data Quality

When a data quality issue is detected, having a structured incident response process prevents panic and ensures issues are resolved quickly and transparently. Every data team should have a documented runbook:

  • Detect: Automated monitoring or user report identifies the issue
  • Triage: Assess severity — is this a broken dashboard for executives, or a minor issue in a low-priority table?
  • Communicate: Notify affected consumers immediately. "We are aware of incorrect revenue numbers on the dashboard and are investigating."
  • Diagnose: Use lineage to trace the issue to its source. Was it a source system change? A pipeline bug? A data quality check gap?
  • Fix: Repair the pipeline, backfill affected data, and verify the fix with quality checks
  • Postmortem: Document the root cause, impact, timeline, and what new checks to add to prevent recurrence

Key Takeaways

  • Data quality has six dimensions: accuracy, completeness, consistency, timeliness, uniqueness, and validity
  • Build quality checks into every layer: ingestion, transformation, and serving
  • Use tools like Great Expectations, Soda, or dbt tests to automate quality validation
  • Monitor freshness, volume, schema, and distribution continuously — detect anomalies before consumers notice
  • Data contracts formalize expectations between producers and consumers — SLAs, schema, and quality guarantees
  • The cost of a quality issue increases exponentially the further downstream it travels — catch problems early
