Why Streams Matter
Imagine reading a 2GB video file. Without streams, you'd need to load the entire file into memory before processing it. With streams, you process data in chunks as it arrives, using minimal memory.
Streams are fundamental to Node.js and are used everywhere: HTTP requests/responses, file operations, compression, encryption, and more.
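As a rough sketch of the difference (the file name is just a placeholder), compare loading a whole file at once with processing it chunk by chunk:
const fs = require('fs');
// Whole-file approach: the entire file is held in memory at once
fs.readFile('large-video.mp4', (err, data) => {
  if (err) throw err;
  console.log('Loaded', data.length, 'bytes in one buffer');
});
// Streaming approach: only one small chunk is in memory at a time
let total = 0;
fs.createReadStream('large-video.mp4')
  .on('data', (chunk) => { total += chunk.length; })
  .on('end', () => console.log('Processed', total, 'bytes chunk by chunk'));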
🌊 Four Types of Streams
Readable: source of data (fs.createReadStream, HTTP request)
Writable: destination for data (fs.createWriteStream, HTTP response)
Duplex: both readable and writable (TCP socket)
Transform: modifies data as it passes through (zlib, crypto)
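To make the categories concrete, here is a minimal sketch that touches one built-in example of each type (the file names are placeholders, and the socket is created only to show the type):
const fs = require('fs');
const net = require('net');
const zlib = require('zlib');

const readable = fs.createReadStream('input.txt');      // Readable: source of data
const writable = fs.createWriteStream('input.txt.gz');  // Writable: destination for data
const duplex = new net.Socket();                        // Duplex: readable and writable (TCP socket)
const transform = zlib.createGzip();                    // Transform: rewrites data passing through

// Transforms sit between a readable and a writable:
readable.pipe(transform).pipe(writable);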
Buffers: Binary Data Containers
Buffers are fixed-size chunks of memory used to store binary data. They're essential for working with streams, files, and network operations.
// Creating buffers
const buf1 = Buffer.alloc(10); // 10 bytes, filled with zeros
const buf2 = Buffer.from('Hello'); // From string
const buf3 = Buffer.from([72, 101, 108]); // From array of bytes
console.log(buf2.toString()); // 'Hello'
console.log(buf2.length); // 5 bytes
console.log(buf2[0]); // 72 (ASCII for 'H')
// Buffer operations
const combined = Buffer.concat([buf2, Buffer.from(' World')]);
console.log(combined.toString()); // 'Hello World'
// Convert between encodings
const base64 = buf2.toString('base64'); // 'SGVsbG8='
const hex = buf2.toString('hex'); // '48656c6c6f'
// Write to buffer
const buf = Buffer.alloc(10);
buf.write('Hi');
console.log(buf.toString()); // 'Hi'
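Buffers are also what stream chunks are made of by default: unless you set an encoding, a readable stream hands you Buffer instances. A small sketch (file names are placeholders):
const fs = require('fs');
fs.createReadStream('photo.jpg').once('data', (chunk) => {
  console.log(Buffer.isBuffer(chunk)); // true: no encoding was set
  console.log(chunk.length, 'bytes');  // raw binary data
});
fs.createReadStream('notes.txt', { encoding: 'utf8' }).once('data', (chunk) => {
  console.log(typeof chunk); // 'string': chunks are decoded for you
});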
Readable Streams
const fs = require('fs');
// Create a readable stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks (64KB is also the default for fs read streams)
});
// Event-based reading
readStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk.length, 'bytes');
});
readStream.on('end', () => {
  console.log('Finished reading');
});
readStream.on('error', (err) => {
  console.error('Error:', err);
});
// Pause and resume for manual flow control
// (processChunk stands in for any handler that returns a promise)
readStream.on('data', (chunk) => {
  readStream.pause(); // Stop receiving data
  processChunk(chunk).then(() => {
    readStream.resume(); // Continue receiving
  });
});
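Modern Node also exposes readable streams as async iterables, so the same chunk-by-chunk processing can be written with for await...of, which keeps the stream paused while each chunk is handled. A sketch (the file name is a placeholder):
const fs = require('fs');
async function readFileInChunks(path) {
  const readStream = fs.createReadStream(path, { highWaterMark: 64 * 1024 });
  let bytes = 0;
  for await (const chunk of readStream) {
    bytes += chunk.length; // the stream waits until this iteration finishes
  }
  console.log('Finished reading', bytes, 'bytes');
}
readFileInChunks('large-file.txt').catch(console.error);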
Writable Streams
const fs = require('fs');
// Create a writable stream
const writeStream = fs.createWriteStream('output.txt');
// Write data
writeStream.write('Hello ');
writeStream.write('World\n');
writeStream.end('Goodbye!'); // Final write and close
// Handle events
writeStream.on('finish', () => {
  console.log('All data written');
});
writeStream.on('error', (err) => {
  console.error('Write error:', err);
});
// Handle backpressure: write() returns false when the internal buffer is full
function writeData(stream, data) {
  const canContinue = stream.write(data);
  if (!canContinue) {
    // Buffer is full, wait for 'drain' before writing more
    console.log('Backpressure! Waiting...');
    stream.once('drain', () => {
      console.log('Drained, continuing...');
    });
  }
  return canContinue;
}
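A fuller sketch of respecting backpressure: write in a loop and await the 'drain' event whenever write() returns false. This uses the built-in once helper from the events module; the file name and line count are arbitrary:
const fs = require('fs');
const { once } = require('events');
async function writeManyLines(path) {
  const stream = fs.createWriteStream(path);
  for (let i = 0; i < 1_000_000; i++) {
    if (!stream.write(`line ${i}\n`)) {
      await once(stream, 'drain'); // pause the loop until the buffer empties
    }
  }
  stream.end();
  await once(stream, 'finish');
  console.log('All lines written without unbounded buffering');
}
writeManyLines('lines.txt').catch(console.error);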
Piping Streams
Piping connects streams together, automatically handling backpressure and data flow.
const fs = require('fs');
const zlib = require('zlib');
// Simple pipe: copy a file
fs.createReadStream('source.txt')
  .pipe(fs.createWriteStream('destination.txt'));
// Chain multiple transforms: read, compress, write
fs.createReadStream('file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('file.txt.gz'));
// Decompress
fs.createReadStream('file.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('file-restored.txt'));
// HTTP response streaming
const http = require('http');
http.createServer((req, res) => {
  const fileStream = fs.createReadStream('large-video.mp4');
  res.writeHead(200, { 'Content-Type': 'video/mp4' });
  fileStream.pipe(res); // Stream video to client
}).listen(3000);
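Piping works in the other direction too: an incoming HTTP request is a readable stream, so an upload can go straight to disk without being buffered in memory. A sketch (the port and file name are arbitrary):
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
  // Stream the request body directly into a file
  const dest = fs.createWriteStream('upload.bin');
  req.pipe(dest);
  dest.on('finish', () => {
    res.writeHead(200);
    res.end('Upload saved\n');
  });
}).listen(3001);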
Pipeline (Modern Approach)
pipeline() is the modern, safer way to connect streams: it propagates errors from any stream in the chain and destroys all of them if one fails, which .pipe() does not do on its own.
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');
async function compressFile(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output)
    );
    console.log('Compression complete');
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
}
compressFile('bigfile.txt', 'bigfile.txt.gz');
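In recent Node versions, pipeline() also accepts async generator functions as intermediate stages, which is often simpler than writing a Transform class. A sketch that upper-cases a text file on its way to disk (file names are placeholders):
const { pipeline } = require('stream/promises');
const fs = require('fs');
async function upperCaseFile(input, output) {
  await pipeline(
    fs.createReadStream(input, { encoding: 'utf8' }),
    async function* (source) {
      // Each chunk from the readable arrives here; yield what should flow onward
      for await (const chunk of source) {
        yield chunk.toUpperCase();
      }
    },
    fs.createWriteStream(output)
  );
}
upperCaseFile('notes.txt', 'notes-upper.txt').catch(console.error);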
Transform Streams
Transform streams modify data as it passes through. Great for data processing pipelines.
const { Transform } = require('stream');
// Custom transform: uppercase converter
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    const upperCased = chunk.toString().toUpperCase();
    callback(null, upperCased);
  }
});
// Use it
process.stdin
  .pipe(upperCaseTransform)
  .pipe(process.stdout);
// Line counter transform: passes data through untouched while counting newlines
const lineCounter = new Transform({
  transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n').length - 1;
    this.lineCount = (this.lineCount || 0) + lines;
    callback(null, chunk); // Forward the chunk unchanged
  },
  flush(callback) {
    // Runs once the source has ended
    console.log('Total lines:', this.lineCount);
    callback();
  }
});
// JSON parser transform: accepts strings/buffers, emits parsed objects
// (assumes each incoming chunk is one complete JSON document, e.g. one message per chunk)
const jsonParser = new Transform({
  readableObjectMode: true, // the output side carries objects, not bytes
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk.toString());
      callback(null, obj);
    } catch (err) {
      callback(err);
    }
  }
});
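As a usage sketch, the line counter above drops straight into a pipe chain (access.log and copy.log are placeholder file names):
const fs = require('fs');
fs.createReadStream('access.log')
  .pipe(lineCounter)                       // counts lines as they stream past
  .pipe(fs.createWriteStream('copy.log')); // data continues on unchanged
// When the source ends, flush() runs and logs the total line count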
Practical Example: CSV Processor
const { Transform } = require('stream');
const { pipeline } = require('stream/promises'); // promise-based pipeline for use with await
const fs = require('fs');
// Transform CSV lines to JSON objects
class CSVParser extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line for the next chunk
    for (const line of lines) {
      this._parseLine(line);
    }
    callback();
  }

  _flush(callback) {
    // Handle a trailing line when the file doesn't end with a newline
    this._parseLine(this.buffer);
    callback();
  }

  _parseLine(line) {
    if (!line.trim()) return;
    const values = line.split(',');
    if (!this.headers) {
      this.headers = values;
      return;
    }
    const obj = {};
    this.headers.forEach((header, i) => {
      obj[header.trim()] = values[i]?.trim();
    });
    this.push(obj);
  }
}
// Filter transform
class FilterTransform extends Transform {
  constructor(filterFn, options = {}) {
    super({ ...options, objectMode: true });
    this.filterFn = filterFn;
  }

  _transform(obj, encoding, callback) {
    if (this.filterFn(obj)) {
      this.push(obj);
    }
    callback();
  }
}
// Use the pipeline
async function processCSV() {
  const parser = new CSVParser();
  // row.age is a string; the '>' comparison coerces it to a number
  const filter = new FilterTransform(row => row.age > 18);
  const output = fs.createWriteStream('adults.json');
  await pipeline(
    fs.createReadStream('users.csv'),
    parser,
    filter,
    new Transform({
      objectMode: true,
      transform(obj, enc, cb) {
        cb(null, JSON.stringify(obj) + '\n'); // Newline-delimited JSON output
      }
    }),
    output
  );
  console.log('Processing complete');
}
processCSV().catch(console.error);
Stream Performance Comparison
| Approach | Memory held (1GB file) | Time to first output |
|---|---|---|
| fs.readFile() (load everything) | ~1GB RAM | After the whole file is read |
| Streams (64KB chunks) | ~64KB of buffering | Immediate |
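The numbers above can be checked on your own machine with process.memoryUsage(); a rough sketch (big.bin stands in for any large file):
const fs = require('fs');
function logMemory(label) {
  const mb = (process.memoryUsage().rss / 1024 / 1024).toFixed(1);
  console.log(`${label}: ${mb} MB resident memory`);
}
logMemory('Before');
fs.createReadStream('big.bin')
  .on('data', () => {})                           // discard chunks; only memory use matters here
  .on('end', () => logMemory('After streaming')); // stays close to the baseline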
💡 Best Practices
- Use pipeline() instead of .pipe() for proper error handling
- Always handle the 'error' event on streams
- Use highWaterMark to tune buffer size for your use case
- Consider objectMode: true for non-binary data
- Use streams for any file larger than a few MB