Event-Driven Architecture
Event-driven architecture (EDA) is a design paradigm where system components communicate through events — immutable records of things that have happened. Instead of services directly calling each other (request/response), they publish events to a central broker (like Kafka), and interested services subscribe to those events. This decoupling is the foundation of scalable, resilient, and evolvable data systems.
In data engineering, event-driven patterns are essential for building real-time data pipelines, maintaining data consistency across systems, and enabling the replay of historical data for debugging and reprocessing. Understanding these patterns separates junior data engineers from senior practitioners who can design robust, production-grade systems.
Event Sourcing
Event sourcing stores the state of an entity as a sequence of events rather than storing only the current state. Instead of updating a row in a database, you append an event describing what happened. The current state is derived by replaying events from the beginning. This provides a complete audit trail, the ability to reconstruct state at any point in time, and natural integration with event streaming platforms.
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

@dataclass
class Event:
    """Immutable event record."""
    event_id: str
    event_type: str
    aggregate_id: str
    data: dict
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat())

@dataclass
class BankAccount:
    """Account state rebuilt from events."""
    account_id: str
    balance: float = 0.0
    status: str = "active"
    owner: Optional[str] = None

    def apply(self, event: Event):
        """Apply an event to update state."""
        if event.event_type == "AccountOpened":
            self.owner = event.data["owner"]
            self.balance = event.data.get("initial_deposit", 0.0)
        elif event.event_type == "MoneyDeposited":
            self.balance += event.data["amount"]
        elif event.event_type == "MoneyWithdrawn":
            self.balance -= event.data["amount"]
        elif event.event_type == "AccountClosed":
            self.status = "closed"

# Event store: append-only log (in production, this is a Kafka topic)
event_store: list[Event] = []

def emit_event(event: Event):
    """Publish event to the event store."""
    event_store.append(event)
    # In production: producer.produce("bank-accounts", key=event.aggregate_id, ...)

# Create account through events
emit_event(Event("e1", "AccountOpened", "acct-001",
                 {"owner": "Alice", "initial_deposit": 1000.0}))
emit_event(Event("e2", "MoneyDeposited", "acct-001", {"amount": 500.0}))
emit_event(Event("e3", "MoneyWithdrawn", "acct-001", {"amount": 200.0}))

# Rebuild current state by replaying events
account = BankAccount("acct-001")
for event in event_store:
    if event.aggregate_id == "acct-001":
        account.apply(event)
print(f"Balance: {account.balance}")  # 1300.0

# Time travel: rebuild state at a specific point
account_at_e2 = BankAccount("acct-001")
for event in event_store[:2]:  # Only first 2 events
    if event.aggregate_id == "acct-001":
        account_at_e2.apply(event)
print(f"Balance after deposit: {account_at_e2.balance}")  # 1500.0
CQRS: Command Query Responsibility Segregation
CQRS separates the write model (commands that change state) from the read model (queries that read state). In data engineering, this pattern naturally emerges when you use a stream of events as the write model and materialize different read-optimized views for different consumers. The write side publishes events, and multiple read sides build their own views of the data.
CQRS Benefits for Data Engineering
- Optimized Read Models: Each consumer builds views optimized for its specific query patterns — the BI team gets star schemas, the search team gets denormalized documents, the ML team gets feature vectors
- Independent Scaling: Reads and writes scale independently. Write throughput is determined by Kafka; read performance depends on each consumer's database.
- Eventual Consistency: Read models are updated asynchronously. This trade-off enables much higher throughput but requires consumers to handle stale data gracefully.
- Replay and Rebuild: If a read model has a bug, you can fix the consumer code and replay events from Kafka to rebuild the entire view from scratch.
# CQRS: Multiple read models from a single event stream

# Read Model 1: Analytics database (aggregated data for dashboards)
def update_analytics_db(event):
    """Materialize events into analytics-optimized tables."""
    if event["type"] == "OrderPlaced":
        # INSERT INTO analytics.fact_orders ...
        print(f"Analytics: recorded order {event['order_id']}")
    elif event["type"] == "OrderShipped":
        # UPDATE analytics.fact_orders SET shipped_at = ...
        print(f"Analytics: updated shipment for {event['order_id']}")

# Read Model 2: Search index (denormalized documents for Elasticsearch)
def update_search_index(event):
    """Update the search index with the latest order data."""
    if event["type"] == "OrderPlaced":
        doc = {
            "order_id": event["order_id"],
            "customer_name": event["customer_name"],
            "products": event["products"],
            "status": "placed",
        }
        # es.index(index="orders", id=event["order_id"], body=doc)
        print(f"Search: indexed order {event['order_id']}")

# Read Model 3: Redis cache (real-time metrics)
def update_realtime_cache(event):
    """Update real-time counters in Redis."""
    if event["type"] == "OrderPlaced":
        # redis.incr("orders:today:count")
        # redis.incrbyfloat("orders:today:revenue", event["amount"])
        print("Cache: incremented today's order count")

# Each read model runs as a separate consumer group.
# They all consume the same "orders" topic independently:
#   Consumer Group "analytics-consumer" -> update_analytics_db
#   Consumer Group "search-consumer"    -> update_search_index
#   Consumer Group "cache-consumer"     -> update_realtime_cache
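Because each read model uses its own consumer group, every group receives every event, and a failure in one group must not block the others. That fan-out can be sketched without a broker by dispatching each event to every handler independently (in production each handler would run as its own consumer process; the handler names here are illustrative):

```python
def dispatch(event: dict, handlers: dict) -> dict:
    """Deliver one event to every 'consumer group' independently.

    Returns per-group status so one failing group never blocks the others;
    a real consumer would retry or route the event to a dead-letter topic.
    """
    results = {}
    for group, handler in handlers.items():
        try:
            handler(event)
            results[group] = "ok"
        except Exception as exc:
            results[group] = f"error: {exc}"
    return results

# Three toy read models: a list, a document store, and a counter
orders_seen: list[str] = []
search_docs: dict[str, dict] = {}
counters = {"orders:today:count": 0}

handlers = {
    "analytics-consumer": lambda e: orders_seen.append(e["order_id"]),
    "search-consumer": lambda e: search_docs.__setitem__(e["order_id"], e),
    "cache-consumer": lambda e: counters.__setitem__(
        "orders:today:count", counters["orders:today:count"] + 1),
}

event = {"type": "OrderPlaced", "order_id": "o-1",
         "customer_name": "Alice", "products": ["book"]}
print(dispatch(event, handlers))
```

After the dispatch, all three views reflect the order even though none of them know about the others, which is the essential property of CQRS fan-out.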
Change Data Capture (CDC)
Change Data Capture captures row-level changes (inserts, updates, deletes) from a database and streams them as events. This enables real-time synchronization between operational databases and analytical systems without modifying application code. CDC is typically implemented by reading the database's transaction log (WAL in PostgreSQL, binlog in MySQL).
# Debezium CDC connector for PostgreSQL
# Captures changes from the PostgreSQL WAL and streams them to Kafka
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-cdc-source
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: postgres
    database.port: 5432
    database.user: replication_user
    database.password: secret
    database.dbname: app_db
    database.server.name: app
    plugin.name: pgoutput
    slot.name: debezium_slot
    # Table filtering
    table.include.list: public.orders,public.customers,public.products
    # Output format
    key.converter: io.confluent.connect.avro.AvroConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    # Snapshot mode: initial snapshot, then streaming
    snapshot.mode: initial
    # Heartbeat to keep the replication slot active
    heartbeat.interval.ms: 10000
Debezium produces events like this for each database change:
# Example Debezium CDC event for an INSERT
cdc_event = {
    "before": None,  # null for inserts
    "after": {
        "order_id": 1001,
        "customer_id": 501,
        "amount": 29.99,
        "status": "pending",
        "created_at": 1710500000000
    },
    "source": {
        "version": "2.5.0",
        "connector": "postgresql",
        "name": "app",
        "db": "app_db",
        "schema": "public",
        "table": "orders",
        "txId": 12345,
        "lsn": 987654321
    },
    "op": "c",  # c=create, u=update, d=delete, r=read (snapshot)
    "ts_ms": 1710500000123
}

# Processing CDC events
def process_cdc_event(event):
    op = event["op"]
    table = event["source"]["table"]
    if op == "c":  # INSERT
        print(f"New row in {table}: {event['after']}")
        # Sync to warehouse: INSERT INTO raw.{table} ...
    elif op == "u":  # UPDATE
        print(f"Updated row in {table}: {event['before']} -> {event['after']}")
        # Sync to warehouse: UPDATE or merge
    elif op == "d":  # DELETE
        print(f"Deleted row in {table}: {event['before']}")
        # Soft delete in warehouse: UPDATE ... SET deleted_at = ...
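Applying CDC events to a warehouse must be idempotent, because Kafka delivers at-least-once and events can arrive more than once. One common approach keys rows by primary key and discards any event whose LSN is not newer than what was already applied. A minimal in-memory sketch (a dict stands in for the warehouse table; the `_lsn`/`_deleted` bookkeeping columns are an assumption of this example, not a Debezium feature):

```python
def apply_cdc(table: dict, event: dict) -> bool:
    """Apply one Debezium-style event to a dict keyed by order_id.

    Returns False when the event is stale (a duplicate or an out-of-order
    delivery), so redelivered events never overwrite newer state.
    """
    row = event["after"] or event["before"]
    key = row["order_id"]
    lsn = event["source"]["lsn"]
    current = table.get(key)
    if current is not None and current["_lsn"] >= lsn:
        return False  # already applied this change (or a newer one): skip
    if event["op"] in ("c", "r", "u"):
        table[key] = {**event["after"], "_lsn": lsn, "_deleted": False}
    elif event["op"] == "d":
        # Soft delete: keep the row, mark it deleted
        table[key] = {**event["before"], "_lsn": lsn, "_deleted": True}
    return True

warehouse: dict = {}
insert = {"op": "c", "before": None,
          "after": {"order_id": 1001, "status": "pending"},
          "source": {"lsn": 100}}
update = {"op": "u", "before": {"order_id": 1001, "status": "pending"},
          "after": {"order_id": 1001, "status": "shipped"},
          "source": {"lsn": 200}}

apply_cdc(warehouse, insert)
apply_cdc(warehouse, update)
assert apply_cdc(warehouse, insert) is False  # redelivery is ignored
assert warehouse[1001]["status"] == "shipped"
```

In a real warehouse the same comparison is typically expressed as a MERGE keyed on the primary key with the LSN (or `ts_ms`) as the tiebreaker.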
The Outbox Pattern
The outbox pattern solves the problem of reliably publishing events when you also need to update a database. Instead of publishing directly to Kafka (which can fail independently of the database transaction), you write the event to an "outbox" table within the same database transaction. A separate process (or CDC) reads the outbox table and publishes events to Kafka, ensuring consistency between database state and published events.
-- Outbox Pattern: Write events to a table within the same transaction
BEGIN;
-- Update the business table
INSERT INTO orders (order_id, customer_id, amount, status)
VALUES (1001, 501, 29.99, 'confirmed');
-- Write the event to the outbox table (same transaction!)
INSERT INTO outbox (
    aggregate_type, aggregate_id,
    event_type, payload, created_at
) VALUES (
    'Order', '1001',
    'OrderConfirmed',
    '{"order_id": 1001, "customer_id": 501, "amount": 29.99}',
    NOW()
);
COMMIT;
-- A CDC connector (Debezium) reads the outbox table
-- and publishes events to Kafka with guaranteed delivery
-- The outbox table can be periodically cleaned up
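When CDC is not an option, a simple polling relay does the same job: read unpublished outbox rows in insertion order, publish each one, and mark it published only after the publish succeeds. A sketch with an in-memory list standing in for the outbox table and a callback standing in for the Kafka producer (both are assumptions of this example):

```python
import json

def relay_outbox(outbox: list, publish) -> int:
    """Publish unpublished rows in order; mark each only after publish succeeds.

    If publish fails midway, the next run retries from the first unpublished
    row, so delivery is at-least-once and consumers must tolerate duplicates.
    """
    sent = 0
    for row in outbox:
        if row["published"]:
            continue
        publish(row["aggregate_id"], json.loads(row["payload"]))
        row["published"] = True
        sent += 1
    return sent

outbox = [
    {"aggregate_id": "1001", "event_type": "OrderConfirmed",
     "payload": '{"order_id": 1001, "amount": 29.99}', "published": False},
]
delivered = []
relay_outbox(outbox, lambda key, event: delivered.append((key, event)))
assert delivered == [("1001", {"order_id": 1001, "amount": 29.99})]
assert outbox[0]["published"] is True
```

The marking-after-publish order is what makes the relay safe: crashing between publish and mark produces a duplicate, never a lost event, which is the right trade-off for an outbox.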
Event Streaming Patterns Summary
| Pattern | What It Solves | When to Use |
|---|---|---|
| Event Sourcing | Complete audit trail, time travel | Financial systems, compliance-heavy domains |
| CQRS | Optimized reads for different consumers | Multiple teams need different views of the same data |
| CDC | Real-time database replication | Syncing OLTP to OLAP without application changes |
| Outbox | Reliable event publishing with DB consistency | Microservices that need atomic DB + event operations |
| Saga | Distributed transactions across services | Multi-step workflows spanning multiple microservices |
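The saga row in the table deserves a brief illustration: a saga breaks a distributed transaction into steps, each paired with a compensating action, and when a step fails the completed steps are undone in reverse order. A minimal orchestration sketch (the step names are hypothetical):

```python
def run_saga(steps) -> bool:
    """Execute (action, compensation) pairs; on failure, compensate in reverse."""
    done = []
    for action, compensate in steps:
        try:
            action()
            done.append(compensate)
        except Exception:
            for comp in reversed(done):  # undo already-completed steps
                comp()
            return False
    return True

def fail(msg):
    raise RuntimeError(msg)

log: list[str] = []
steps = [
    (lambda: log.append("reserve_inventory"),
     lambda: log.append("release_inventory")),
    (lambda: fail("payment declined"),
     lambda: log.append("refund_payment")),
]
assert run_saga(steps) is False
assert log == ["reserve_inventory", "release_inventory"]
```

Note that only completed steps are compensated: the failed payment step never ran, so its compensation is skipped.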
Key Takeaways
- Event sourcing stores state as a sequence of immutable events, enabling audit trails and time travel
- CQRS separates write and read models, allowing each consumer to build optimized views from the same event stream
- CDC captures database changes via transaction logs and streams them to Kafka without modifying application code
- The outbox pattern ensures atomic consistency between database updates and event publication
- These patterns are complementary — CDC feeds the event stream, CQRS builds read models, event sourcing provides the audit trail
- Start with CDC for data integration; add event sourcing and CQRS when your domain complexity justifies the investment