
Node.js Streams Deep Dive

Master Readable, Writable, Duplex, and Transform streams with backpressure handling, stream.pipeline(), and real-world streaming patterns

Understanding Streams

Streams are the backbone of Node.js I/O. They let you process data piece by piece instead of loading everything into memory at once. HTTP requests and responses, file read/write streams, and network sockets are all streams under the hood.
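As a quick, minimal sketch of the difference (the file name big.log is just a placeholder), compare a buffered read with a streamed one:

const fs = require('fs');

// Buffered: the entire file must fit in memory before we can touch it
fs.readFile('big.log', (err, data) => {
  if (err) throw err;
  console.log(`Loaded ${data.length} bytes in one shot`);
});

// Streamed: data arrives chunk by chunk (64 KB per chunk by default for file streams)
fs.createReadStream('big.log')
  .on('data', (chunk) => console.log(`Received a ${chunk.length}-byte chunk`))
  .on('end', () => console.log('Done'));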

🌊 Stream Types

  • Readable: a data source (fs.createReadStream, incoming HTTP requests, process.stdin)
  • Writable: a data sink (fs.createWriteStream, HTTP responses, process.stdout)
  • Duplex: both readable and writable (TCP sockets, WebSockets)
  • Transform: modifies data in transit (zlib, crypto ciphers, CSV parsers)
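The types compose naturally. As a small sketch (file names are placeholders), a Readable file stream can be piped through a built-in Transform into a Writable:

const { createReadStream, createWriteStream } = require('fs');
const { createGzip } = require('zlib');

// Readable (source file) -> Transform (gzip) -> Writable (destination file)
createReadStream('app.log')
  .pipe(createGzip())
  .pipe(createWriteStream('app.log.gz'));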

Creating Custom Readable Streams

const { Readable } = require('stream');

// Generate data on demand
class CounterStream extends Readable {
  constructor(max, options) {
    super(options);
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current <= this.max) {
      const data = `Number: ${this.current}\n`;
      this.push(data);
      this.current++;
    } else {
      this.push(null); // Signal end of stream
    }
  }
}

const counter = new CounterStream(1000);
counter.pipe(process.stdout);

// Simpler: Readable.from() for iterables
async function* generateData() {
  for (let i = 0; i < 100; i++) {
    yield JSON.stringify({ id: i, timestamp: Date.now() }) + '\n';
    await new Promise(r => setTimeout(r, 10));
  }
}

const stream = Readable.from(generateData());
stream.pipe(process.stdout);

Creating Custom Transform Streams

const { Transform, pipeline } = require('stream');
const fs = require('fs');

// Transform: Convert CSV to JSON
class CSVtoJSON extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true }); // objectMode is optional here (we push strings), but it lets the stream carry arbitrary values
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line

    for (const line of lines) {
      if (!line.trim()) continue;
      const values = line.split(',').map(v => v.trim());

      if (!this.headers) {
        this.headers = values;
        continue;
      }

      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header] = values[i];
      });
      this.push(JSON.stringify(obj) + '\n');
    }
    callback();
  }

  _flush(callback) {
    // Handle any remaining data
    if (this.buffer.trim() && this.headers) {
      const values = this.buffer.split(',').map(v => v.trim());
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header] = values[i];
      });
      this.push(JSON.stringify(obj) + '\n');
    }
    callback();
  }
}

// Usage: convert CSV file to JSON
pipeline(
  fs.createReadStream('data.csv'),
  new CSVtoJSON(),
  fs.createWriteStream('data.jsonl'),
  (err) => {
    if (err) console.error('Pipeline failed:', err);
    else console.log('CSV converted to JSON');
  }
);
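To make the behaviour concrete, a hypothetical data.csv such as:

name,age
Ada,36
Linus,28

would be written to data.jsonl as one JSON object per line (every value stays a string, since the transform does no type conversion):

{"name":"Ada","age":"36"}
{"name":"Linus","age":"28"}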

Backpressure and Flow Control

Backpressure occurs when a writable stream cannot consume data as fast as the readable stream produces it. Handling it properly keeps the write buffer bounded and prevents memory exhaustion:

const fs = require('fs');

// BAD: Ignoring backpressure
function badCopy(src, dest) {
  const readStream = fs.createReadStream(src);
  const writeStream = fs.createWriteStream(dest);

  readStream.on('data', (chunk) => {
    // write() returns false when buffer is full
    // but we're ignoring it — memory will grow!
    writeStream.write(chunk);
  });
}

// GOOD: Handling backpressure manually
function goodCopy(src, dest) {
  const readStream = fs.createReadStream(src);
  const writeStream = fs.createWriteStream(dest);

  readStream.on('data', (chunk) => {
    const canContinue = writeStream.write(chunk);
    if (!canContinue) {
      readStream.pause(); // Stop reading until drain
    }
  });

  writeStream.on('drain', () => {
    readStream.resume(); // Buffer drained, continue reading
  });

  readStream.on('end', () => writeStream.end());
}

// BEST: Use pipeline — handles backpressure automatically
const { pipeline } = require('stream/promises');

async function bestCopy(src, dest) {
  await pipeline(
    fs.createReadStream(src),
    fs.createWriteStream(dest)
  );
}
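Because bestCopy() returns a promise, the caller decides how failures are handled, for example (file names are placeholders):

bestCopy('input.bin', 'output.bin')
  .then(() => console.log('Copy finished'))
  .catch((err) => console.error('Copy failed:', err));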

Real-World Stream Patterns

const { Transform } = require('stream');
const { pipeline } = require('stream/promises'); // promise-based pipeline, so .catch() below works
const { createReadStream, createWriteStream } = require('fs');
const { createGzip } = require('zlib');

// Pattern 1: Process a large JSONL file line by line
async function processLargeJSONL(filePath) {
  const stream = createReadStream(filePath, { encoding: 'utf8' });
  let leftover = '';
  let count = 0;

  for await (const chunk of stream) {
    // A chunk can end mid-line, so carry the incomplete tail over to the next chunk
    const lines = (leftover + chunk).split('\n');
    leftover = lines.pop();

    for (const line of lines) {
      if (!line.trim()) continue;
      const record = JSON.parse(line);
      count++;
      // Process each record
    }
  }

  if (leftover.trim()) {
    const record = JSON.parse(leftover); // Last line without a trailing newline
    count++;
  }
  console.log(`Processed ${count} records`);
}

// Pattern 2: Streaming HTTP response
const http = require('http');
http.createServer((req, res) => {
  if (req.url === '/logs') {
    res.writeHead(200, {
      'Content-Type': 'text/plain',
      'Content-Encoding': 'gzip'
    });
    // Stream and compress log file
    pipeline(
      createReadStream('/var/log/app.log'),
      createGzip(),
      res
    ).catch(console.error);
  } else {
    res.writeHead(404);
    res.end();
  }
}).listen(3000);
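// Quick test from a shell: curl --compressed http://localhost:3000/logs
// (--compressed makes curl decompress the gzip-encoded body)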

// Pattern 3: Rate-limited stream
class ThrottleStream extends Transform {
  constructor(bytesPerSecond) {
    super();
    this.bytesPerSecond = bytesPerSecond;
    this.transferred = 0;
    this.startTime = Date.now();
  }

  _transform(chunk, encoding, callback) {
    this.transferred += chunk.length;
    const elapsed = (Date.now() - this.startTime) / 1000;
    const expectedTime = this.transferred / this.bytesPerSecond;
    const delay = Math.max(0, (expectedTime - elapsed) * 1000);

    setTimeout(() => {
      this.push(chunk);
      callback();
    }, delay);
  }
}

// Limit a download to 1 MB/s (res is assumed to be the response object of a surrounding HTTP handler)
pipeline(
  createReadStream('large-file.zip'),
  new ThrottleStream(1024 * 1024),
  res
).catch(console.error);

Duplex Streams and Passthrough

const { Duplex, PassThrough } = require('stream');
const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');

// Duplex: independent read and write sides
class EchoStream extends Duplex {
  constructor() {
    super();
    this.queue = [];
    this.readerWaiting = false;
  }

  _write(chunk, encoding, callback) {
    // Store incoming data for the readable side
    this.queue.push(chunk.toString().toUpperCase());
    if (this.readerWaiting) {
      this.readerWaiting = false;
      this.push(this.queue.shift());
    }
    callback();
  }

  _read() {
    if (this.queue.length > 0) {
      this.push(this.queue.shift());
    } else {
      // Nothing buffered yet: push again as soon as _write() receives data
      this.readerWaiting = true;
    }
  }

  _final(callback) {
    // Writable side finished: flush anything still queued, then end the readable side
    while (this.queue.length > 0) this.push(this.queue.shift());
    this.push(null);
    callback();
  }
}
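
// Hypothetical usage: anything written to the stream is read back upper-cased
const echo = new EchoStream();
echo.on('data', (chunk) => process.stdout.write(chunk.toString()));
echo.write('hello ');
echo.end('world\n'); // Prints "HELLO WORLD"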

// PassThrough: useful for tapping into pipelines
const monitor = new PassThrough();
let totalBytes = 0;

monitor.on('data', (chunk) => {
  totalBytes += chunk.length;
  process.stdout.write(`\rTransferred: ${(totalBytes / 1024 / 1024).toFixed(2)} MB`);
});

pipeline(
  createReadStream('file.dat'),
  monitor,
  createWriteStream('copy.dat')
).catch(console.error);

💡 Key Takeaways

  • Always use pipeline() instead of .pipe() for proper error handling and cleanup
  • Handle backpressure to prevent memory exhaustion
  • Use objectMode: true when streaming objects instead of buffers (see the sketch below)
  • Use async iterators (for await) for cleaner stream consumption
  • Transform streams are ideal for data processing pipelines
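
As a closing illustration of the objectMode takeaway, here is a minimal sketch (the user objects are made up for the example) of a pipeline whose chunks are plain JavaScript objects rather than buffers:

const { Readable, Writable, pipeline } = require('stream');

// Object-mode source: Readable.from() over non-buffer items yields objects
const users = Readable.from([{ name: 'Ada' }, { name: 'Linus' }]);

// Object-mode sink: each chunk is a whole object, not a buffer
const logger = new Writable({
  objectMode: true,
  write(user, encoding, callback) {
    console.log(`Hello, ${user.name}!`);
    callback();
  }
});

pipeline(users, logger, (err) => {
  if (err) console.error('Pipeline failed:', err);
});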
