Understanding Streams
Streams are the backbone of Node.js I/O. They let you process data piece by piece instead of loading everything into memory at once. HTTP requests and responses, file read/write streams, TCP sockets, and process stdio are all streams under the hood.
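For instance, reading a multi-gigabyte log file with fs.readFile buffers the whole file before your callback runs, while a read stream hands you small chunks as they arrive. A minimal sketch; the file name is just an illustration:

const fs = require('fs');

// Buffers the entire file in memory before the callback fires;
// fine for small files, risky for multi-gigabyte ones.
fs.readFile('huge.log', (err, data) => { /* ... */ });

// Streams the same file in ~64 KB chunks (the default highWaterMark for
// fs streams), so memory use stays flat regardless of file size.
fs.createReadStream('huge.log').on('data', (chunk) => { /* ... */ });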
🌊 Stream Types
- Readable (data source): fs.createReadStream, HTTP request, process.stdin
- Writable (data sink): fs.createWriteStream, HTTP response, process.stdout
- Duplex (both read and write): TCP socket, WebSocket
- Transform (modify data in transit): zlib, crypto cipher, CSV parser
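To see how the four types slot together, here is a minimal, hypothetical sketch (the port and file name are made up): each incoming TCP socket acts as the Duplex source, gzip is the Transform, and a file write stream is the Writable sink.

const net = require('net');
const zlib = require('zlib');
const fs = require('fs');
const { pipeline } = require('stream/promises');

// Each incoming socket is read (Readable side of a Duplex), compressed
// (Transform), and written to a file (Writable).
net.createServer((socket) => {
  pipeline(
    socket,                                          // Duplex: readable side feeds the pipeline
    zlib.createGzip(),                               // Transform: compresses data in transit
    fs.createWriteStream(`upload-${Date.now()}.gz`)  // Writable: data sink
  ).catch(console.error);
}).listen(4000);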
Creating Custom Readable Streams
const { Readable } = require('stream');
// Generate data on demand
class CounterStream extends Readable {
constructor(max, options) {
super(options);
this.max = max;
this.current = 0;
}
_read() {
if (this.current <= this.max) {
const data = `Number: ${this.current}\n`;
this.push(data);
this.current++;
} else {
this.push(null); // Signal end of stream
}
}
}
const counter = new CounterStream(1000);
counter.pipe(process.stdout);
// Simpler: Readable.from() for iterables
async function* generateData() {
for (let i = 0; i < 100; i++) {
yield JSON.stringify({ id: i, timestamp: Date.now() }) + '\n';
await new Promise(r => setTimeout(r, 10));
}
}
const stream = Readable.from(generateData());
stream.pipe(process.stdout);
Creating Custom Transform Streams
const { Transform, pipeline } = require('stream');
const fs = require('fs');
// Transform: Convert CSV to JSON
class CSVtoJSON extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this.headers = null;
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop(); // Keep incomplete line
for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(',').map(v => v.trim());
if (!this.headers) {
this.headers = values;
continue;
}
const obj = {};
this.headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(JSON.stringify(obj) + '\n');
}
callback();
}
_flush(callback) {
// Handle any remaining data
if (this.buffer.trim() && this.headers) {
const values = this.buffer.split(',').map(v => v.trim());
const obj = {};
this.headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(JSON.stringify(obj) + '\n');
}
callback();
}
}
// Usage: convert CSV file to JSON
pipeline(
fs.createReadStream('data.csv'),
new CSVtoJSON(),
fs.createWriteStream('data.jsonl'),
(err) => {
if (err) console.error('Pipeline failed:', err);
else console.log('CSV converted to JSON');
}
);
Backpressure and Flow Control
Backpressure occurs when a writable stream cannot consume data as fast as the readable stream produces it. Handling it properly prevents unbounded memory growth:
const fs = require('fs');
// BAD: Ignoring backpressure
function badCopy(src, dest) {
const readStream = fs.createReadStream(src);
const writeStream = fs.createWriteStream(dest);
readStream.on('data', (chunk) => {
// write() returns false when buffer is full
// but we're ignoring it — memory will grow!
writeStream.write(chunk);
});
}
// GOOD: Handling backpressure manually
function goodCopy(src, dest) {
const readStream = fs.createReadStream(src);
const writeStream = fs.createWriteStream(dest);
readStream.on('data', (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
readStream.pause(); // Stop reading until drain
}
});
writeStream.on('drain', () => {
readStream.resume(); // Buffer drained, continue reading
});
readStream.on('end', () => writeStream.end());
}
// BEST: Use pipeline — handles backpressure automatically
const { pipeline } = require('stream/promises');
async function bestCopy(src, dest) {
await pipeline(
fs.createReadStream(src),
fs.createWriteStream(dest)
);
}
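pipeline() covers copying between streams, but when your own code produces the data you still have to respect write()'s return value. A sketch, assuming the events.once helper; the destination file and line count are illustrative:

const fs = require('fs');
const { once } = require('events');

// Write millions of generated lines without letting the writable's
// internal buffer grow without bound.
async function writeManyLines(dest) {
  const out = fs.createWriteStream(dest);
  for (let i = 0; i < 10_000_000; i++) {
    // write() returns false once the internal buffer exceeds highWaterMark
    if (!out.write(`line ${i}\n`)) {
      await once(out, 'drain'); // wait for the buffer to flush before continuing
    }
  }
  out.end();
  await once(out, 'finish');
}

writeManyLines('lines.txt').catch(console.error);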
Real-World Stream Patterns
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');
const { createGzip } = require('zlib');
// Pattern 1: Process a large JSONL file line by line
async function processLargeJSONL(filePath) {
  const stream = createReadStream(filePath, { encoding: 'utf8' });
  let leftover = '';
  let count = 0;
  for await (const chunk of stream) {
    // A chunk can end mid-line, so carry the incomplete tail over to the next chunk
    const lines = (leftover + chunk).split('\n');
    leftover = lines.pop();
    for (const line of lines) {
      if (!line.trim()) continue;
      const record = JSON.parse(line);
      count++;
      // Process each record here
    }
  }
  if (leftover.trim()) {
    const record = JSON.parse(leftover);
    count++;
  }
  console.log(`Processed ${count} records`);
}
// Pattern 2: Streaming HTTP response
const http = require('http');
http.createServer((req, res) => {
  if (req.url === '/logs') {
    res.writeHead(200, {
      'Content-Type': 'text/plain',
      'Content-Encoding': 'gzip'
    });
    // Stream and compress the log file on the fly
    pipeline(
      createReadStream('/var/log/app.log'),
      createGzip(),
      res
    ).catch(console.error);
  } else {
    res.writeHead(404);
    res.end();
  }
}).listen(3000);
// Pattern 3: Rate-limited stream
class ThrottleStream extends Transform {
constructor(bytesPerSecond) {
super();
this.bytesPerSecond = bytesPerSecond;
this.transferred = 0;
this.startTime = Date.now();
}
_transform(chunk, encoding, callback) {
this.transferred += chunk.length;
const elapsed = (Date.now() - this.startTime) / 1000;
const expectedTime = this.transferred / this.bytesPerSecond;
const delay = Math.max(0, (expectedTime - elapsed) * 1000);
setTimeout(() => {
this.push(chunk);
callback();
}, delay);
}
}
// Limit a download to 1 MB/s (run inside an HTTP handler, where `res` is the response)
pipeline(
  createReadStream('large-file.zip'),
  new ThrottleStream(1024 * 1024),
  res
).catch(console.error);
Duplex Streams and Passthrough
const { Duplex, PassThrough } = require('stream');
const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');
// Duplex: independent read and write sides
class EchoStream extends Duplex {
  constructor() {
    super();
    this.data = [];
    this.pendingRead = false; // true when the read side asked for data we don't have yet
  }
  _write(chunk, encoding, callback) {
    // Store incoming data; satisfy a pending read immediately
    this.data.push(chunk.toString().toUpperCase());
    if (this.pendingRead) {
      this.pendingRead = false;
      this.push(this.data.shift());
    }
    callback();
  }
  _read() {
    if (this.data.length > 0) {
      this.push(this.data.shift());
    } else {
      // Nothing buffered: remember the pending read instead of polling
      this.pendingRead = true;
    }
  }
}
// PassThrough: useful for tapping into pipelines
const monitor = new PassThrough();
let totalBytes = 0;
monitor.on('data', (chunk) => {
totalBytes += chunk.length;
process.stdout.write(`\rTransferred: ${(totalBytes / 1024 / 1024).toFixed(2)} MB`);
});
pipeline(
  createReadStream('file.dat'),
  monitor,
  createWriteStream('copy.dat')
).catch(console.error);
💡 Key Takeaways
- Always use pipeline() instead of .pipe() for proper error handling (see the combined sketch below)
- Handle backpressure to prevent memory exhaustion
- Use objectMode: true when streaming objects instead of buffers
- Use async iterators (for await) for cleaner stream consumption
- Transform streams are ideal for data processing pipelines
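To tie several of these points together, a minimal closing sketch with hypothetical file paths: pipeline() from stream/promises accepts an async generator as a transform stage, which gives you a Transform-like step without defining a class.

const { pipeline } = require('stream/promises');
const fs = require('fs');

// Uppercase a file using an async generator as the transform stage.
// File paths are placeholders.
async function uppercaseFile(src, dest) {
  await pipeline(
    fs.createReadStream(src),
    // The generator receives the previous stage as an async iterable
    async function* (source) {
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    fs.createWriteStream(dest)
  );
}

uppercaseFile('input.txt', 'output.txt').catch(console.error);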