What is a Data Pipeline?
A data pipeline is an automated workflow that moves data from one or more sources through a series of processing steps to a destination. Pipelines are the primary artifact that data engineers build and maintain. A well-designed pipeline is reliable (handles failures gracefully), idempotent (can be safely re-run), observable (you know when something goes wrong), and scalable (handles growing data volumes without redesign).
Pipeline design is both an art and an engineering discipline. Poor pipeline design leads to brittle systems that break constantly, produce incorrect data, and are impossible to debug. Good pipeline design produces trustworthy data that stakeholders can rely on for critical business decisions. The patterns in this lesson are distilled from years of production experience and apply regardless of the specific tools you use.
Pipeline Design Principles
Core Principles
- Idempotency: Running a pipeline multiple times with the same input produces the same output. This is the most important principle — it enables safe retries and backfills.
- Atomicity: Each pipeline step either completes fully or not at all. Partial writes lead to data corruption and inconsistent results.
- Incremental Processing: Process only new or changed data rather than reprocessing everything. This reduces cost and latency.
- Schema Evolution: Handle changes in source data schemas gracefully without pipeline failures (a minimal detection sketch follows this list).
- Observability: Every pipeline should emit metrics, logs, and alerts. You should know about failures before your stakeholders do.
- Self-Documenting: Pipeline code should be readable and include clear comments about business logic and data contracts.
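Most of these principles get concrete treatment in the sections below; schema evolution is the exception, so here is a minimal sketch of schema drift detection. It assumes source records arrive as Python dicts and that an expected column set (EXPECTED_COLUMNS, a name introduced here for illustration) is maintained alongside the pipeline, for example as part of a data contract.
import logging

logger = logging.getLogger(__name__)

# Illustrative expected schema; in practice this would live in a data contract.
EXPECTED_COLUMNS = {"order_id", "customer_id", "amount", "updated_at"}

def check_schema_drift(records: list[dict]) -> None:
    """Fail fast on missing columns; warn (but continue) on new ones."""
    if not records:
        return
    observed = set(records[0].keys())
    missing = EXPECTED_COLUMNS - observed
    added = observed - EXPECTED_COLUMNS
    if missing:
        # A removed column is usually a breaking upstream change.
        raise ValueError(f"Source schema is missing expected columns: {missing}")
    if added:
        # New columns are often safe to ignore but should be surfaced.
        logger.warning("Source schema added new columns: %s", added)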
Idempotent Pipeline Design
An idempotent pipeline produces the same result whether it runs once or ten times with the same input. This is critical because pipelines fail and must be retried. If a pipeline is not idempotent, a retry can produce duplicate data, corrupt existing records, or cause downstream inconsistencies.
# BAD: Non-idempotent — appending causes duplicates on retry
def bad_pipeline(date: str):
    data = extract_daily_orders(date)
    transformed = transform_orders(data)  # assumed to land in staging.orders
    # DANGER: If this runs twice, orders are duplicated
    db.execute("INSERT INTO analytics.orders SELECT * FROM staging.orders")
# GOOD: Idempotent — delete-then-insert pattern
def good_pipeline(date: str):
    data = extract_daily_orders(date)
    transformed = transform_orders(data)  # assumed to land in staging.orders
    # Atomic swap: delete existing data for this date, then insert,
    # all inside one transaction so a failure never leaves a partial state
    db.execute(f"""
        BEGIN;
        DELETE FROM analytics.daily_orders WHERE order_date = '{date}';
        INSERT INTO analytics.daily_orders
        SELECT * FROM staging.orders WHERE order_date = '{date}';
        COMMIT;
    """)
# BETTER: Idempotent with partition overwrite
def best_pipeline(date: str):
    data = extract_daily_orders(date)
    transformed = transform_orders(data)
    # Overwrite the specific partition — idempotent by design
    # (needs dynamic partition overwrite; see the config sketch below)
    (
        transformed.write
        .mode("overwrite")
        .partitionBy("order_date")
        .parquet("s3://data-lake/gold/daily_orders/")
    )
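One caveat on the partition-overwrite variant: with Spark's default static overwrite mode, mode("overwrite") replaces the entire output path, not just the affected partition. A minimal configuration sketch, assuming Spark 2.3 or later (the application name is illustrative):
from pyspark.sql import SparkSession

# Enable dynamic partition overwrite so .mode("overwrite") replaces only the
# partitions present in the written DataFrame rather than the whole table.
spark = (
    SparkSession.builder
    .appName("daily_orders_pipeline")  # illustrative application name
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .getOrCreate()
)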
Incremental Processing Patterns
Incremental processing handles only data that is new or changed since the last successful run. This is essential for performance as data volumes grow. There are several strategies for tracking what has already been processed:
from datetime import datetime, timedelta

# Pattern 1: Timestamp-based incremental (most common)
def incremental_by_timestamp(last_run: datetime):
    """Extract only records modified since last successful run."""
    query = f"""
        SELECT * FROM source.orders
        WHERE updated_at > '{last_run.isoformat()}'
          AND updated_at <= '{datetime.utcnow().isoformat()}'
    """
    new_records = db.execute(query)
    # Merge into target (UPSERT)
    for record in new_records:
        db.execute(f"""
            INSERT INTO analytics.orders (order_id, customer_id, amount, updated_at)
            VALUES ({record['order_id']}, {record['customer_id']},
                    {record['amount']}, '{record['updated_at']}')
            ON CONFLICT (order_id) DO UPDATE SET
                customer_id = EXCLUDED.customer_id,
                amount = EXCLUDED.amount,
                updated_at = EXCLUDED.updated_at
        """)
# Pattern 2: Watermark-based for data lakes
from pyspark.sql.functions import col

def incremental_by_watermark():
    """Use a high-watermark table to track progress."""
    # Read the last processed watermark
    watermark = db.execute(
        "SELECT MAX(watermark_value) FROM pipeline.watermarks "
        "WHERE pipeline_name = 'orders_pipeline'"
    )
    # Process only new data
    new_data = (
        spark.read.parquet("s3://raw/orders/")
        .filter(col("_ingested_at") > watermark)
    )
    if new_data.count() > 0:
        new_data.write.mode("append").parquet("s3://silver/orders/")
        # Update the watermark only after the write succeeds
        new_watermark = new_data.agg({"_ingested_at": "max"}).collect()[0][0]
        db.execute(f"""
            UPDATE pipeline.watermarks
            SET watermark_value = '{new_watermark}', updated_at = NOW()
            WHERE pipeline_name = 'orders_pipeline'
        """)
Error Handling and Retry Strategies
import time
import logging
import requests
from functools import wraps

logger = logging.getLogger(__name__)

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """Decorator for retrying functions with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        logger.error(f"Failed after {max_retries} retries: {e}")
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    logger.warning(
                        f"Attempt {attempt + 1} failed: {e}. "
                        f"Retrying in {delay}s..."
                    )
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3, base_delay=5)
def extract_from_api(endpoint: str) -> list[dict]:
    """Extract data with automatic retry on transient failures."""
    response = requests.get(endpoint, timeout=30)
    response.raise_for_status()
    return response.json()["data"]
class PipelineResult:
    """Structured result from a pipeline run."""

    def __init__(self, pipeline_name: str):
        self.pipeline_name = pipeline_name
        self.start_time = datetime.utcnow()
        self.rows_extracted = 0
        self.rows_loaded = 0
        self.errors: list[str] = []
        self.status = "running"

    def finish(self, status="success"):
        self.status = status
        self.duration = (datetime.utcnow() - self.start_time).total_seconds()
        logger.info(
            f"Pipeline {self.pipeline_name}: {self.status} "
            f"in {self.duration:.1f}s. "
            f"Extracted: {self.rows_extracted}, Loaded: {self.rows_loaded}, "
            f"Errors: {len(self.errors)}"
        )
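A minimal sketch of how the retry decorator and PipelineResult fit together in a run function; extract_daily_orders, transform_orders, and load_orders are placeholders standing in for the pipeline's own steps.
def run_orders_pipeline(date: str) -> PipelineResult:
    """Illustrative wiring only; the three step functions are placeholders."""
    result = PipelineResult("orders_pipeline")
    try:
        raw = extract_daily_orders(date)  # e.g. wrapped in @retry_with_backoff
        result.rows_extracted = len(raw)
        transformed = transform_orders(raw)
        load_orders(transformed, date)
        result.rows_loaded = len(transformed)
        result.finish("success")
    except Exception as exc:
        result.errors.append(str(exc))
        result.finish("failed")
        raise
    return result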
Data Validation in Pipelines
from dataclasses import dataclass

class DataQualityError(Exception):
    """Raised when one or more quality checks fail."""

@dataclass
class DataQualityCheck:
    name: str
    query: str
    expected: str  # "zero_rows", "non_empty", "equal"
    threshold: float = 0.0

def run_quality_checks(checks: list[DataQualityCheck], connection):
    """Run data quality checks and raise on failure."""
    failures = []
    for check in checks:
        result = connection.execute(check.query).fetchone()[0]
        if check.expected == "zero_rows" and result > 0:
            failures.append(f"{check.name}: expected 0 rows, got {result}")
        elif check.expected == "non_empty" and result == 0:
            failures.append(f"{check.name}: table is empty")
    if failures:
        raise DataQualityError(f"Quality checks failed: {failures}")
# Define checks for the orders pipeline
order_checks = [
    DataQualityCheck(
        name="no_null_order_ids",
        query="SELECT COUNT(*) FROM staging.orders WHERE order_id IS NULL",
        expected="zero_rows",
    ),
    DataQualityCheck(
        name="no_negative_amounts",
        query="SELECT COUNT(*) FROM staging.orders WHERE amount < 0",
        expected="zero_rows",
    ),
    DataQualityCheck(
        name="orders_not_empty",
        query="SELECT COUNT(*) FROM staging.orders",
        expected="non_empty",
    ),
    DataQualityCheck(
        name="no_future_dates",
        query="SELECT COUNT(*) FROM staging.orders WHERE order_date > CURRENT_DATE",
        expected="zero_rows",
    ),
]
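These checks would typically run after loading to staging and before publishing to production tables. A minimal sketch of that gating step, where staging_connection and publish_staging_to_production are placeholders for the warehouse connection and the final load step:
# Gate the publish step on the quality checks.
try:
    run_quality_checks(order_checks, staging_connection)
except DataQualityError:
    logger.error("Orders failed quality checks; aborting publish step")
    raise
publish_staging_to_production()  # placeholder for the final load step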
Pipeline Monitoring and Alerting
What to Monitor
- Freshness: How recently was the data updated? Alert if data is older than the expected SLA (e.g., more than 2 hours stale).
- Volume: How many rows were processed? Alert on unexpected drops (>50% fewer rows than yesterday) or spikes.
- Schema: Did the source schema change? Alert on new columns, removed columns, or type changes.
- Duration: How long did the pipeline take? Alert if it takes more than 2x the typical duration.
- Error Rate: What percentage of records failed validation? Alert if error rate exceeds threshold.
- Data Quality: Are null rates, uniqueness, and value distributions within expected ranges?
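To make the first two bullets concrete, here is a minimal freshness-and-volume monitor that could run on a schedule. The table name, thresholds, SQL dialect (PostgreSQL), and the alert() helper are all assumptions to adapt to your own stack.
def check_freshness_and_volume(connection, max_staleness_hours: float = 2.0):
    """Illustrative monitors; alert() is a placeholder notification hook."""
    # Freshness: hours since the most recent update.
    staleness = connection.execute(
        "SELECT EXTRACT(EPOCH FROM (NOW() - MAX(updated_at))) / 3600 "
        "FROM analytics.orders"
    ).fetchone()[0]
    if staleness is None or staleness > max_staleness_hours:
        alert(f"analytics.orders is stale: last update {staleness} hours ago")

    # Volume: today's row count vs. yesterday's.
    today, yesterday = connection.execute(
        "SELECT COUNT(*) FILTER (WHERE order_date = CURRENT_DATE), "
        "COUNT(*) FILTER (WHERE order_date = CURRENT_DATE - 1) "
        "FROM analytics.orders"
    ).fetchone()
    if yesterday and today < 0.5 * yesterday:
        alert(f"Order volume dropped over 50%: {today} today vs {yesterday} yesterday")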
Common Pipeline Anti-Patterns
Avoid these common mistakes that lead to fragile, unreliable pipelines:
- Non-Idempotent Appends: Using INSERT without a delete or merge strategy. Retries create duplicate rows, inflating metrics and causing downstream confusion.
- Hardcoded Dates: Using datetime.now() instead of pipeline-provided execution dates. This breaks backfills and makes pipelines non-deterministic (see the sketch after this list).
- Silent Failures: Catching exceptions without logging or alerting. The pipeline reports success while producing incomplete or corrupted data.
- Monolithic Pipelines: A single script that extracts, transforms, and loads everything. When it fails, you cannot tell which step broke or retry partially.
- No Schema Validation: Trusting that source data will always match expected formats. Schema changes in upstream systems silently corrupt your output.
- Missing Backfill Strategy: Not designing pipelines to reprocess historical data. When bugs are found, you cannot fix past data without manual intervention.
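The hardcoded-dates anti-pattern deserves a concrete sketch. The fix is to pass the logical execution date from the orchestrator into every step rather than reading the wall clock; the example below is a generic illustration, not tied to any particular scheduler.
from datetime import datetime

# BAD: depends on wall-clock time, so a backfill run today cannot
# reproduce what the pipeline would have loaded for a past date.
def load_daily_orders_bad():
    date = datetime.utcnow().date()
    db.execute(f"DELETE FROM analytics.daily_orders WHERE order_date = '{date}'")

# GOOD: the orchestrator supplies the logical execution date, so the
# same call can be replayed deterministically for any historical date.
def load_daily_orders_good(execution_date: str):
    db.execute(
        f"DELETE FROM analytics.daily_orders WHERE order_date = '{execution_date}'"
    )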
Pipeline Testing Strategies
import pytest
from unittest.mock import MagicMock

def test_transform_orders_removes_cancelled():
    """Unit test: cancelled orders should be filtered out."""
    raw_orders = [
        {"order_id": 1, "status": "confirmed", "amount": 29.99},
        {"order_id": 2, "status": "cancelled", "amount": 0},
        {"order_id": 3, "status": "shipped", "amount": 49.99},
    ]
    result = transform_orders(raw_orders)
    assert len(result) == 2
    assert all(r["status"] != "cancelled" for r in result)

def test_transform_orders_calculates_total():
    """Unit test: total_amount should be quantity * unit_price."""
    raw = [{"order_id": 1, "quantity": 3, "unit_price": 10.00, "status": "confirmed"}]
    result = transform_orders(raw)
    assert result[0]["total_amount"] == 30.00

def test_pipeline_is_idempotent(db_connection):
    """Integration test: running twice produces same result."""
    run_pipeline("2025-03-15")
    count_after_first = db_connection.execute(
        "SELECT COUNT(*) FROM analytics.orders WHERE order_date = '2025-03-15'"
    ).fetchone()[0]
    run_pipeline("2025-03-15")  # Second run
    count_after_second = db_connection.execute(
        "SELECT COUNT(*) FROM analytics.orders WHERE order_date = '2025-03-15'"
    ).fetchone()[0]
    assert count_after_first == count_after_second  # No duplicates
Key Takeaways
- Idempotency is the most important pipeline design principle — design every step to be safely re-runnable
- Use incremental processing to handle growing data volumes efficiently
- Implement retry with exponential backoff for transient failures
- Build data validation checks into every pipeline to catch issues before they reach production tables
- Monitor freshness, volume, schema, and duration — alert before stakeholders notice problems
- Treat pipeline code like production software: version control, testing, code review, and documentation