Lesson 3 of 8
5 min read
Advanced Node.js

Worker Threads

Run CPU-intensive tasks in parallel using worker threads

Why Worker Threads?

Node.js executes your JavaScript on a single thread, so CPU-intensive tasks block the event loop. Worker threads let you run JavaScript on additional threads in parallel, which makes them ideal for heavy computations that would otherwise stall your main application.

⚡ When to Use Worker Threads

✅ Good Use Cases
  • Image/video processing
  • Heavy computations (crypto, ML)
  • Parsing large files
  • Data compression
❌ Not For
  • I/O operations (already async)
  • Database queries
  • HTTP requests
  • Simple tasks (overhead not worth it)

Basic Worker Thread

// main.js
const { Worker, isMainThread, workerData } = require('worker_threads');

// Entry point: a worker thread only announces itself, while the main thread
// spawns ./worker.js and logs everything the worker reports back.
if (!isMainThread) {
  // This code runs in the worker
  console.log('Worker starting...');
} else {
  // Main thread
  console.log('Main thread starting worker...');

  // The payload is cloned into the worker and exposed there as `workerData`.
  const payload = { numbers: [1, 2, 3, 4, 5] };
  const worker = new Worker('./worker.js', { workerData: payload });

  const logResult = (result) => {
    console.log('Result from worker:', result);
  };
  const logFailure = (err) => {
    console.error('Worker error:', err);
  };
  const logExit = (code) => {
    console.log('Worker exited with code:', code);
  };

  // `on` returns the emitter, so the three subscriptions can be chained.
  worker
    .on('message', logResult)
    .on('error', logFailure)
    .on('exit', logExit);
}

// worker.js
const { parentPort, workerData } = require('worker_threads');

// Heavy computation: fold the numbers received from the main thread into one total
const add = (runningTotal, value) => runningTotal + value;
const sum = workerData.numbers.reduce(add, 0);

// Hand the total back to whoever spawned this worker
parentPort.postMessage(sum);

Inline Worker (No Separate File)

const { Worker, isMainThread, parentPort } = require('worker_threads');

/**
 * Runs `fn(data)` on a freshly spawned worker thread and resolves with its result.
 *
 * `fn` is shipped to the worker as source text (`fn.toString()`), so it must be
 * self-contained: it cannot use variables from the scope it was defined in.
 * It may return a plain value or a Promise; the settled value is sent back.
 *
 * @param {Function} fn - Function evaluated inside the worker; receives `data`.
 * @param {*} data - Passed to the worker as `workerData` (must be cloneable).
 * @returns {Promise<*>} Resolves with the value produced by `fn`. Rejects if the
 *   worker throws, or if it exits before posting a result.
 */
function runInWorker(fn, data) {
  return new Promise((resolve, reject) => {
    // The parentheses force the serialized source to be parsed as an expression.
    // Promise.resolve() lets async functions work: posting a Promise object
    // itself would fail because Promises cannot be cloned across threads.
    const workerCode = `
      const { parentPort, workerData } = require('worker_threads');
      const fn = (${fn.toString()});
      Promise.resolve(fn(workerData)).then((result) => {
        parentPort.postMessage(result);
      });
    `;

    const worker = new Worker(workerCode, {
      eval: true,
      workerData: data
    });

    worker.once('message', resolve);
    worker.once('error', reject);
    // Backstop: without this the promise would stay pending forever when the
    // worker ends without posting (e.g. it calls process.exit()). Rejecting an
    // already-settled promise is a no-op, so a normal exit is unaffected.
    worker.once('exit', (code) => {
      reject(new Error(`Worker exited with code ${code} before returning a result`));
    });
  });
}

// Usage
/**
 * Demo: offloads a CPU-bound loop to a worker and prints the result.
 * The task function is self-contained because it is sent to the worker as source text.
 */
async function main() {
  const heavyTask = (data) => {
    let sum = 0;
    for (let i = 0; i < data.iterations; i++) {
      sum += Math.sqrt(i);
    }
    return sum;
  };

  const result = await runInWorker(heavyTask, { iterations: 1e8 });
  console.log('Result:', result);
}

// Never leave the promise floating: report a worker failure and signal it via the exit code.
main().catch((err) => {
  console.error('Worker task failed:', err);
  process.exitCode = 1;
});

Sharing Data with SharedArrayBuffer

For high-performance scenarios, use SharedArrayBuffer to share memory between threads without copying data.

// main.js
const { Worker } = require('worker_threads');

// Create shared memory: a single 32-bit counter backed by a SharedArrayBuffer
const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);  // 4 bytes
const sharedArray = new Int32Array(sharedBuffer);

sharedArray[0] = 0;  // Initial value (safe as a plain write: no other thread exists yet)

// The buffer is shared with the worker, not copied, so both threads see the same counter
const worker = new Worker('./counter-worker.js', {
  workerData: { sharedBuffer }
});

// Main thread also increments
setInterval(() => {
  Atomics.add(sharedArray, 0, 1);
  // Read through Atomics as well: the worker writes this slot concurrently,
  // and a plain indexed read of shared memory is a data race.
  console.log('Main sees:', Atomics.load(sharedArray, 0));
}, 100);

// counter-worker.js
const { workerData } = require('worker_threads');
// View over the buffer received from the main thread; both threads address the same memory
const sharedArray = new Int32Array(workerData.sharedBuffer);

setInterval(() => {
  Atomics.add(sharedArray, 0, 1);
  // Read through Atomics as well: the main thread writes this slot concurrently,
  // and a plain indexed read of shared memory is a data race.
  console.log('Worker sees:', Atomics.load(sharedArray, 0));
}, 100);

Worker Pool Pattern

Create a pool of workers to handle multiple tasks efficiently without creating new workers each time.

const { Worker } = require('worker_threads');
const os = require('os');

/**
 * Fixed-size pool of worker threads.
 *
 * Tasks are queued with `runTask()` and dispatched to idle workers. Each worker
 * handles one task at a time and is expected to answer every posted task with
 * exactly one message, which becomes the task's result.
 */
class WorkerPool {
  /**
   * @param {string|URL} workerScript - Script handed to `new Worker()` for every pool member.
   * @param {number} [poolSize=os.cpus().length] - Number of workers to start.
   */
  constructor(workerScript, poolSize = os.cpus().length) {
    this.workerScript = workerScript;
    this.poolSize = poolSize;
    this.workers = [];
    this.freeWorkers = [];
    this.taskQueue = [];
    this.isDestroyed = false;

    // Initialize workers
    for (let i = 0; i < poolSize; i++) {
      this.addWorker();
    }
  }

  /** Starts one worker, wires its events, and marks it idle. */
  addWorker() {
    const worker = new Worker(this.workerScript);
    worker.currentResolve = null;
    worker.currentReject = null;
    worker.hasCompletedTask = false;

    worker.on('message', (result) => {
      const resolve = worker.currentResolve;
      if (!resolve) {
        // Message without a task in flight: there is nothing to settle, and
        // re-adding the worker would put it in the idle list twice.
        return;
      }
      worker.currentResolve = null;
      worker.currentReject = null;
      worker.hasCompletedTask = true;
      resolve(result);
      this.freeWorkers.push(worker);
      this.runNext();
    });

    worker.on('error', (err) => {
      this.handleWorkerFailure(worker, err);
    });

    // A worker can also end without an 'error' (e.g. it calls process.exit()).
    // After an 'error' this is a no-op because the worker was already removed.
    worker.on('exit', (code) => {
      this.handleWorkerFailure(
        worker,
        new Error(`Worker exited unexpectedly with code ${code}`)
      );
    });

    this.workers.push(worker);
    this.freeWorkers.push(worker);
  }

  /**
   * Removes a dead worker, rejects the task it was running (if any), and
   * replaces it so the pool keeps its capacity.
   *
   * @param {Worker} worker - The worker that errored or exited.
   * @param {Error} err - Reason reported to the in-flight task.
   */
  handleWorkerFailure(worker, err) {
    if (!this.workers.includes(worker)) {
      return; // Already handled, or the pool was destroyed.
    }

    const reject = worker.currentReject;
    // A worker that died idle without ever finishing a task most likely has a
    // broken script; respawning it would loop forever, so it is not replaced.
    const shouldReplace = reject !== null || worker.hasCompletedTask;

    this.workers = this.workers.filter(w => w !== worker);
    this.freeWorkers = this.freeWorkers.filter(w => w !== worker);
    worker.currentResolve = null;
    worker.currentReject = null;

    if (reject) {
      reject(err);
    }

    if (shouldReplace) {
      // Replace crashed worker and hand it any task that was waiting.
      this.addWorker();
      this.runNext();
    } else if (this.workers.length === 0) {
      // Nobody is left to serve the queue: fail the waiting tasks instead of hanging.
      const queued = this.taskQueue;
      this.taskQueue = [];
      for (const task of queued) {
        task.reject(err);
      }
    }
  }

  /**
   * Queues a task and runs it as soon as a worker is idle.
   *
   * @param {*} data - Payload posted to the worker (must be cloneable).
   * @returns {Promise<*>} Resolves with the worker's reply. Rejects if the
   *   worker fails, the payload cannot be posted, or the pool cannot run tasks.
   */
  runTask(data) {
    return new Promise((resolve, reject) => {
      if (this.isDestroyed) {
        reject(new Error('WorkerPool has been destroyed'));
        return;
      }
      if (this.workers.length === 0) {
        reject(new Error('WorkerPool has no live workers'));
        return;
      }
      this.taskQueue.push({ data, resolve, reject });
      this.runNext();
    });
  }

  /** Dispatches queued tasks for as long as both a task and an idle worker exist. */
  runNext() {
    while (this.taskQueue.length > 0 && this.freeWorkers.length > 0) {
      const { data, resolve, reject } = this.taskQueue.shift();
      const worker = this.freeWorkers.pop();

      worker.currentResolve = resolve;
      worker.currentReject = reject;
      try {
        worker.postMessage(data);
      } catch (err) {
        // The payload could not be cloned: fail only this task and keep the
        // worker available instead of leaking it as permanently busy.
        worker.currentResolve = null;
        worker.currentReject = null;
        this.freeWorkers.push(worker);
        reject(err);
      }
    }
  }

  /**
   * Terminates every worker. Tasks that are queued or still running are
   * rejected so their callers do not wait forever; later `runTask()` calls reject.
   *
   * @returns {Promise<void>} Resolves once all workers have terminated.
   */
  async destroy() {
    this.isDestroyed = true;
    const reason = new Error('WorkerPool destroyed before the task completed');

    const queued = this.taskQueue;
    const workers = this.workers;
    this.taskQueue = [];
    this.workers = [];
    this.freeWorkers = [];

    for (const task of queued) {
      task.reject(reason);
    }
    for (const worker of workers) {
      if (worker.currentReject) {
        worker.currentReject(reason);
      }
      worker.currentResolve = null;
      worker.currentReject = null;
    }

    // Workers are independent, so terminate them concurrently.
    await Promise.all(workers.map(worker => worker.terminate()));
  }
}

// Usage
const pool = new WorkerPool('./compute-worker.js', 4);

/**
 * Demo: pushes 100 tasks through the shared pool and waits for all of them.
 * The pool is destroyed even when a task fails, so its workers never keep
 * the process alive after an error.
 */
async function processMany() {
  try {
    const tasks = Array.from({ length: 100 }, (_, i) =>
      pool.runTask({ taskId: i, data: Math.random() * 1000 })
    );

    const results = await Promise.all(tasks);
    console.log('All done!', results.length);
  } finally {
    await pool.destroy();
  }
}

// Never leave the promise floating: report the failure and signal it via the exit code.
processMany().catch((err) => {
  console.error('Processing failed:', err);
  process.exitCode = 1;
});

Message Channels

Use MessageChannel for direct communication between workers.

const { Worker, MessageChannel } = require('worker_threads');

// Spawn the two peers that will talk to each other
const worker1 = new Worker('./worker1.js');
const worker2 = new Worker('./worker2.js');

// One channel has two linked ports; each worker gets one end
const channel = new MessageChannel();
const endpoints = [
  [worker1, channel.port1],
  [worker2, channel.port2],
];

// Listing the port in the transfer list moves it into the receiving worker
for (const [peer, port] of endpoints) {
  peer.postMessage({ port }, [port]);
}

// From here on worker1 and worker2 can exchange messages directly!

Practical Example: Image Processing

// image-processor.js (worker)
const { parentPort, workerData } = require('worker_threads');
const sharp = require('sharp');

/**
 * Builds a sharp pipeline for one image and returns the encoded output.
 *
 * @param {Buffer} imageBuffer - Source image handed to `sharp()`.
 * @param {object} options - Reads `width`, `height`, `grayscale` and `quality`.
 * @returns {Promise<Buffer>} Result of the pipeline's `toBuffer()` call.
 */
async function processImage(imageBuffer, options) {
  const resized = sharp(imageBuffer).resize(options.width, options.height);
  const toned = resized.grayscale(options.grayscale);
  const encoded = toned.jpeg({ quality: options.quality });

  return encoded.toBuffer();
}

// Each incoming message is one job. The reply always carries a `success` flag,
// so a failed image is reported as a message rather than crashing the worker.
parentPort.on('message', async (job) => {
  const { imageBuffer, options } = job;

  try {
    const data = await processImage(imageBuffer, options);
    parentPort.postMessage({ success: true, data });
  } catch (failure) {
    parentPort.postMessage({ success: false, error: failure.message });
  }
});

// main.js
const { Worker } = require('worker_threads');
const fs = require('fs').promises;

/**
 * Reads every image and processes it on a temporary pool of four workers.
 *
 * @param {string[]} imagePaths - Files to read and send to the workers.
 * @returns {Promise<Array>} One worker reply per path, in input order.
 *   Rejects if a file cannot be read or a task fails.
 */
async function processImages(imagePaths) {
  const pool = new WorkerPool('./image-processor.js', 4);

  try {
    const tasks = imagePaths.map(async (imagePath) => {
      const imageBuffer = await fs.readFile(imagePath);
      return pool.runTask({
        imageBuffer,
        options: { width: 800, height: 600, grayscale: true, quality: 80 }
      });
    });

    return await Promise.all(tasks);
  } finally {
    // The pool is created per call, so it must be torn down per call;
    // otherwise its workers leak and keep the process alive.
    await pool.destroy();
  }
}

📦 Libraries for Worker Threads

  • Piscina - Fast worker thread pool created by Node.js core contributors
  • workerpool - Easy-to-use worker pool
  • threads.js - Unified API for Workers and Web Workers

💡 Best Practices

  • Use worker pools for repeated tasks to avoid creation overhead
  • Transfer large buffers instead of copying: postMessage(data, [buffer])
  • Use SharedArrayBuffer with Atomics for shared state
  • Match pool size to CPU cores: os.cpus().length
  • Handle worker errors gracefully and restart failed workers

Continue Learning