Intermediate
20 min
Full Guide
Real-Time Design Patterns
Learn architectural patterns for building scalable real-time applications
Scaling Real-Time Applications
WebSocket connections are stateful: each client stays attached to the one server that accepted its connection. A single server therefore works fine, but as soon as you scale horizontally, a message emitted on one server never reaches clients connected to another. To fix this we need a pub/sub backbone (such as Redis) that relays messages between servers.
Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                          Load Balancer                          │
│                   (Sticky Sessions / IP Hash)                   │
└────────────────────────────────┬────────────────────────────────┘
                                 │
                ┌────────────────┼────────────────┐
                │                │                │
         ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐
         │  Server 1   │  │  Server 2   │  │  Server 3   │
         │  WS + HTTP  │  │  WS + HTTP  │  │  WS + HTTP  │
         └──────┬──────┘  └──────┬──────┘  └──────┬──────┘
                │                │                │
                │         ┌──────▼──────┐         │
                └─────────►  Redis Pub  ◄─────────┘
                          │    /Sub     │
                          └──────┬──────┘
                                 │
                          ┌──────▼──────┐
                          │  Database   │
                          │ (Postgres)  │
                          └─────────────┘
Redis Pub/Sub with Socket.io
// Scale Socket.io across multiple servers with Redis
import { createServer } from 'http';
import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
const httpServer = createServer();
const io = new Server(httpServer, {
  cors: { origin: '*' }
});

// Create Redis clients. The adapter is given two connections: one to publish
// and a duplicate one to subscribe.
const pubClient = createClient({ url: 'redis://localhost:6379' });
const subClient = pubClient.duplicate();

// Always listen for 'error' on the Redis clients: an error emitted without a
// listener is thrown and takes the whole process down.
pubClient.on('error', (err) => console.error('Redis pub client error:', err));
subClient.on('error', (err) => console.error('Redis sub client error:', err));

// Connect and set up adapter
Promise.all([pubClient.connect(), subClient.connect()])
  .then(() => {
    io.adapter(createAdapter(pubClient, subClient));
    console.log('Redis adapter connected');
  })
  .catch((err) => {
    // Without the adapter, broadcasts only reach clients of this server.
    console.error('Failed to connect Redis adapter:', err);
  });

// Now messages are broadcast across all servers!
io.on('connection', (socket) => {
  socket.on('chat-message', (data) => {
    // Clients control the payload; ignore messages without a target room
    // instead of throwing inside the event handler.
    if (data == null || data.room == null) return;

    // This reaches clients on ALL servers
    io.to(data.room).emit('message', data);
  });
});
// With Redis Streams for persistence
import { createAdapter as createStreamsAdapter } from '@socket.io/redis-streams-adapter';
// Redis Streams persist messages - great for missed message recovery
io.adapter(createStreamsAdapter(pubClient));

// Messages can be replayed when client reconnects.
// 'connection' is emitted by the server instance (`io`); no `socket` exists
// at this scope, so the listener has to be registered on `io`.
io.on('connection', (socket) => {
  // Recover missed messages since disconnect
  const lastEventId = socket.handshake.auth.lastEventId;
  if (lastEventId) {
    recoverMessages(socket, lastEventId);
  }
});
Presence System
// Track who's online and their status
import Redis from 'ioredis';
const redis = new Redis();
/**
 * Tracks which users are online, backed by Redis.
 *
 * Data layout:
 * - sorted set `presence:online`: member = userId, score = last-seen time (ms)
 * - string `presence:<userId>`: JSON presence details, expiring after 60 s
 * - channel `presence:changes`: JSON `{ userId, status, timestamp }` events
 *
 * Uses the module-level `redis` client.
 */
class PresenceSystem {
  // Native private fields: `private readonly` is TypeScript-only syntax and
  // does not parse as JavaScript.
  #presenceKey = 'presence:online';
  #expireSeconds = 60;

  /** Suggested interval (ms) between heartbeat() calls; must stay below the 60 s expiry. */
  static HEARTBEAT_INTERVAL_MS = 30000;

  /**
   * Marks a user as online and publishes the change.
   * @param {string} userId
   * @param {string} socketId
   * @param {object} [metadata] - extra fields merged into the stored details
   *   (spread last, so they can override the defaults).
   * @returns {Promise<void>}
   */
  async setOnline(userId, socketId, metadata = {}) {
    // One clock reading so score, lastSeen and the event timestamp agree.
    const now = Date.now();
    const data = JSON.stringify({
      socketId,
      status: 'online',
      lastSeen: now,
      ...metadata
    });

    // Use sorted set with timestamp as score for easy cleanup
    await redis.zadd(this.#presenceKey, now, userId);
    // Store detailed presence data
    await redis.setex(`presence:${userId}`, this.#expireSeconds, data);
    // Publish presence change
    await redis.publish('presence:changes', JSON.stringify({
      userId,
      status: 'online',
      timestamp: now
    }));
  }

  /**
   * Refreshes the user's score and the expiry of the detail key.
   * @param {string} userId
   * @returns {Promise<void>}
   */
  async heartbeat(userId) {
    await redis.zadd(this.#presenceKey, Date.now(), userId);
    await redis.expire(`presence:${userId}`, this.#expireSeconds);
  }

  /**
   * Removes the user's presence data and publishes an 'offline' change.
   * @param {string} userId
   * @returns {Promise<void>}
   */
  async setOffline(userId) {
    await redis.zrem(this.#presenceKey, userId);
    await redis.del(`presence:${userId}`);
    await redis.publish('presence:changes', JSON.stringify({
      userId,
      status: 'offline',
      timestamp: Date.now()
    }));
  }

  /**
   * Lists users whose last heartbeat is within the expiry window.
   * @returns {Promise<string[]>} user ids with score >= cutoff
   */
  async getOnlineUsers() {
    return redis.zrangebyscore(this.#presenceKey, this.#cutoff(), '+inf');
  }

  /**
   * Reads the stored presence details for one user.
   * @param {string} userId
   * @returns {Promise<object>} parsed details, or `{ status: 'offline' }` when absent
   */
  async getPresence(userId) {
    const data = await redis.get(`presence:${userId}`);
    return data ? JSON.parse(data) : { status: 'offline' };
  }

  /**
   * Marks every user whose last heartbeat is older than the expiry window as offline.
   * @returns {Promise<number>} number of users cleaned up
   */
  async cleanupStale() {
    // "(" makes the upper bound exclusive, so a score exactly at the cutoff
    // is "online" (see getOnlineUsers) and never both online and stale.
    const staleUsers = await redis.zrangebyscore(this.#presenceKey, 0, `(${this.#cutoff()}`);
    for (const userId of staleUsers) {
      await this.setOffline(userId);
    }
    return staleUsers.length;
  }

  /** Oldest last-seen time (ms) that still counts as online. */
  #cutoff() {
    return Date.now() - (this.#expireSeconds * 1000);
  }
}
// Integration with Socket.io
const presence = new PresenceSystem();

// Wire presence tracking into the socket lifecycle: mark the user online on
// connect, keep the entry alive with heartbeats, mark offline on disconnect.
io.on('connection', async (socket) => {
  const userId = socket.handshake.auth.userId;
  if (!userId) {
    // Without an id every anonymous socket would share "presence:undefined".
    socket.disconnect(true);
    return;
  }

  let heartbeatInterval;

  // Register the disconnect handler BEFORE the first await: a client that
  // drops while setOnline() is in flight would otherwise never be cleaned up
  // and its heartbeat timer would run forever.
  socket.on('disconnect', async () => {
    clearInterval(heartbeatInterval);
    try {
      await presence.setOffline(userId);
      socket.broadcast.emit('user-offline', userId);
    } catch (err) {
      console.error('Failed to mark user offline:', err);
    }
  });

  // Set online
  try {
    await presence.setOnline(userId, socket.id, {
      device: socket.handshake.query.device
    });
  } catch (err) {
    // A Redis failure must not surface as an unhandled rejection.
    console.error('Failed to mark user online:', err);
    return;
  }

  if (!socket.connected) {
    // Disconnected while we were writing; undo the write and skip the timer.
    presence.setOffline(userId).catch((err) => {
      console.error('Failed to mark user offline:', err);
    });
    return;
  }

  // Notify others
  socket.broadcast.emit('user-online', userId);

  // Heartbeat
  heartbeatInterval = setInterval(() => {
    presence.heartbeat(userId).catch((err) => {
      console.error('Presence heartbeat failed:', err);
    });
  }, 30000);
});
// Subscribe to presence changes (for multi-server)
const sub = redis.duplicate();

// Attach the listener before subscribing so no message can arrive unhandled.
sub.on('message', (channel, message) => {
  let change;
  try {
    change = JSON.parse(message);
  } catch (err) {
    console.error('Ignoring malformed presence message:', err);
    return;
  }

  const { userId, status } = change ?? {};
  // Only the two known statuses may become event names.
  if (status !== 'online' && status !== 'offline') return;

  // Every server receives this Redis message, so each one only notifies its
  // own clients. A plain io.emit() would be fanned out to all servers again
  // when a multi-server adapter is attached, duplicating the event.
  io.local.emit(`user-${status}`, userId);
});

await sub.subscribe('presence:changes');
Optimistic Updates
// Update UI immediately, reconcile with server later
/**
 * Client-side store that applies changes to `items` immediately and
 * reconciles (or rolls back) once the server acknowledges them.
 *
 * Items carry a `pending` flag while a request is in flight.
 */
class OptimisticStore {
  constructor() {
    this.items = [];
    this.pendingUpdates = new Map(); // id -> original state
    this.socket = io();
    this.nextTempId = 0; // disambiguates temp ids created in the same millisecond
  }

  /**
   * Hook called after every change to `items`. No-op by default; override it
   * (subclass or assign) to redraw the UI.
   */
  render() {}

  /**
   * Adds an item optimistically under a temporary id, then swaps in the
   * server's version.
   * @param {object} item
   * @returns {Promise<object>} the item as returned by the server
   * @throws {Error} when the server reports an error (the temp item is removed)
   */
  async addItem(item) {
    const tempId = `temp_${Date.now()}_${this.nextTempId++}`;
    const optimisticItem = { ...item, id: tempId, pending: true };

    // Add immediately to UI
    this.items.push(optimisticItem);
    this.render();

    try {
      // Send to server
      const response = await this.sendToServer('add', item);

      // Replace temp item with real one. It may have been removed while the
      // request was in flight; writing to index -1 would only set a stray
      // "-1" property on the array.
      const index = this.items.findIndex((i) => i.id === tempId);
      if (index !== -1) {
        this.items[index] = { ...response, pending: false };
      }
      this.render();
      return response;
    } catch (error) {
      // Rollback on failure
      this.items = this.items.filter((i) => i.id !== tempId);
      this.render();
      throw error;
    }
  }

  /**
   * Applies `updates` to an existing item optimistically.
   * @param {*} id
   * @param {object} updates
   * @returns {Promise<object|undefined>} server response, or undefined when no
   *   item has that id
   * @throws {Error} when the server reports an error (the item is restored)
   */
  async updateItem(id, updates) {
    const item = this.items.find((i) => i.id === id);
    if (!item) return;

    // Store original for rollback
    const original = { ...item };
    this.pendingUpdates.set(id, original);

    // Apply update immediately
    Object.assign(item, updates, { pending: true });
    this.render();

    try {
      const response = await this.sendToServer('update', { id, ...updates });
      Object.assign(item, response, { pending: false });
      this.pendingUpdates.delete(id);
      this.render();
      return response;
    } catch (error) {
      // Rollback to original. Keys introduced by the update must be removed
      // first; Object.assign alone would leave them behind.
      for (const key of Object.keys(item)) {
        if (!Object.hasOwn(original, key)) delete item[key];
      }
      Object.assign(item, original, { pending: false });
      this.pendingUpdates.delete(id);
      this.render();
      throw error;
    }
  }

  /**
   * Emits `action` with an acknowledgement callback.
   * @param {string} action
   * @param {object} data
   * @returns {Promise<*>} resolves with `response.data`; rejects with an Error
   *   built from `response.error`
   */
  sendToServer(action, data) {
    return new Promise((resolve, reject) => {
      this.socket.emit(action, data, (response) => {
        // Tolerate an empty acknowledgement instead of throwing inside the
        // callback, which would leave this promise pending forever.
        if (response?.error) reject(new Error(response.error));
        else resolve(response?.data);
      });
    });
  }
}
// React hook for optimistic updates
/**
 * React hook that shows a submitted value immediately while `submitFn` runs.
 *
 * @param {*} initialValue - starting committed value
 * @param {(newValue: *) => Promise<*>} submitFn - its result becomes the new
 *   committed value
 * @returns {[*, (newValue: *) => Promise<void>, boolean]} the value to display
 *   (optimistic one if set, otherwise committed), the submit function, and
 *   the pending flag
 */
function useOptimistic(initialValue, submitFn) {
  const [committed, setCommitted] = useState(initialValue);
  const [optimistic, setOptimistic] = useState(null);
  const [isPending, setIsPending] = useState(false);

  const submit = async (newValue) => {
    setOptimistic(newValue);
    setIsPending(true);
    try {
      setCommitted(await submitFn(newValue));
    } finally {
      // Success or failure, the optimistic value is cleared before the
      // pending flag drops; a failure still propagates to the caller.
      setOptimistic(null);
      setIsPending(false);
    }
  };

  return [optimistic ?? committed, submit, isPending];
}
Conflict Resolution - Last Write Wins
// Simplest strategy: timestamp-based conflict resolution
/**
 * Last-write-wins register stored in the Redis hash `data:<key>` with the
 * fields `value` (JSON) and `timestamp`.
 *
 * NOTE: the read and the write are separate commands, so two concurrent
 * updates of the same key can both pass the comparison.
 */
class LastWriteWins {
  /**
   * Stores `value` if `timestamp` is newer than the stored one.
   * @param {string} key
   * @param {*} value - must be JSON-serialisable
   * @param {number|string} timestamp - finite number or numeric string
   * @returns {Promise<{accepted: boolean, value: *}>} the value now stored
   * @throws {TypeError} when `timestamp` is not a finite number
   */
  async update(key, value, timestamp) {
    const incoming = typeof timestamp === 'string' && timestamp.trim() !== ''
      ? Number(timestamp)
      : timestamp;
    if (typeof incoming !== 'number' || !Number.isFinite(incoming)) {
      // A stored NaN timestamp compares false against everything and would
      // block every later write to this key.
      throw new TypeError(`timestamp must be a finite number, got ${String(timestamp)}`);
    }

    const dataKey = `data:${key}`;
    const current = await redis.hgetall(dataKey);

    // An unparsable stored timestamp is treated as "no previous write" so a
    // corrupted hash cannot lock the key.
    const stored = Number(current.timestamp);
    const hasStored = Boolean(current.timestamp) && Number.isFinite(stored);

    if (!hasStored || incoming > stored) {
      await redis.hmset(dataKey, {
        value: JSON.stringify(value),
        timestamp: String(incoming)
      });
      return { accepted: true, value };
    }

    return { accepted: false, value: JSON.parse(current.value) };
  }
}
// Version-based conflict detection
/**
 * Version-checked writes to the Redis hash `data:<key>` (fields `value` as
 * JSON and `version`), using WATCH / MULTI / EXEC.
 *
 * NOTE(review): WATCH state belongs to the connection. If the module-level
 * `redis` client is shared by concurrent requests, their watches interleave.
 * Confirm whether a dedicated connection per update is needed.
 */
class OptimisticLocking {
  /**
   * Writes `value` only if the stored version equals `expectedVersion`.
   * @param {string} key
   * @param {*} value - must be JSON-serialisable
   * @param {number} expectedVersion - version the caller last saw (0 = no data yet)
   * @returns {Promise<{success: true, newVersion: number} |
   *   {success: false, error: 'CONFLICT', currentValue: *, currentVersion: number}>}
   */
  async update(key, value, expectedVersion) {
    const dataKey = `data:${key}`;

    // Use Redis transaction for atomicity
    await redis.watch(dataKey);
    const current = await redis.hgetall(dataKey);
    const currentVersion = OptimisticLocking.#versionOf(current);

    if (currentVersion !== expectedVersion) {
      await redis.unwatch();
      return OptimisticLocking.#conflict(current);
    }

    const execResult = await redis
      .multi()
      .hmset(dataKey, {
        value: JSON.stringify(value),
        version: (currentVersion + 1).toString()
      })
      .exec();

    if (execResult === null) {
      // EXEC returns null when the watched key changed after WATCH: the write
      // was NOT applied, so report the conflict with the latest state.
      return OptimisticLocking.#conflict(await redis.hgetall(dataKey));
    }

    return { success: true, newVersion: currentVersion + 1 };
  }

  /** Version stored in a hash; a missing or empty version counts as 0. */
  static #versionOf(hash) {
    return Number.parseInt(hash.version || '0', 10);
  }

  /** Builds the CONFLICT result; a hash with no value yields `currentValue: null`. */
  static #conflict(hash) {
    return {
      success: false,
      error: 'CONFLICT',
      currentValue: hash.value === undefined ? null : JSON.parse(hash.value),
      currentVersion: OptimisticLocking.#versionOf(hash)
    };
  }
}
// Client-side conflict handling
// When the server rejects an update with a version conflict, let the user
// either resubmit a resolved change or accept the server's value.
socket.on('update-response', (response) => {
  if (response.error !== 'CONFLICT') {
    return;
  }

  // Resubmit against the version the server reported.
  const resubmit = (resolved) => {
    socket.emit('update', {
      ...resolved,
      version: response.currentVersion
    });
  };

  // Drop the local change and take the server's value.
  const acceptServerValue = () => {
    applyServerValue(response.currentValue);
  };

  // Show conflict resolution UI
  showConflictResolver({
    yourChange: pendingChange,
    serverValue: response.currentValue,
    onResolve: resubmit,
    onDiscard: acceptServerValue
  });
});
Operational Transformation (OT)
// For collaborative text editing (like Google Docs)
// Basic OT operations for text
// Factories for the two text operations; `pos` is an index into the document.
const operations = {
  /** Insert `text` at `pos`. */
  insert(pos, text) {
    return { type: 'insert', pos, text };
  },
  /** Delete `length` characters starting at `pos`. */
  delete(pos, length) {
    return { type: 'delete', pos, length };
  }
};
// Transform operation against another operation
/**
 * Transforms `op2` so it can be applied AFTER `op1`, when both were created
 * against the same document state.
 *
 * Known limitations (unchanged, they need a wider interface):
 * - Two inserts at the same position need a tie-breaker (e.g. client id) to
 *   converge; here `op2` keeps its position.
 * - A delete that starts before an insert and spans its position is returned
 *   unchanged; preserving the inserted text would mean splitting the delete
 *   into two operations.
 *
 * @param {{type: string, pos: number, text?: string, length?: number}} op1 - already applied
 * @param {{type: string, pos: number, text?: string, length?: number}} op2 - to be rebased
 * @returns {object|undefined} the rebased `op2` (a delete may end up with
 *   length 0, i.e. a no-op); undefined for unknown operation types
 */
function transform(op1, op2) {
  if (op1.type === 'insert') {
    const inserted = op1.text.length;

    if (op2.type === 'insert') {
      // op2 at or before op1 is unaffected; otherwise shift by inserted text
      return op2.pos <= op1.pos ? op2 : { ...op2, pos: op2.pos + inserted };
    }

    if (op2.type === 'delete') {
      return op2.pos >= op1.pos ? { ...op2, pos: op2.pos + inserted } : op2;
    }

    return undefined;
  }

  if (op1.type === 'delete') {
    const op1End = op1.pos + op1.length;

    if (op2.type === 'insert') {
      if (op2.pos <= op1.pos) {
        return op2;
      }
      if (op2.pos >= op1End) {
        return { ...op2, pos: op2.pos - op1.length };
      }
      // Insert is within deleted range
      return { ...op2, pos: op1.pos };
    }

    if (op2.type === 'delete') {
      const op2End = op2.pos + op2.length;

      if (op2End <= op1.pos) {
        return op2; // entirely before op1
      }
      if (op2.pos >= op1End) {
        return { ...op2, pos: op2.pos - op1.length }; // entirely after op1
      }

      // Overlapping ranges: characters op1 already removed must not be
      // deleted again, so drop the overlap and start at the first survivor.
      const overlap = Math.min(op1End, op2End) - Math.max(op1.pos, op2.pos);
      return {
        ...op2,
        pos: Math.min(op1.pos, op2.pos),
        length: op2.length - overlap
      };
    }
  }

  return undefined;
}
// OT Client
/**
 * Client side of the OT protocol: applies local edits immediately, keeps
 * sent-but-unacknowledged operations in `pending` and not-yet-sent ones in
 * `buffer`.
 *
 * Relies on `applyOp(doc, op)` and `transform(op1, op2)` from the
 * surrounding scope.
 *
 * NOTE(review): every incoming 'operation' event is treated as the
 * acknowledgement of the oldest pending operation, and local operations are
 * not transformed against the server operation. Confirm this matches the
 * server's protocol.
 */
class OTClient {
  /**
   * @param {object} socket - emitter with `on` / `emit`
   * @param {*} initialDoc - starting document state
   */
  constructor(socket, initialDoc) {
    Object.assign(this, {
      socket,
      doc: initialDoc,
      revision: 0,
      pending: [],
      buffer: []
    });

    socket.on('operation', (op, revision) => this.applyServer(op, revision));
  }

  /**
   * Applies a local operation to the document, then sends it right away, or
   * buffers it while another operation is still pending.
   * @param {object} op
   */
  applyLocal(op) {
    this.doc = applyOp(this.doc, op);

    const nothingInFlight = this.pending.length === 0;
    if (nothingInFlight) {
      this.send(op);
      return;
    }
    this.buffer.push(op);
  }

  /**
   * Records `op` as pending and emits it with the current revision.
   * @param {object} op
   */
  send(op) {
    this.pending.push(op);
    this.socket.emit('operation', op, this.revision);
  }

  /**
   * Handles an operation from the server: rebases it over all local
   * operations (pending first, then buffered), applies it, adopts the
   * revision, drops the oldest pending operation and sends the next
   * buffered one, if any.
   * @param {object} op
   * @param {number} revision
   */
  applyServer(op, revision) {
    const localOps = [...this.pending, ...this.buffer];
    const rebased = localOps.reduce(
      (serverOp, localOp) => transform(localOp, serverOp),
      op
    );

    this.doc = applyOp(this.doc, rebased);
    this.revision = revision;

    // Acknowledged
    this.pending.shift();
    if (this.buffer.length > 0) {
      this.send(this.buffer.shift());
    }
  }
}
Rate Limiting and Backpressure
// Protect your real-time server from abuse
import rateLimit from 'express-rate-limit';
import { RateLimiterRedis } from 'rate-limiter-flexible';
// Socket.io rate limiting
// Connection-level limiter: 100 points per 60 seconds per key.
const connectionLimits = {
  storeClient: redisClient,
  keyPrefix: 'ws-limit',
  points: 100, // Number of events
  duration: 60, // Per minute
};
const rateLimiter = new RateLimiterRedis(connectionLimits);

// Handshake middleware: consume one point for the client's address and refuse
// the connection when consume() rejects.
// NOTE(review): any rejection, not only an exhausted limit, is reported as
// "Too many connections".
io.use(async (socket, next) => {
  try {
    const { address } = socket.handshake;
    await rateLimiter.consume(address);
    next();
  } catch (error) {
    next(new Error('Too many connections'));
  }
});
// Per-event rate limiting
// Per-socket event limiter: every incoming packet consumes one point, keyed
// by event name, from a limiter whose key prefix contains the socket id.
io.on('connection', (socket) => {
  const perSocketLimits = {
    storeClient: redisClient,
    keyPrefix: `ws-events:${socket.id}`,
    points: 20,
    duration: 1, // 20 messages per second
  };
  const eventLimiter = new RateLimiterRedis(perSocketLimits);

  socket.use(async ([event], next) => {
    try {
      await eventLimiter.consume(event);
      next();
    } catch (error) {
      // The packet is dropped (next is not called) and the client is told.
      socket.emit('error', 'Rate limit exceeded');
    }
  });
});
// Backpressure - slow down when buffer fills
/**
 * Wraps a socket exposing `send(data)` and `bufferedAmount`, queues outgoing
 * messages and pauses while the socket's buffer is above 64 KiB. Messages
 * are dropped once 1000 are already queued.
 */
class BackpressuredSocket {
  /**
   * @param {{send: Function, bufferedAmount: number}} socket
   */
  constructor(socket) {
    this.socket = socket;
    this.queue = [];
    this.sending = false;
    this.highWaterMark = 1000;
  }

  /**
   * Queues `data` for sending.
   * @param {*} data
   * @returns {boolean} false when the queue is full and the message was dropped
   */
  send(data) {
    if (this.queue.length >= this.highWaterMark) {
      console.warn('Backpressure: dropping message');
      return false;
    }

    this.queue.push(data);
    // drain() runs in the background; handle its failure here so a throwing
    // socket does not become an unhandled rejection.
    this.drain().catch((error) => {
      console.error('Backpressure: send failed', error);
    });
    return true;
  }

  /**
   * Sends queued messages in order until the queue is empty. Returns
   * immediately if a drain is already running.
   * @returns {Promise<void>} rejects if `socket.send` throws; the failing
   *   message is lost, the rest stay queued for the next drain
   */
  async drain() {
    if (this.sending) return;
    this.sending = true;

    try {
      while (this.queue.length > 0) {
        const data = this.queue.shift();

        // Check if socket is writable
        if (this.socket.bufferedAmount > 65536) {
          await this.#waitForBufferBelow(32768);
        }

        this.socket.send(data);
      }
    } finally {
      // Always release the flag; otherwise one failed send would block every
      // later drain.
      this.sending = false;
    }
  }

  /** Polls every 10 ms until `socket.bufferedAmount` drops below `threshold`. */
  #waitForBufferBelow(threshold) {
    return new Promise((resolve) => {
      const check = () => {
        if (this.socket.bufferedAmount < threshold) {
          resolve();
        } else {
          setTimeout(check, 10);
        }
      };
      check();
    });
  }
}
🏗️ Architecture Checklist
- ✓ Use Redis adapter for multi-server Socket.io
- ✓ Implement heartbeats to detect dead connections
- ✓ Design for reconnection - clients will disconnect
- ✓ Use sticky sessions with load balancer for WebSockets
- ✓ Rate limit connections and events to prevent abuse
- ✓ Handle backpressure when clients can't keep up
- ✓ Choose conflict strategy appropriate for your use case
- ✓ Monitor connection counts and memory usage