Intermediate
20 min
Full Guide

Real-Time Design Patterns

Learn architectural patterns for building scalable real-time applications

Scaling Real-Time Applications

WebSocket connections are stateful, so a real-time application can't scale horizontally the way a stateless HTTP service can: clients connected to different servers have no way to reach each other directly. A pub/sub backbone (like Redis) relays events between servers so that a message published on one server reaches clients on all of them.

Architecture Diagram

┌─────────────────────────────────────────────────────────────────┐
│                        Load Balancer                            │
│                   (Sticky Sessions / IP Hash)                   │
└───────────────────────────┬─────────────────────────────────────┘
                            │
           ┌────────────────┼────────────────┐
           │                │                │
    ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐
    │  Server 1   │  │  Server 2   │  │  Server 3   │
    │  WS + HTTP  │  │  WS + HTTP  │  │  WS + HTTP  │
    └──────┬──────┘  └──────┬──────┘  └──────┬──────┘
           │                │                │
           │         ┌──────▼──────┐         │
           └─────────►  Redis Pub  ◄─────────┘
                     │   /Sub      │
                     └──────┬──────┘
                            │
                     ┌──────▼──────┐
                     │  Database   │
                     │  (Postgres) │
                     └─────────────┘

Redis Pub/Sub with Socket.io

// Scale Socket.io across multiple servers with Redis

import { createServer } from 'http';
import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

const httpServer = createServer();
const io = new Server(httpServer, {
  cors: { origin: '*' }
});

// Create Redis clients
const pubClient = createClient({ url: 'redis://localhost:6379' });
const subClient = pubClient.duplicate();

// Connect the Redis clients, attach the adapter, then start accepting connections
Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
  io.adapter(createAdapter(pubClient, subClient));
  httpServer.listen(3000);
  console.log('Redis adapter connected, listening on :3000');
});

// Now messages are broadcast across all servers!
io.on('connection', (socket) => {
  socket.on('chat-message', (data) => {
    // This reaches clients on ALL servers
    io.to(data.room).emit('message', data);
  });
});

// Alternative: Redis Streams adapter (use instead of the pub/sub adapter above)
import { createAdapter as createStreamsAdapter } from '@socket.io/redis-streams-adapter';

// Redis Streams persist messages - useful for missed-message recovery
io.adapter(createStreamsAdapter(pubClient));

// Messages can be replayed when a client reconnects
io.on('connection', (socket) => {
  // Recover messages missed since the last disconnect
  const lastEventId = socket.handshake.auth.lastEventId;
  if (lastEventId) {
    recoverMessages(socket, lastEventId); // app-specific replay from your message store
  }
});
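
The recovery flow above depends on the client reporting the last event id it processed. A minimal client-side counterpart might look like this; it assumes the server attaches an id to every message, and renderMessage stands in for your UI code:

// Client sketch: remember the last processed id so the server can replay
// anything missed after a reconnect
import { io } from 'socket.io-client';

let lastEventId = null;

const socket = io('https://example.com', {
  // auth as a function is re-evaluated on every (re)connection
  auth: (cb) => cb({ lastEventId })
});

socket.on('message', (msg) => {
  lastEventId = msg.id; // assumes the server includes an id with each message
  renderMessage(msg);   // placeholder for your UI update
});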

Presence System

// Track who's online and their status

import Redis from 'ioredis';
const redis = new Redis();

class PresenceSystem {
  PRESENCE_KEY = 'presence:online';
  HEARTBEAT_INTERVAL = 30000; // 30 seconds
  EXPIRE_TIME = 60; // seconds
  
  // User comes online
  async setOnline(userId, socketId, metadata = {}) {
    const data = JSON.stringify({
      socketId,
      status: 'online',
      lastSeen: Date.now(),
      ...metadata
    });
    
    // Use sorted set with timestamp as score for easy cleanup
    await redis.zadd(this.PRESENCE_KEY, Date.now(), userId);
    
    // Store detailed presence data
    await redis.setex(`presence:${userId}`, this.EXPIRE_TIME, data);
    
    // Publish presence change
    await redis.publish('presence:changes', JSON.stringify({
      userId,
      status: 'online',
      timestamp: Date.now()
    }));
  }
  
  // Heartbeat to keep alive
  async heartbeat(userId) {
    await redis.zadd(this.PRESENCE_KEY, Date.now(), userId);
    await redis.expire(`presence:${userId}`, this.EXPIRE_TIME);
  }
  
  // User goes offline
  async setOffline(userId) {
    await redis.zrem(this.PRESENCE_KEY, userId);
    await redis.del(`presence:${userId}`);
    
    await redis.publish('presence:changes', JSON.stringify({
      userId,
      status: 'offline',
      timestamp: Date.now()
    }));
  }
  
  // Get all online users
  async getOnlineUsers() {
    const cutoff = Date.now() - (this.EXPIRE_TIME * 1000);
    return redis.zrangebyscore(this.PRESENCE_KEY, cutoff, '+inf');
  }
  
  // Get specific user's presence
  async getPresence(userId) {
    const data = await redis.get(`presence:${userId}`);
    return data ? JSON.parse(data) : { status: 'offline' };
  }
  
  // Clean up stale presence entries
  async cleanupStale() {
    const cutoff = Date.now() - (this.EXPIRE_TIME * 1000);
    const staleUsers = await redis.zrangebyscore(this.PRESENCE_KEY, 0, cutoff);
    
    for (const userId of staleUsers) {
      await this.setOffline(userId);
    }
    
    return staleUsers.length;
  }
}

// Integration with Socket.io
const presence = new PresenceSystem();

io.on('connection', async (socket) => {
  const userId = socket.handshake.auth.userId;
  
  // Set online
  await presence.setOnline(userId, socket.id, {
    device: socket.handshake.query.device
  });
  
  // Notify others
  socket.broadcast.emit('user-online', userId);
  
  // Heartbeat
  const heartbeatInterval = setInterval(() => {
    presence.heartbeat(userId);
  }, presence.HEARTBEAT_INTERVAL);
  
  // Handle disconnect
  socket.on('disconnect', async () => {
    clearInterval(heartbeatInterval);
    await presence.setOffline(userId);
    socket.broadcast.emit('user-offline', userId);
  });
});

// Subscribe to presence changes (for multi-server fan-out)
const sub = redis.duplicate();
await sub.subscribe('presence:changes');
sub.on('message', (channel, message) => {
  const { userId, status } = JSON.parse(message);
  // Note: clients on the originating server were already notified via
  // socket.broadcast above, so dedupe client-side or drop those broadcasts
  io.emit(`user-${status}`, userId);
});
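
On the client, presence can be consumed by seeding a local set and then applying the events emitted above. A sketch, where the presence:list handler and renderPresence helper are assumptions for illustration:

// Client-side presence sketch
const onlineUsers = new Set();

// Ask the server for the current snapshot (assumes a handler that
// responds with presence.getOnlineUsers())
socket.emit('presence:list', (userIds) => {
  userIds.forEach((id) => onlineUsers.add(id));
  renderPresence(onlineUsers); // placeholder for your UI update
});

socket.on('user-online', (userId) => {
  onlineUsers.add(userId);
  renderPresence(onlineUsers);
});

socket.on('user-offline', (userId) => {
  onlineUsers.delete(userId);
  renderPresence(onlineUsers);
});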

Optimistic Updates

// Update UI immediately, reconcile with server later

class OptimisticStore {
  constructor() {
    this.items = [];
    this.pendingUpdates = new Map(); // id -> original state
    this.socket = io();
  }
  
  // Add item optimistically
  async addItem(item) {
    const tempId = 'temp_' + Date.now();
    const optimisticItem = { ...item, id: tempId, pending: true };
    
    // Add immediately to UI
    this.items.push(optimisticItem);
    this.render();
    
    try {
      // Send to server
      const response = await this.sendToServer('add', item);
      
      // Replace temp item with real one
      const index = this.items.findIndex(i => i.id === tempId);
      this.items[index] = { ...response, pending: false };
      this.render();
      
      return response;
    } catch (error) {
      // Rollback on failure
      this.items = this.items.filter(i => i.id !== tempId);
      this.render();
      throw error;
    }
  }
  
  // Update item optimistically
  async updateItem(id, updates) {
    const item = this.items.find(i => i.id === id);
    if (!item) return;
    
    // Store original for rollback
    const original = { ...item };
    this.pendingUpdates.set(id, original);
    
    // Apply update immediately
    Object.assign(item, updates, { pending: true });
    this.render();
    
    try {
      const response = await this.sendToServer('update', { id, ...updates });
      Object.assign(item, response, { pending: false });
      this.pendingUpdates.delete(id);
      this.render();
      return response;
    } catch (error) {
      // Rollback to original
      Object.assign(item, original, { pending: false });
      this.pendingUpdates.delete(id);
      this.render();
      throw error;
    }
  }
  
  sendToServer(action, data) {
    return new Promise((resolve, reject) => {
      this.socket.emit(action, data, (response) => {
        if (response.error) reject(new Error(response.error));
        else resolve(response.data);
      });
    });
  }
}

// React hook for optimistic updates (a custom hook, distinct from React 19's built-in useOptimistic)
import { useState } from 'react';

function useOptimistic(initialValue, submitFn) {
  const [value, setValue] = useState(initialValue);
  const [optimisticValue, setOptimisticValue] = useState(null);
  const [isPending, setIsPending] = useState(false);
  
  async function submit(newValue) {
    setOptimisticValue(newValue);
    setIsPending(true);
    
    try {
      const result = await submitFn(newValue);
      setValue(result);
      setOptimisticValue(null);
    } catch (error) {
      setOptimisticValue(null);
      throw error;
    } finally {
      setIsPending(false);
    }
  }
  
  return [optimisticValue ?? value, submit, isPending];
}
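
A sketch of how the hook might be used; the /api/like endpoint and its response shape are assumptions for illustration:

// Hypothetical usage: a like counter that updates instantly and reconciles
// with the server
function LikeButton({ initialCount }) {
  const [count, submitLike, isPending] = useOptimistic(initialCount, async (next) => {
    const res = await fetch('/api/like', {  // assumed endpoint
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ count: next })
    });
    if (!res.ok) throw new Error('Like failed');
    return (await res.json()).count; // server-confirmed value wins
  });
  
  const handleClick = () => {
    // On failure the hook clears the optimistic value, reverting the UI
    submitLike(count + 1).catch(() => {});
  };
  
  return (
    <button disabled={isPending} onClick={handleClick}>
      Like ({count})
    </button>
  );
}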

Conflict Resolution - Last Write Wins

// Simplest strategy: timestamp-based conflict resolution

class LastWriteWins {
  async update(key, value, timestamp) {
    const current = await redis.hgetall(`data:${key}`);
    
    if (!current.timestamp || timestamp > parseInt(current.timestamp)) {
      // HSET with field-value pairs (HMSET is deprecated)
      await redis.hset(`data:${key}`,
        'value', JSON.stringify(value),
        'timestamp', timestamp.toString()
      );
      return { accepted: true, value };
    }
    
    return { accepted: false, value: JSON.parse(current.value) };
  }
}

// Version-based conflict detection
class OptimisticLocking {
  async update(key, value, expectedVersion) {
    // WATCH aborts the transaction if the key changes before EXEC
    await redis.watch(`data:${key}`);
    
    const current = await redis.hgetall(`data:${key}`);
    const currentVersion = parseInt(current.version || '0');
    
    if (currentVersion !== expectedVersion) {
      await redis.unwatch();
      return {
        success: false,
        error: 'CONFLICT',
        currentValue: current.value ? JSON.parse(current.value) : null,
        currentVersion
      };
    }
    
    const result = await redis
      .multi()
      .hset(`data:${key}`,
        'value', JSON.stringify(value),
        'version', (currentVersion + 1).toString()
      )
      .exec();
    
    // exec() returns null when the watched key changed - treat as a conflict
    if (result === null) {
      return { success: false, error: 'CONFLICT', currentVersion };
    }
    
    return { success: true, newVersion: currentVersion + 1 };
  }
}
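
// Server-side wiring sketch: exposes OptimisticLocking over the same
// 'update' / 'update-response' events the client below uses. Assumes
// updates arrive as { key, value, version }; 'data-updated' is an
// assumed broadcast event for notifying other clients.
const locking = new OptimisticLocking();

io.on('connection', (socket) => {
  socket.on('update', async ({ key, value, version }) => {
    const result = await locking.update(key, value, version);
    socket.emit('update-response', result);
    
    if (result.success) {
      socket.broadcast.emit('data-updated', { key, value, version: result.newVersion });
    }
  });
});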

// Client-side conflict handling
socket.on('update-response', (response) => {
  if (response.error === 'CONFLICT') {
    // Show conflict resolution UI
    showConflictResolver({
      yourChange: pendingChange,
      serverValue: response.currentValue,
      onResolve: (resolved) => {
        socket.emit('update', {
          ...resolved,
          version: response.currentVersion
        });
      },
      onDiscard: () => {
        applyServerValue(response.currentValue);
      }
    });
  }
});

Operational Transformation (OT)

// For collaborative text editing (like Google Docs)

// Basic OT operations for text
const operations = {
  insert: (pos, text) => ({ type: 'insert', pos, text }),
  delete: (pos, length) => ({ type: 'delete', pos, length })
};

// Transform operation against another operation
function transform(op1, op2) {
  // op1 was applied first, transform op2 for concurrent apply
  
  if (op1.type === 'insert' && op2.type === 'insert') {
    if (op2.pos <= op1.pos) {
      return op2; // op2 comes before, no change
    } else {
      // Shift op2 position by inserted text length
      return { ...op2, pos: op2.pos + op1.text.length };
    }
  }
  
  if (op1.type === 'insert' && op2.type === 'delete') {
    if (op2.pos >= op1.pos) {
      return { ...op2, pos: op2.pos + op1.text.length };
    }
    return op2;
  }
  
  if (op1.type === 'delete' && op2.type === 'insert') {
    if (op2.pos <= op1.pos) {
      return op2;
    } else if (op2.pos >= op1.pos + op1.length) {
      return { ...op2, pos: op2.pos - op1.length };
    } else {
      // Insert is within deleted range
      return { ...op2, pos: op1.pos };
    }
  }
  
  if (op1.type === 'delete' && op2.type === 'delete') {
    // Complex case - handle overlapping deletes
    // Simplified version:
    if (op2.pos >= op1.pos + op1.length) {
      return { ...op2, pos: op2.pos - op1.length };
    }
    return op2;
  }
}
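
// Helper to apply a single operation to a document string - a minimal
// sketch for the insert/delete operations above, used by the client below
function applyOp(doc, op) {
  if (op.type === 'insert') {
    return doc.slice(0, op.pos) + op.text + doc.slice(op.pos);
  }
  if (op.type === 'delete') {
    return doc.slice(0, op.pos) + doc.slice(op.pos + op.length);
  }
  return doc;
}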

// OT Client (simplified). Assumes the server broadcasts other clients'
// committed operations via 'operation' and confirms this client's own
// operation via a separate 'ack' event.
class OTClient {
  constructor(socket, initialDoc) {
    this.socket = socket;
    this.doc = initialDoc;
    this.revision = 0;
    this.pending = null; // our operation in flight, awaiting 'ack'
    this.buffer = [];    // local operations queued behind the pending one
    
    socket.on('operation', (op, revision) => this.applyServer(op, revision));
    socket.on('ack', (revision) => this.handleAck(revision));
  }
  
  applyLocal(op) {
    // Apply to the local doc immediately
    this.doc = applyOp(this.doc, op);
    
    // Queue if an operation is already in flight, otherwise send now
    if (this.pending) {
      this.buffer.push(op);
    } else {
      this.send(op);
    }
  }
  
  send(op) {
    this.pending = op;
    this.socket.emit('operation', op, this.revision);
  }
  
  applyServer(op, revision) {
    // A remote client's operation: transform it past our unacknowledged
    // operations, and transform them past it, so both sides converge
    let incoming = op;
    
    if (this.pending) {
      const transformedPending = transform(incoming, this.pending);
      incoming = transform(this.pending, incoming);
      this.pending = transformedPending;
    }
    
    this.buffer = this.buffer.map((local) => {
      const transformedLocal = transform(incoming, local);
      incoming = transform(local, incoming);
      return transformedLocal;
    });
    
    this.doc = applyOp(this.doc, incoming);
    this.revision = revision;
  }
  
  handleAck(revision) {
    // Server committed our pending operation - send the next queued one
    this.revision = revision;
    this.pending = null;
    
    if (this.buffer.length > 0) {
      this.send(this.buffer.shift());
    }
  }
}
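
The client above relies on a central server that serializes operations, acknowledges the sender, and rebroadcasts transformed operations to everyone else. A minimal sketch of such a server, with an in-memory document and history standing in for real storage:

// Minimal OT server sketch (single process; a real deployment would keep
// the document and operation history in shared storage)
let serverDoc = '';
const history = []; // committed operations; array length doubles as the revision

io.on('connection', (socket) => {
  socket.on('operation', (op, clientRevision) => {
    // Transform the incoming op against everything committed since the
    // revision the client last saw
    let transformed = op;
    for (const committed of history.slice(clientRevision)) {
      transformed = transform(committed, transformed);
    }
    
    serverDoc = applyOp(serverDoc, transformed);
    history.push(transformed);
    const revision = history.length;
    
    socket.emit('ack', revision);                               // confirm to the sender
    socket.broadcast.emit('operation', transformed, revision);  // fan out to other clients
  });
});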

Rate Limiting and Backpressure

// Protect your real-time server from abuse

import { RateLimiterRedis } from 'rate-limiter-flexible';

// Socket.io connection rate limiting
// (redisClient: an ioredis client, e.g. the one created in the presence section)
const rateLimiter = new RateLimiterRedis({
  storeClient: redisClient,
  keyPrefix: 'ws-limit',
  points: 100,  // connection attempts
  duration: 60, // per minute, per IP
});

io.use(async (socket, next) => {
  try {
    await rateLimiter.consume(socket.handshake.address);
    next();
  } catch (error) {
    next(new Error('Too many connections'));
  }
});

// Per-event rate limiting
io.on('connection', (socket) => {
  const eventLimiter = new RateLimiterRedis({
    storeClient: redisClient,
    keyPrefix: `ws-events:${socket.id}`,
    points: 20,
    duration: 1, // 20 messages per second
  });
  
  socket.use(async ([event, ...args], next) => {
    try {
      await eventLimiter.consume(event);
      next();
    } catch (error) {
      socket.emit('error', 'Rate limit exceeded');
    }
  });
});

// Backpressure - slow down when the send buffer fills
// (wraps a raw WebSocket - ws or browser - because bufferedAmount is a
// WebSocket property, not part of the Socket.io API)
class BackpressuredSocket {
  constructor(socket) {
    this.socket = socket;
    this.queue = [];
    this.sending = false;
    this.highWaterMark = 1000; // max queued messages before new ones are dropped
  }
  
  send(data) {
    if (this.queue.length >= this.highWaterMark) {
      console.warn('Backpressure: dropping message');
      return false;
    }
    
    this.queue.push(data);
    this.drain();
    return true;
  }
  
  async drain() {
    if (this.sending) return;
    this.sending = true;
    
    while (this.queue.length > 0) {
      const data = this.queue.shift();
      
      // Check if socket is writable
      if (this.socket.bufferedAmount > 65536) {
        // Wait for buffer to drain
        await new Promise(resolve => {
          const check = () => {
            if (this.socket.bufferedAmount < 32768) {
              resolve();
            } else {
              setTimeout(check, 10);
            }
          };
          check();
        });
      }
      
      this.socket.send(data);
    }
    
    this.sending = false;
  }
}
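
A usage sketch with the ws package; the feed emitter below is a placeholder for any high-volume source such as a market-data or telemetry stream:

// Fan a high-volume feed out to clients without overrunning slow ones
import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 8081 });

wss.on('connection', (ws) => {
  const bp = new BackpressuredSocket(ws);
  
  const onTick = (tick) => bp.send(JSON.stringify(tick));
  feed.on('tick', onTick); // 'feed' is a placeholder EventEmitter
  
  ws.on('close', () => feed.off('tick', onTick));
});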

🏗️ Architecture Checklist

  • Use the Redis adapter for multi-server Socket.io
  • Implement heartbeats to detect dead connections
  • Design for reconnection - clients will disconnect
  • Use sticky sessions at the load balancer for WebSocket traffic
  • Rate limit connections and events to prevent abuse
  • Handle backpressure when clients can't keep up
  • Choose a conflict-resolution strategy appropriate for your use case
  • Monitor connection counts and memory usage