TechLead
Lesson 10 of 16
5 min read
Node.js

Worker Threads & Clustering

Scale Node.js apps with worker threads for CPU-intensive tasks, cluster module for multi-core servers, and child processes for isolation

Breaking Out of the Single-Threaded Model

Node.js is single-threaded by default, which is great for I/O-bound tasks but a bottleneck for CPU-intensive work. Worker threads, clustering, and child processes each solve this differently.

🧵 When to Use Each

Worker Threads

CPU-heavy tasks: image processing, encryption, parsing large data

Cluster

Scale HTTP servers across all CPU cores

Child Process

Run external commands, full process isolation

Worker Threads for CPU-Intensive Tasks

// main.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (!isMainThread) {
  // Worker thread: sum square roots over the half-open range [start, end).
  const { start, end } = workerData;
  let acc = 0;
  for (let n = start; n < end; n++) {
    acc += Math.sqrt(n);
  }
  parentPort.postMessage(acc);
} else {
  // Main thread: re-run this same file as a worker and resolve with its reply.
  const spawnWorker = (payload) =>
    new Promise((resolve, reject) => {
      const w = new Worker(__filename, { workerData: payload });
      w.on('message', resolve);
      w.on('error', reject);
      w.on('exit', (code) => {
        if (code !== 0) reject(new Error(`Worker exited with code ${code}`));
      });
    });

  (async () => {
    console.time('parallel');
    // Split 100M iterations into 4 equal chunks, one per worker.
    const chunks = [
      { start: 0, end: 25_000_000 },
      { start: 25_000_000, end: 50_000_000 },
      { start: 50_000_000, end: 75_000_000 },
      { start: 75_000_000, end: 100_000_000 },
    ];
    const partials = await Promise.all(chunks.map(spawnWorker));
    const total = partials.reduce((sum, part) => sum + part, 0);
    console.timeEnd('parallel');
    console.log('Total:', total);
  })();
}

Worker Thread Pool

const { Worker } = require('worker_threads');
const os = require('os');

class WorkerPool {
  constructor(workerFile, poolSize = os.cpus().length) {
    this.workerFile = workerFile;
    this.pool = [];
    this.queue = [];

    for (let i = 0; i < poolSize; i++) {
      this.addWorker();
    }
  }

  addWorker() {
    const worker = new Worker(this.workerFile);
    worker.busy = false;
    worker.on('message', (result) => {
      worker.busy = false;
      worker.currentResolve(result);
      this.processQueue();
    });
    worker.on('error', (err) => {
      worker.busy = false;
      worker.currentReject(err);
      this.processQueue();
    });
    this.pool.push(worker);
  }

  processQueue() {
    if (this.queue.length === 0) return;
    const freeWorker = this.pool.find(w => !w.busy);
    if (!freeWorker) return;

    const { data, resolve, reject } = this.queue.shift();
    freeWorker.busy = true;
    freeWorker.currentResolve = resolve;
    freeWorker.currentReject = reject;
    freeWorker.postMessage(data);
  }

  exec(data) {
    return new Promise((resolve, reject) => {
      this.queue.push({ data, resolve, reject });
      this.processQueue();
    });
  }

  destroy() {
    this.pool.forEach(w => w.terminate());
  }
}

// Usage
// NOTE(review): top-level `await` below requires an ES module (.mjs or
// "type": "module"); in CommonJS wrap this in an async function.
const pool = new WorkerPool('./image-worker.js', 4);

// Process 100 images using 4 workers
// `images` is assumed to be an array of file paths defined elsewhere.
const results = await Promise.all(
  images.map(img => pool.exec({ path: img, resize: [800, 600] }))
);
// Release the threads once every task has settled.
pool.destroy();

SharedArrayBuffer for Shared Memory

const { Worker, isMainThread, workerData } = require('worker_threads');

if (isMainThread) {
  // Wrap in an async IIFE: top-level `await` is a SyntaxError in CommonJS.
  (async () => {
    // Create shared memory (1024 Int32 slots = 4096 bytes)
    const sharedBuffer = new SharedArrayBuffer(1024 * 4);
    const sharedArray = new Int32Array(sharedBuffer);

    const workers = [];
    for (let i = 0; i < 4; i++) {
      const worker = new Worker(__filename, {
        workerData: { buffer: sharedBuffer, workerId: i }
      });
      workers.push(new Promise(r => worker.on('exit', r)));
    }

    await Promise.all(workers);

    // All workers have written to shared memory
    console.log('Results:', Array.from(sharedArray.slice(0, 20)));

    // Use Atomics for thread-safe operations
    Atomics.add(sharedArray, 0, 1);      // Atomic increment
    Atomics.load(sharedArray, 0);        // Atomic read
    Atomics.store(sharedArray, 0, 42);   // Atomic write
    Atomics.compareExchange(sharedArray, 0, 42, 100); // CAS
  })();
} else {
  // Worker thread: fill every 4th slot of the first 20 with this worker's id.
  // SharedArrayBuffer memory is visible to all threads without copying;
  // Atomics.store makes each write safe against concurrent access.
  const { buffer, workerId } = workerData;
  const shared = new Int32Array(buffer);
  for (let i = workerId; i < 20; i += 4) {
    Atomics.store(shared, i, workerId + 1);
  }
}

Cluster Module for Multi-Core Servers

const cluster = require('cluster');
const http = require('http');
const os = require('os');

if (cluster.isPrimary) {
  const numCPUs = os.cpus().length;
  console.log(`Primary ${process.pid} starting ${numCPUs} workers`);

  // Fork workers
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Handle worker crashes. Skip workers we disconnected on purpose (the
  // SIGUSR2 rolling restart below forks their replacement itself) — forking
  // here as well would double-fork and grow the pool on every restart.
  cluster.on('exit', (worker, code, signal) => {
    if (worker.exitedAfterDisconnect) return;
    console.log(`Worker ${worker.process.pid} died (${signal || code})`);
    console.log('Starting a new worker...');
    cluster.fork(); // Auto-restart
  });

  // Zero-downtime restart: replace workers one at a time on SIGUSR2.
  process.on('SIGUSR2', () => {
    const workers = Object.values(cluster.workers);
    function restartWorker(index) {
      if (index >= workers.length) return;
      const worker = workers[index];
      console.log(`Restarting worker ${worker.process.pid}`);

      // Attach the exit listener BEFORE disconnecting so a fast exit
      // cannot slip past us.
      worker.once('exit', () => {
        if (!worker.exitedAfterDisconnect) {
          // Crashed rather than disconnected: the global 'exit' handler
          // already replaced it — just keep the rolling restart moving.
          restartWorker(index + 1);
          return;
        }
        const newWorker = cluster.fork();
        // Advance only once the replacement is accepting connections.
        newWorker.once('listening', () => restartWorker(index + 1));
      });
      worker.disconnect();
    }
    restartWorker(0);
  });
} else {
  // Workers share the same port
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Handled by worker ${process.pid}\n`);
  }).listen(3000);

  console.log(`Worker ${process.pid} started`);
}

Child Processes

const { exec, execFile, spawn, fork } = require('child_process');
const { promisify } = require('util');
const execAsync = promisify(exec);

// exec: run shell command (buffered output).
// NOTE: top-level `await` is only valid in ES modules; in a CommonJS file
// like this one, wrap the call in an async function.
(async () => {
  const { stdout } = await execAsync('ls -la');
  console.log(stdout);
})().catch((err) => console.error('exec failed:', err.message));

// spawn: stream output (for long-running processes)
const child = spawn('ffmpeg', [
  '-i', 'input.mp4',
  '-codec', 'libx264',
  'output.mp4'
]);

// Without an 'error' handler, a missing binary crashes the whole process.
child.on('error', (err) => console.error('Failed to start ffmpeg:', err.message));
child.stdout.on('data', (data) => console.log('stdout:', data.toString()));
child.stderr.on('data', (data) => console.log('progress:', data.toString()));
child.on('close', (code) => console.log('Exit code:', code));

// fork: spawn a Node.js process with IPC
// parent.js
const worker = fork('./worker.js');

// `largeData` is assumed to be defined elsewhere in the real application.
worker.send({ task: 'processData', payload: largeData });

worker.on('message', (result) => {
  console.log('Worker result:', result);
});

// worker.js
process.on('message', async (msg) => {
  // `processData` is the application's own async handler for the payload.
  const result = await processData(msg.payload);
  process.send(result);
});

💡 Key Takeaways

  • Worker threads share memory and are ideal for CPU-bound tasks
  • Cluster module scales HTTP servers across CPU cores
  • Use a worker pool to limit concurrent threads
  • Use SharedArrayBuffer with Atomics for thread-safe shared state
  • Child processes provide full isolation for external commands

Continue Learning