Lesson 1 of 8
5 min read
Advanced Node.js

Streams & Buffers

Handle large data efficiently with readable, writable, and transform streams

Why Streams Matter

Imagine reading a 2GB video file. Without streams, you'd need to load the entire file into memory before processing it. With streams, you process data in chunks as it arrives, using minimal memory.

Streams are fundamental to Node.js and are used everywhere: HTTP requests/responses, file operations, compression, encryption, and more.
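
To make the difference concrete, here is a rough sketch of the two approaches side by side (the file name is just a placeholder):

const fs = require('fs');

// Without streams: the whole file must fit in memory before you can touch it
fs.readFile('big-video.mp4', (err, data) => {
  if (err) throw err;
  console.log('Loaded', data.length, 'bytes in one go');
});

// With streams: data arrives in small chunks (64KB by default for fs streams)
let total = 0;
fs.createReadStream('big-video.mp4')
  .on('data', (chunk) => { total += chunk.length; })
  .on('end', () => console.log('Streamed', total, 'bytes'));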

🌊 Four Types of Streams

Readable

Source of data (fs.createReadStream, HTTP request)

Writable

Destination for data (fs.createWriteStream, HTTP response)

Duplex

Both readable and writable (TCP socket)

Transform

Modify data as it passes through (zlib, crypto)
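
Each of these types maps to concrete objects from Node's core modules. A quick sketch with one example of each:

const fs = require('fs');
const net = require('net');
const zlib = require('zlib');

const readable = fs.createReadStream('input.txt');    // Readable: source of data
const writable = fs.createWriteStream('output.txt');  // Writable: destination for data
const duplex = new net.Socket();                      // Duplex: readable and writable (TCP socket)
const transform = zlib.createGzip();                  // Transform: modifies data passing through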

Buffers: Binary Data Containers

Buffers are fixed-size chunks of memory used to store binary data. They're essential for working with streams, files, and network operations.

// Creating buffers
const buf1 = Buffer.alloc(10);           // 10 bytes, filled with zeros
const buf2 = Buffer.from('Hello');       // From string
const buf3 = Buffer.from([72, 101, 108]); // From array of bytes

console.log(buf2.toString());            // 'Hello'
console.log(buf2.length);                // 5 bytes
console.log(buf2[0]);                    // 72 (ASCII for 'H')

// Buffer operations
const combined = Buffer.concat([buf2, Buffer.from(' World')]);
console.log(combined.toString());        // 'Hello World'

// Convert between encodings
const base64 = buf2.toString('base64');  // 'SGVsbG8='
const hex = buf2.toString('hex');        // '48656c6c6f'

// Write to buffer
const buf = Buffer.alloc(10);
buf.write('Hi');
console.log(buf.toString());             // 'Hi'

Readable Streams

const fs = require('fs');

// Create a readable stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024  // read in 64KB chunks (64KB is already the default for fs streams)
});

// Event-based reading
readStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk.length, 'bytes');
});

readStream.on('end', () => {
  console.log('Finished reading');
});

readStream.on('error', (err) => {
  console.error('Error:', err);
});

// Pause and resume: an alternative to the plain 'data' handler above
// (attach one or the other, not both)
readStream.on('data', (chunk) => {
  readStream.pause();  // Stop receiving data

  processChunk(chunk).then(() => {  // processChunk: your own async handler
    readStream.resume();  // Continue receiving
  });
});
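
Readable streams are also async iterables, so instead of wiring up events you can consume them with for await...of, which pauses the stream for you between chunks. A minimal sketch (processChunk again stands in for your own async handler):

const fs = require('fs');

async function readLargeFile() {
  const readStream = fs.createReadStream('large-file.txt', { encoding: 'utf8' });

  // The loop pulls one chunk at a time; the stream waits while you await
  for await (const chunk of readStream) {
    await processChunk(chunk);  // hypothetical async handler
  }
  console.log('Finished reading');
}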

Writable Streams

const fs = require('fs');

// Create a writable stream
const writeStream = fs.createWriteStream('output.txt');

// Write data
writeStream.write('Hello ');
writeStream.write('World\n');
writeStream.end('Goodbye!');  // Final write and close

// Handle events
writeStream.on('finish', () => {
  console.log('All data written');
});

writeStream.on('error', (err) => {
  console.error('Write error:', err);
});

// Handle backpressure
function writeData(stream, data) {
  const canContinue = stream.write(data);
  
  if (!canContinue) {
    // Buffer is full, wait for drain
    console.log('Backpressure! Waiting...');
    stream.once('drain', () => {
      console.log('Drained, continuing...');
    });
  }
}
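
Putting that together: a minimal sketch of a write loop that stops whenever write() returns false and waits for 'drain' before continuing (the output path is a placeholder):

const fs = require('fs');

async function writeManyLines(path) {
  const stream = fs.createWriteStream(path);

  for (let i = 0; i < 1_000_000; i++) {
    const ok = stream.write(`line ${i}\n`);
    if (!ok) {
      // Internal buffer is full: wait for 'drain' before writing more
      await new Promise((resolve) => stream.once('drain', resolve));
    }
  }
  stream.end();
}

writeManyLines('lines.txt');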

Piping Streams

Piping connects streams together, automatically handling backpressure and data flow.

const fs = require('fs');
const zlib = require('zlib');

// Simple pipe: copy a file
fs.createReadStream('source.txt')
  .pipe(fs.createWriteStream('destination.txt'));

// Chain multiple transforms: read, compress, write
fs.createReadStream('file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('file.txt.gz'));

// Decompress
fs.createReadStream('file.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('file-restored.txt'));

// HTTP response streaming
const http = require('http');

http.createServer((req, res) => {
  const fileStream = fs.createReadStream('large-video.mp4');
  res.writeHead(200, { 'Content-Type': 'video/mp4' });
  fileStream.pipe(res);  // Stream video to client
}).listen(3000);
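
One caveat with .pipe(): it does not forward errors between streams, so each stream in the chain needs its own 'error' handler. A minimal sketch of what that looks like; this is exactly the problem pipeline() in the next section solves:

const fs = require('fs');
const zlib = require('zlib');

const source = fs.createReadStream('file.txt');
const gzip = zlib.createGzip();
const dest = fs.createWriteStream('file.txt.gz');

// Errors do not propagate through .pipe(), so handle them per stream
source.on('error', (err) => console.error('Read error:', err));
gzip.on('error', (err) => console.error('Gzip error:', err));
dest.on('error', (err) => console.error('Write error:', err));

source.pipe(gzip).pipe(dest);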

Pipeline (Modern Approach)

pipeline() is the modern, safer way to connect streams: it forwards errors from every stage and cleans up all streams in the chain if any of them fail.

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function compressFile(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output)
    );
    console.log('Compression complete');
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
}

compressFile('bigfile.txt', 'bigfile.txt.gz');
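
pipeline() also accepts async iterables as sources, which makes it easy to stream generated data through a transform and straight to disk. A minimal sketch (the generator and file name are just placeholders):

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function* generateLines() {
  for (let i = 0; i < 100000; i++) {
    yield `record ${i}\n`;
  }
}

async function writeGenerated() {
  await pipeline(
    generateLines(),                        // async generator as the source
    zlib.createGzip(),                      // compress on the fly
    fs.createWriteStream('records.txt.gz')  // write compressed output
  );
  console.log('Done');
}

writeGenerated();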

Transform Streams

Transform streams modify data as it passes through. Great for data processing pipelines.

const { Transform } = require('stream');

// Custom transform: uppercase converter
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    const upperCased = chunk.toString().toUpperCase();
    callback(null, upperCased);
  }
});

// Use it
process.stdin
  .pipe(upperCaseTransform)
  .pipe(process.stdout);

// Line counter transform
const lineCounter = new Transform({
  transform(chunk, encoding, callback) {
    // Counts newlines in each raw text chunk and passes the chunk through unchanged
    const lines = chunk.toString().split('\n').length - 1;
    this.lineCount = (this.lineCount || 0) + lines;
    callback(null, chunk);
  },
  flush(callback) {
    console.log('Total lines:', this.lineCount);
    callback();
  }
});

// JSON parser transform (assumes each chunk is one complete JSON document;
// for newline-delimited JSON you'd split on '\n' first)
const jsonParser = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk.toString());
      callback(null, obj);
    } catch (err) {
      callback(err);
    }
  }
});

Practical Example: CSV Processor

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');  // promise-based, so it can be awaited
const fs = require('fs');

// Transform CSV lines to JSON objects
class CSVParser extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop();  // Keep incomplete line

    for (const line of lines) {
      if (!line.trim()) continue;
      
      const values = line.split(',');
      
      if (!this.headers) {
        this.headers = values;
        continue;
      }

      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header.trim()] = values[i]?.trim();
      });
      
      this.push(obj);
    }
    callback();
  }

  _flush(callback) {
    // Emit the final row if the file doesn't end with a newline
    if (this.buffer.trim() && this.headers) {
      const values = this.buffer.split(',');
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header.trim()] = values[i]?.trim();
      });
      this.push(obj);
    }
    callback();
  }
}

// Filter transform
class FilterTransform extends Transform {
  constructor(filterFn, options = {}) {
    super({ ...options, objectMode: true });
    this.filterFn = filterFn;
  }

  _transform(obj, encoding, callback) {
    if (this.filterFn(obj)) {
      this.push(obj);
    }
    callback();
  }
}

// Use the pipeline
async function processCSV() {
  const parser = new CSVParser();
  const filter = new FilterTransform(row => Number(row.age) > 18);  // CSV values are strings
  const output = fs.createWriteStream('adults.json');

  await pipeline(
    fs.createReadStream('users.csv'),
    parser,
    filter,
    new Transform({
      objectMode: true,
      transform(obj, enc, cb) {
        cb(null, JSON.stringify(obj) + '\n');
      }
    }),
    output
  );

  console.log('Processing complete');
}

processCSV();

Stream Performance Comparison

Approach                   Memory (1GB file)   Time
fs.readFile() (load all)   ~1GB RAM            Slower start
Streams (64KB chunks)      ~64KB RAM           Immediate start
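
You can get a rough feel for this yourself with process.memoryUsage(); a minimal sketch, assuming a large file at ./large-file.bin:

const fs = require('fs');

function reportMemory(label) {
  const mb = (process.memoryUsage().rss / 1024 / 1024).toFixed(1);
  console.log(`${label}: ${mb} MB resident memory`);
}

reportMemory('Before streaming');

// Streaming keeps memory roughly flat regardless of file size
fs.createReadStream('./large-file.bin')
  .on('data', () => {})                                // discard chunks; we only care about memory
  .on('end', () => reportMemory('After streaming'));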

💡 Best Practices

  • Use pipeline() instead of .pipe() for proper error handling
  • Always handle the 'error' event on streams
  • Use highWaterMark to tune buffer size for your use case
  • Consider objectMode: true for non-binary data
  • Use streams for any file larger than a few MB
