TechLead
Lesson 8 of 27
5 min read
Software Architecture

Event Sourcing

Learn event sourcing patterns including event stores, replaying events, snapshots, and projections

What is Event Sourcing?

Event Sourcing is a pattern where the state of an application is determined by a sequence of events. Instead of storing the current state, you store every state change as an immutable event. The current state is derived by replaying all events from the beginning. This gives you a complete audit trail and the ability to reconstruct any past state.

Event Sourcing vs Traditional Storage

  • Traditional: Store current state — UPDATE orders SET status = 'shipped' WHERE id = 1
  • Event Sourced: Store events — OrderCreated, ItemsAdded, PaymentReceived, OrderShipped
  • State reconstruction: Replay events to get current state: fold(initialState, events) = currentState
  • Temporal queries: Get the state at any point in time by replaying events up to that timestamp

Event Store Implementation

// Event store types
/**
 * An event as it is persisted in (and read back from) the event store.
 *
 * `data` and `metadata` are free-form JSON objects; their values are typed
 * `unknown` so consumers must narrow before use.
 */
interface StoredEvent {
  /** Unique identifier of this event. */
  eventId: string;
  /** Identifier of the stream (aggregate instance) the event belongs to. */
  streamId: string;
  /** Discriminator used by aggregates to decide how to apply the event. */
  eventType: string;
  /** Event payload. */
  data: Record<string, unknown>;
  /** Additional information stored alongside the payload. */
  metadata: Record<string, unknown>;
  /** Position of the event within its stream; streams are read in this order. */
  version: number;
  /** Time recorded for the event. */
  timestamp: Date;
}

/**
 * Contract for an append-only store of per-stream event sequences.
 */
interface EventStore {
  /**
   * Appends `events` to the stream identified by `streamId`.
   *
   * @param expectedVersion Version the caller believes the stream is currently
   *   at; implementations use it to detect concurrent writers.
   */
  append(streamId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;

  /**
   * Reads the events of one stream.
   *
   * @param fromVersion Optional lower bound on the versions to return.
   */
  getStream(streamId: string, fromVersion?: number): Promise<StoredEvent[]>;

  /**
   * Reads events across streams.
   *
   * NOTE(review): no implementation is visible here, so the exact meaning of
   * `fromPosition` (offset vs. global sequence number) needs to be confirmed.
   */
  getAll(fromPosition?: number, limit?: number): Promise<StoredEvent[]>;
}

// PostgreSQL event store
/**
 * Event store backed by a PostgreSQL `events` table, accessed through a
 * connection pool.
 *
 * NOTE(review): `EventStore` also declares `getAll`, which this class does not
 * implement. A correct implementation needs a global ordering column that the
 * INSERT below does not write — confirm the schema before adding it.
 */
class PostgresEventStore implements EventStore {
  constructor(private readonly pool: Pool) {}

  /**
   * Appends `events` to a stream inside a single transaction.
   *
   * The stream's highest stored version must equal `expectedVersion` (an empty
   * stream counts as version -1). Events receive consecutive versions starting
   * at `expectedVersion + 1`.
   *
   * NOTE(review): the SELECT-then-INSERT check alone does not stop two
   * concurrent transactions from both passing it; this presumably relies on a
   * unique constraint over (stream_id, version) — confirm against the schema.
   *
   * @throws ConcurrencyError when the stream is not at `expectedVersion`.
   */
  async append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();
    let connectionBroken = false;
    try {
      await client.query("BEGIN");

      // Optimistic concurrency check
      const result = await client.query(
        "SELECT MAX(version) as max_version FROM events WHERE stream_id = $1",
        [streamId]
      );
      const maxVersion = result.rows[0]?.max_version;
      // Drivers may return 64-bit integers as strings; normalize before the
      // strict comparison so "3" and 3 are treated as the same version.
      const currentVersion = maxVersion == null ? -1 : Number(maxVersion);

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but found ${currentVersion}`
        );
      }

      // Append events
      for (const [offset, event] of events.entries()) {
        const version = expectedVersion + offset + 1;
        await client.query(
          `INSERT INTO events (event_id, stream_id, event_type, data, metadata, version, timestamp)
           VALUES ($1, $2, $3, $4, $5, $6, $7)`,
          [event.eventId, streamId, event.eventType, JSON.stringify(event.data),
           JSON.stringify(event.metadata), version, event.timestamp]
        );
      }

      await client.query("COMMIT");
    } catch (error) {
      try {
        await client.query("ROLLBACK");
      } catch {
        // Keep the original failure as the reported error; a client that
        // cannot roll back must not be handed back to the pool for reuse.
        connectionBroken = true;
      }
      throw error;
    } finally {
      if (connectionBroken) {
        client.release(true);
      } else {
        client.release();
      }
    }
  }

  /**
   * Returns the events of a stream with `version >= fromVersion`, ordered by
   * version and converted from table rows to `StoredEvent` objects.
   */
  async getStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      "SELECT * FROM events WHERE stream_id = $1 AND version >= $2 ORDER BY version",
      [streamId, fromVersion]
    );
    return result.rows.map((row: Record<string, unknown>) =>
      PostgresEventStore.toStoredEvent(row)
    );
  }

  /** Maps a snake_case table row onto the camelCase `StoredEvent` shape. */
  private static toStoredEvent(row: Record<string, unknown>): StoredEvent {
    return {
      eventId: String(row.event_id),
      streamId: String(row.stream_id),
      eventType: String(row.event_type),
      data: PostgresEventStore.parseJson(row.data),
      metadata: PostgresEventStore.parseJson(row.metadata),
      version: Number(row.version),
      timestamp:
        row.timestamp instanceof Date ? row.timestamp : new Date(String(row.timestamp)),
    };
  }

  /** Accepts a JSON column that arrives either already parsed or as text. */
  private static parseJson(value: unknown): Record<string, unknown> {
    if (typeof value === "string") {
      return JSON.parse(value) as Record<string, unknown>;
    }
    return (value ?? {}) as Record<string, unknown>;
  }
}

Event-Sourced Aggregate

// Base class for event-sourced aggregates
/**
 * Base class for aggregates whose state is derived from events.
 *
 * `version` is the version of the last *persisted* event applied to this
 * instance (-1 when nothing has been persisted). Events produced by commands
 * are collected as uncommitted events until the owner persists them.
 */
abstract class EventSourcedAggregate {
  private _uncommittedEvents: DomainEvent[] = [];
  private _version = -1;

  /** Version of the last persisted event reflected in this instance. */
  get version(): number { return this._version; }

  /** Copy of the events raised since the last commit. */
  get uncommittedEvents(): DomainEvent[] { return [...this._uncommittedEvents]; }

  /** Applies a new event to the state and records it as uncommitted. */
  protected apply(event: DomainEvent): void {
    this.when(event);
    this._uncommittedEvents.push(event);
  }

  /** Rehydrates state by applying stored events in the order given. */
  loadFromHistory(events: StoredEvent[]): void {
    for (const stored of events) {
      this.when(this.deserialize(stored));
      this._version = stored.version;
    }
  }

  /**
   * Marks the pending events as committed: advances `version` by the number of
   * pending events and empties the pending list. Advancing the version lets the
   * same instance be saved again with a correct expected version.
   */
  clearUncommittedEvents(): void {
    this._version += this._uncommittedEvents.length;
    this._uncommittedEvents = [];
  }

  /** Sets the persisted version directly, for subclasses restoring from a snapshot. */
  protected restoreVersion(version: number): void {
    this._version = version;
  }

  /** Mutates state in response to `event`. */
  protected abstract when(event: DomainEvent): void;

  /** Converts a stored event into the domain event passed to `when`. */
  protected abstract deserialize(stored: StoredEvent): DomainEvent;
}

// Concrete aggregate: ShoppingCart
/**
 * Shopping cart aggregate: tracks the owning customer, the items keyed by
 * product id, and whether the cart has been checked out.
 */
class ShoppingCart extends EventSourcedAggregate {
  private _id = "";
  private _customerId = "";
  private _items: Map<string, CartItem> = new Map();
  private _checkedOut = false;

  /** Cart identifier taken from the `CartCreated` event (empty until then). */
  get id(): string { return this._id; }

  /** Creates a new cart by raising a `CartCreated` event. */
  static create(cartId: string, customerId: string): ShoppingCart {
    const cart = new ShoppingCart();
    cart.apply({
      eventId: generateId(),
      eventType: "CartCreated",
      data: { cartId, customerId },
      metadata: {},
      timestamp: new Date(),
    });
    return cart;
  }

  /**
   * Adds `quantity` units of a product to the cart.
   *
   * @throws Error if the cart is checked out, if `quantity` is not a positive
   *   finite number, or if `price` is negative or not finite.
   */
  addItem(productId: string, name: string, price: number, quantity: number): void {
    if (this._checkedOut) throw new Error("Cart is already checked out");
    if (!Number.isFinite(quantity) || quantity <= 0) {
      throw new Error("Quantity must be a positive number");
    }
    if (!Number.isFinite(price) || price < 0) {
      throw new Error("Price must be a non-negative number");
    }
    this.apply({
      eventId: generateId(),
      eventType: "ItemAddedToCart",
      data: { productId, name, price, quantity },
      metadata: {},
      timestamp: new Date(),
    });
  }

  /**
   * Removes a product from the cart.
   *
   * @throws Error if the cart is checked out or the product is not in it.
   */
  removeItem(productId: string): void {
    // Same guard as addItem: a checked-out cart must not change.
    if (this._checkedOut) throw new Error("Cart is already checked out");
    if (!this._items.has(productId)) throw new Error("Item not in cart");
    this.apply({
      eventId: generateId(),
      eventType: "ItemRemovedFromCart",
      data: { productId },
      metadata: {},
      timestamp: new Date(),
    });
  }

  /**
   * Checks the cart out, recording the total at that moment.
   *
   * @throws Error if the cart is empty or already checked out.
   */
  checkout(): void {
    if (this._items.size === 0) throw new Error("Cannot checkout empty cart");
    if (this._checkedOut) throw new Error("Already checked out");
    this.apply({
      eventId: generateId(),
      eventType: "CartCheckedOut",
      data: { totalAmount: this.totalAmount },
      metadata: {},
      timestamp: new Date(),
    });
  }

  /** Sum of `price * quantity` over all items. */
  get totalAmount(): number {
    let total = 0;
    for (const item of this._items.values()) {
      total += item.price * item.quantity;
    }
    return total;
  }

  /**
   * Returns a JSON-serializable copy of the cart state. Items are emitted as
   * an array because a `Map` does not survive `JSON.stringify`.
   */
  toSnapshot(): Record<string, unknown> {
    return {
      id: this._id,
      customerId: this._customerId,
      checkedOut: this._checkedOut,
      items: Array.from(this._items, ([productId, item]) => ({ productId, ...item })),
    };
  }

  /**
   * Replaces the cart state with one produced by `toSnapshot`.
   *
   * @param version Version the snapshot was taken at. When given, it becomes
   *   the aggregate's version so a cart with no later events can still be
   *   saved with the right expected version.
   */
  restoreFromSnapshot(state: Record<string, unknown>, version?: number): void {
    this._id = String(state.id ?? "");
    this._customerId = String(state.customerId ?? "");
    this._checkedOut = state.checkedOut === true;
    this._items = new Map();
    const entries = Array.isArray(state.items) ? state.items : [];
    for (const entry of entries) {
      const { productId, name, price, quantity } = entry as CartItem & { productId: string };
      this._items.set(productId, { name, price, quantity });
    }
    if (version !== undefined) {
      this.restoreVersion(version);
    }
  }

  /** Applies one event to the cart state; unknown event types are ignored. */
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case "CartCreated":
        this._id = event.data.cartId as string;
        this._customerId = event.data.customerId as string;
        break;
      case "ItemAddedToCart": {
        const { productId, name, price, quantity } = event.data as CartItem & { productId: string };
        const existing = this._items.get(productId);
        if (existing) {
          // Repeated adds only increase the quantity; name and price stay as first recorded.
          existing.quantity += quantity;
        } else {
          this._items.set(productId, { name, price, quantity });
        }
        break;
      }
      case "ItemRemovedFromCart":
        this._items.delete(event.data.productId as string);
        break;
      case "CartCheckedOut":
        this._checkedOut = true;
        break;
    }
  }

  /** Drops the storage-only fields (`streamId`, `version`) from a stored event. */
  protected deserialize(stored: StoredEvent): DomainEvent {
    return {
      eventId: stored.eventId,
      eventType: stored.eventType,
      data: stored.data,
      metadata: stored.metadata,
      timestamp: stored.timestamp,
    };
  }
}

Snapshots

As event streams grow long, replaying all events becomes slow. Snapshots capture the aggregate state at a point in time. When loading, you start from the latest snapshot and replay only subsequent events.

// Snapshot store
/**
 * Captured aggregate state at a given stream version, used to avoid replaying
 * a stream from its first event.
 */
interface Snapshot {
  /** Stream (aggregate instance) the snapshot belongs to. */
  streamId: string;
  /** Stream version the state corresponds to. */
  version: number;
  /** JSON-serializable aggregate state. */
  state: Record<string, unknown>;
  /** When the snapshot was taken. */
  createdAt: Date;
}

/**
 * Persists one snapshot per stream in a `snapshots` table.
 */
class SnapshotRepository {
  constructor(private readonly pool: Pool) {}

  /**
   * Inserts the snapshot, or overwrites the existing row for the same
   * `stream_id`. The state is stored as a JSON string.
   */
  async save(snapshot: Snapshot): Promise<void> {
    await this.pool.query(
      "INSERT INTO snapshots (stream_id, version, state, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3, created_at = $4",
      [snapshot.streamId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
    );
  }

  /**
   * Returns the stored snapshot for `streamId`, or `null` when there is none.
   * The table row is converted to the camelCase `Snapshot` shape.
   */
  async getLatest(streamId: string): Promise<Snapshot | null> {
    const result = await this.pool.query(
      "SELECT * FROM snapshots WHERE stream_id = $1",
      [streamId]
    );
    const row = result.rows[0];
    if (row == null) {
      return null;
    }
    return {
      streamId: row.stream_id,
      // Numeric columns may arrive as strings depending on the column type.
      version: Number(row.version),
      // The state may arrive parsed (json/jsonb) or as raw text.
      state: typeof row.state === "string" ? JSON.parse(row.state) : row.state,
      createdAt: row.created_at instanceof Date ? row.created_at : new Date(row.created_at),
    };
  }
}

// Repository using snapshots
/**
 * Loads and saves `ShoppingCart` aggregates through an event store, taking a
 * snapshot whenever the number of events in a stream crosses a multiple of
 * `snapshotInterval`.
 */
class EventSourcedCartRepository {
  constructor(
    private readonly eventStore: EventStore,
    private readonly snapshots: SnapshotRepository,
    private readonly snapshotInterval: number = 50
  ) {}

  /**
   * Rebuilds a cart from its latest snapshot (if any) plus the events stored
   * after it, or from the full stream when no snapshot exists.
   *
   * NOTE(review): an unknown `cartId` yields a blank cart at version -1 rather
   * than an error — confirm whether callers expect that.
   */
  async load(cartId: string): Promise<ShoppingCart> {
    const cart = new ShoppingCart();
    const snapshot = await this.snapshots.getLatest(cartId);

    if (snapshot) {
      // Pass the version too: with no later events the cart must still know
      // which version it is at, otherwise its next save is rejected.
      cart.restoreFromSnapshot(snapshot.state, snapshot.version);
    }

    const fromVersion = snapshot ? snapshot.version + 1 : undefined;
    const events = await this.eventStore.getStream(cartId, fromVersion);
    cart.loadFromHistory(events);

    return cart;
  }

  /**
   * Persists the cart's uncommitted events and, when the stream crosses a
   * snapshot boundary, stores a snapshot of the resulting state.
   *
   * Uncommitted events are cleared as soon as the append succeeds, so a failed
   * snapshot write (which still propagates) cannot leave already-persisted
   * events pending on the aggregate.
   */
  async save(cart: ShoppingCart): Promise<void> {
    const events = cart.uncommittedEvents;
    if (events.length === 0) {
      return; // nothing to persist
    }

    // Read both values before clearing, which may advance cart.version.
    const previousVersion = cart.version;
    const committedVersion = previousVersion + events.length;

    await this.eventStore.append(cart.id, events, previousVersion);
    cart.clearUncommittedEvents();

    // Create snapshot periodically
    if (this.crossedSnapshotBoundary(previousVersion, committedVersion)) {
      await this.snapshots.save({
        streamId: cart.id,
        version: committedVersion,
        state: cart.toSnapshot(),
        createdAt: new Date(),
      });
    }
  }

  /**
   * True when the event count (version + 1, since the first event is stored at
   * version 0) passed a multiple of the snapshot interval during this save.
   */
  private crossedSnapshotBoundary(before: number, after: number): boolean {
    const interval = this.snapshotInterval;
    if (!(interval > 0)) {
      return false; // non-positive or NaN interval disables snapshots
    }
    return Math.floor((after + 1) / interval) > Math.floor((before + 1) / interval);
  }
}

Event Sourcing Challenges

  • Event schema evolution: Events are immutable — changing their structure requires upcasting strategies
  • Eventual consistency: Projections may lag behind the event store — UI must handle stale reads
  • Event store size: Streams can grow very large — use snapshots and archiving strategies
  • Learning curve: The mental model is fundamentally different from CRUD — team training is essential
  • Deleting data: GDPR compliance with immutable events requires crypto-shredding or tombstone events

Continue Learning