Event Streaming Patterns

Master event sourcing, CQRS, CDC, and other essential patterns for building event-driven data architectures

Event-Driven Architecture

Event-driven architecture (EDA) is a design paradigm where system components communicate through events — immutable records of things that have happened. Instead of services directly calling each other (request/response), they publish events to a central broker (like Kafka), and interested services subscribe to those events. This decoupling is the foundation of scalable, resilient, and evolvable data systems.
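
As a minimal sketch of this publish/subscribe flow, the snippet below has one service publish an event and another consume it. It assumes a local Kafka broker and the confluent-kafka Python client; the "orders" topic and the consumer group name are illustrative.

# Minimal publish/subscribe sketch (assumes a broker at localhost:9092
# and the confluent-kafka package; topic and group names are illustrative)
import json
from confluent_kafka import Producer, Consumer

producer = Producer({"bootstrap.servers": "localhost:9092"})

# The producer appends an immutable event; it does not know who will read it
event = {"type": "OrderPlaced", "order_id": 1001, "amount": 29.99}
producer.produce("orders", key=str(event["order_id"]), value=json.dumps(event))
producer.flush()

# Any interested service subscribes with its own consumer group
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "analytics-consumer",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["orders"])

msg = consumer.poll(timeout=5.0)
if msg is not None and msg.error() is None:
    print("Consumed:", json.loads(msg.value()))
consumer.close()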

In data engineering, event-driven patterns are essential for building real-time data pipelines, maintaining data consistency across systems, and enabling the replay of historical data for debugging and reprocessing. Understanding these patterns separates junior data engineers from senior practitioners who can design robust, production-grade systems.

Event Sourcing

Event sourcing stores the state of an entity as a sequence of events rather than storing only the current state. Instead of updating a row in a database, you append an event describing what happened. The current state is derived by replaying events from the beginning. This provides a complete audit trail, the ability to reconstruct state at any point in time, and natural integration with event streaming platforms.

import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

@dataclass
class Event:
    """Immutable event record."""
    event_id: str
    event_type: str
    aggregate_id: str
    data: dict
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

@dataclass
class BankAccount:
    """Account state rebuilt from events."""
    account_id: str
    balance: float = 0.0
    status: str = "active"
    owner: Optional[str] = None

    def apply(self, event: Event):
        """Apply an event to update state."""
        if event.event_type == "AccountOpened":
            self.owner = event.data["owner"]
            self.balance = event.data.get("initial_deposit", 0.0)
        elif event.event_type == "MoneyDeposited":
            self.balance += event.data["amount"]
        elif event.event_type == "MoneyWithdrawn":
            self.balance -= event.data["amount"]
        elif event.event_type == "AccountClosed":
            self.status = "closed"

# Event store: append-only log (in production, this is a Kafka topic)
event_store: list[Event] = []

def emit_event(event: Event):
    """Publish event to the event store."""
    event_store.append(event)
    # In production: producer.produce("bank-accounts", key=event.aggregate_id, ...)

# Create account through events
emit_event(Event("e1", "AccountOpened", "acct-001",
                 {"owner": "Alice", "initial_deposit": 1000.0}))
emit_event(Event("e2", "MoneyDeposited", "acct-001", {"amount": 500.0}))
emit_event(Event("e3", "MoneyWithdrawn", "acct-001", {"amount": 200.0}))

# Rebuild current state by replaying events
account = BankAccount("acct-001")
for event in event_store:
    if event.aggregate_id == "acct-001":
        account.apply(event)

print(f"Balance: {account.balance}")  # 1300.0

# Time travel: rebuild state at a specific point
account_at_e2 = BankAccount("acct-001")
for event in event_store[:2]:  # Only first 2 events
    if event.aggregate_id == "acct-001":
        account_at_e2.apply(event)

print(f"Balance after deposit: {account_at_e2.balance}")  # 1500.0

CQRS: Command Query Responsibility Segregation

CQRS separates the write model (commands that change state) from the read model (queries that read state). In data engineering, this pattern naturally emerges when you use a stream of events as the write model and materialize different read-optimized views for different consumers. The write side publishes events, and multiple read sides build their own views of the data.

CQRS Benefits for Data Engineering

  • Optimized Read Models: Each consumer builds views optimized for its specific query patterns — the BI team gets star schemas, the search team gets denormalized documents, the ML team gets feature vectors.
  • Independent Scaling: Reads and writes scale independently. Write throughput is determined by Kafka; read performance depends on each consumer's database.
  • Eventual Consistency: Read models are updated asynchronously. This trade-off enables much higher throughput but requires consumers to handle stale data gracefully.
  • Replay and Rebuild: If a read model has a bug, you can fix the consumer code and replay events from Kafka to rebuild the entire view from scratch.

# CQRS: Multiple read models from a single event stream

from confluent_kafka import Consumer
import json

# Read Model 1: Analytics database (aggregated data for dashboards)
def update_analytics_db(event):
    """Materialize events into analytics-optimized tables."""
    if event["type"] == "OrderPlaced":
        # INSERT INTO analytics.fact_orders ...
        print(f"Analytics: recorded order {event['order_id']}")
    elif event["type"] == "OrderShipped":
        # UPDATE analytics.fact_orders SET shipped_at = ...
        print(f"Analytics: updated shipment for {event['order_id']}")

# Read Model 2: Search index (denormalized documents for Elasticsearch)
def update_search_index(event):
    """Update search index with latest order data."""
    if event["type"] == "OrderPlaced":
        doc = {
            "order_id": event["order_id"],
            "customer_name": event["customer_name"],
            "products": event["products"],
            "status": "placed",
        }
        # es.index(index="orders", id=event["order_id"], body=doc)
        print(f"Search: indexed order {event['order_id']}")

# Read Model 3: Redis cache (real-time metrics)
def update_realtime_cache(event):
    """Update real-time counters in Redis."""
    if event["type"] == "OrderPlaced":
        # redis.incr("orders:today:count")
        # redis.incrbyfloat("orders:today:revenue", event["amount"])
        print(f"Cache: incremented today's order count")

# Each read model runs as a separate consumer group
# They all consume the same "orders" topic independently
# Consumer Group: "analytics-consumer" -> update_analytics_db
# Consumer Group: "search-consumer" -> update_search_index
# Consumer Group: "cache-consumer" -> update_realtime_cache

Change Data Capture (CDC)

Change Data Capture captures row-level changes (inserts, updates, deletes) from a database and streams them as events. This enables real-time synchronization between operational databases and analytical systems without modifying application code. CDC is typically implemented by reading the database's transaction log (WAL in PostgreSQL, binlog in MySQL).

# Debezium CDC Connector for PostgreSQL
# Captures changes from PostgreSQL WAL and streams to Kafka
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-cdc-source
  labels:
    strimzi.io/cluster: my-connect-cluster  # must match the name of your KafkaConnect resource
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: postgres
    database.port: 5432
    database.user: replication_user
    database.password: secret
    database.dbname: app_db
    topic.prefix: app  # logical topic prefix (replaces database.server.name in Debezium 2.x)
    plugin.name: pgoutput
    slot.name: debezium_slot

    # Table filtering
    table.include.list: public.orders,public.customers,public.products

    # Output format
    key.converter: io.confluent.connect.avro.AvroConverter
    value.converter: io.confluent.connect.avro.AvroConverter

    # Snapshot mode: initial snapshot then streaming
    snapshot.mode: initial

    # Heartbeat to keep replication slot active
    heartbeat.interval.ms: 10000

Debezium produces events like this for each database change:

# Example Debezium CDC event for an INSERT
cdc_event = {
    "before": None,  # null for inserts
    "after": {
        "order_id": 1001,
        "customer_id": 501,
        "amount": 29.99,
        "status": "pending",
        "created_at": 1710500000000
    },
    "source": {
        "version": "2.5.0",
        "connector": "postgresql",
        "name": "app",
        "db": "app_db",
        "schema": "public",
        "table": "orders",
        "txId": 12345,
        "lsn": 987654321
    },
    "op": "c",  # c=create, u=update, d=delete, r=read (snapshot)
    "ts_ms": 1710500000123
}

# Processing CDC events
def process_cdc_event(event):
    op = event["op"]
    table = event["source"]["table"]

    if op == "c":  # INSERT
        print(f"New row in {table}: {event['after']}")
        # Sync to warehouse: INSERT INTO raw.{table} ...
    elif op == "u":  # UPDATE
        print(f"Updated row in {table}: {event['before']} -> {event['after']}")
        # Sync to warehouse: UPDATE or merge
    elif op == "d":  # DELETE
        print(f"Deleted row in {table}: {event['before']}")
        # Soft delete in warehouse: UPDATE SET deleted_at = ...

The Outbox Pattern

The outbox pattern solves the problem of reliably publishing events when you also need to update a database. Instead of publishing directly to Kafka (which can fail independently of the database transaction), you write the event to an "outbox" table within the same database transaction. A separate process (or CDC) reads the outbox table and publishes events to Kafka, ensuring consistency between database state and published events.

-- Outbox Pattern: Write events to a table within the same transaction

BEGIN;

-- Update the business table
INSERT INTO orders (order_id, customer_id, amount, status)
VALUES (1001, 501, 29.99, 'confirmed');

-- Write the event to the outbox table (same transaction!)
INSERT INTO outbox (
    aggregate_type, aggregate_id,
    event_type, payload, created_at
) VALUES (
    'Order', '1001',
    'OrderConfirmed',
    '{"order_id": 1001, "customer_id": 501, "amount": 29.99}',
    NOW()
);

COMMIT;

-- A CDC connector (Debezium) reads the outbox table
-- and publishes events to Kafka with guaranteed delivery
-- The outbox table can be periodically cleaned up
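
If you do not use a CDC connector, a small relay process can poll the outbox table and publish to Kafka instead. The sketch below assumes an auto-increment id column on the outbox table, the psycopg2 driver, and an illustrative topic-naming scheme; it is one possible shape of such a relay, not a canonical implementation.

# Polling relay sketch: read outbox rows, publish them, then delete them
# (the id column, connection settings, and topic naming are assumptions)
import time
import psycopg2
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
conn = psycopg2.connect("host=postgres dbname=app_db user=outbox_relay password=secret")

while True:
    with conn, conn.cursor() as cur:        # the transaction commits when the block exits
        cur.execute("""
            SELECT id, aggregate_type, aggregate_id, event_type, payload
            FROM outbox
            ORDER BY id
            LIMIT 100
            FOR UPDATE SKIP LOCKED
        """)
        for row_id, agg_type, agg_id, event_type, payload in cur.fetchall():
            producer.produce(
                f"outbox.{agg_type.lower()}",   # e.g. topic "outbox.order"
                key=agg_id,
                value=payload,                  # payload column already holds JSON
            )
            cur.execute("DELETE FROM outbox WHERE id = %s", (row_id,))
        producer.flush()                        # wait for outstanding deliveries before committing
    time.sleep(1)

Because rows are only deleted in the same transaction that waits for delivery, a crash mid-batch leads to re-publication rather than loss (at-least-once delivery), so downstream consumers should handle duplicates idempotently.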

Event Streaming Patterns Summary

Pattern         | What It Solves                                 | When to Use
----------------|------------------------------------------------|------------------------------------------------------
Event Sourcing  | Complete audit trail, time travel              | Financial systems, compliance-heavy domains
CQRS            | Optimized reads for different consumers       | Multiple teams need different views of the same data
CDC             | Real-time database replication                 | Syncing OLTP to OLAP without application changes
Outbox          | Reliable event publishing with DB consistency  | Microservices that need atomic DB + event operations
Saga            | Distributed transactions across services       | Multi-step workflows spanning multiple microservices

Key Takeaways

  • Event sourcing stores state as a sequence of immutable events, enabling audit trails and time travel
  • CQRS separates write and read models, allowing each consumer to build optimized views from the same event stream
  • CDC captures database changes via transaction logs and streams them to Kafka without modifying application code
  • The outbox pattern ensures atomic consistency between database updates and event publication
  • These patterns are complementary — CDC feeds the event stream, CQRS builds read models, event sourcing provides the audit trail
  • Start with CDC for data integration; add event sourcing and CQRS when your domain complexity justifies the investment
