Mastering Node.js Streams: From Novice to Ninja
Hey there, Node.js enthusiasts! Ready to dive deep into the world of streams? Buckle up, because we're about to embark on a journey that'll transform you from a stream novice to a data-flow master.
What You'll Learn
Streams 101: The basics and why they're a game-changer
Stream Types: Readable, Writable, Duplex, and Transform streams in action
Advanced Concepts: Backpressure, piping, and error handling
Real-World Applications: Building robust, scalable systems with streams
Performance Optimization: Tips and tricks to squeeze every bit of performance
Let's jump in!
1. Streams 101: The Basics
Remember the last time you tried to read a massive JSON file and your app crashed? Yeah, that's where streams come to the rescue. Instead of gulping down data all at once, streams let you sip it bit by bit.
Here's a quick comparison:
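First, the memory-hungry way (a minimal sketch; the function name is made up for illustration). The whole file is read into memory before a single byte is parsed, so a big enough massive.json will blow the heap:

import { readFile } from 'fs/promises';

// Naive approach: the entire file is buffered in memory before JSON.parse runs
async function readJsonFileNaive(filePath: string): Promise<any> {
  const contents = await readFile(filePath, 'utf8');
  return JSON.parse(contents); // One giant parse; memory usage grows with file size
}

And here's the stream-based version, which leans on the third-party JSONStream package (npm install JSONStream) to parse the file incrementally: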
import { createReadStream } from 'fs';
import { parse } from 'JSONStream';
import { pipeline } from 'stream/promises';
// Function to read and parse a JSON file using streams
async function readJsonFile(filePath: string): Promise<any[]> {
const jsonData: any[] = [];
try {
// Use pipeline for better stream handling and automatic cleanup
await pipeline(
createReadStream(filePath),
parse('*'), // Emit each element of the root array/object as a separate chunk
// Final pipeline stage: consume the parsed objects and collect them
async function (source) {
for await (const chunk of source) {
jsonData.push(chunk);
}
}
);
return jsonData;
} catch (error) {
console.error('Error reading file:', error);
throw error; // Re-throw to allow calling code to handle the error
}
}
// Main function to orchestrate the file reading process
async function main() {
try {
const filePath = 'massive.json';
const jsonData = await readJsonFile(filePath);
console.log(jsonData); // Process or output the parsed JSON data
} catch (error) {
console.error('Error in main:', error);
}
}
// Entry point of the script
main();
The stream approach shines with large files. It starts processing immediately and uses less memory. Plus, you can handle errors at each step. Win-win!
2. Stream Types: The Fantastic Four
Readable Streams: The Data Providers
Imagine you're building a real-time stock ticker. Here's how you'd use a Readable stream:
import { Readable } from 'stream';
// Custom Readable stream for simulating real-time stock updates
class StockTicker extends Readable {
private symbols: string[];
constructor(symbols: string[]) {
super({ objectMode: true }); // Enable object mode for non-binary data
this.symbols = symbols;
}
// Implementation of the _read method required by Readable.
// Node calls _read whenever it wants more data; we never call it ourselves.
_read(): void {
if (this.symbols.length === 0) {
this.push(null); // End of stream
return;
}
// Simulate a real-time feed: push the next quote after a short delay
setTimeout(() => {
const symbol = this.symbols.shift()!; // Non-null assertion as we've checked length
const price = parseFloat((Math.random() * 100).toFixed(2));
this.push({ symbol, price }); // Push an object (objectMode) instead of a string
}, 1000);
}
}
// Function to format and log stock data
const logStockData = (data: { symbol: string; price: number }): void => {
console.log(`${data.symbol}: $${data.price.toFixed(2)}`);
};
// Main function to run the stock ticker
async function main(): Promise<void> {
const ticker = new StockTicker(['AAPL', 'GOOGL', 'MSFT']);
// Use for-await loop for async iteration over the stream
for await (const data of ticker) {
logStockData(data as { symbol: string; price: number });
}
}
// Run the main function
main().catch(console.error);
This StockTicker simulates real-time data flow. It's perfect for scenarios where you're dealing with continuous data streams.
Writable Streams: The Data Consumers
Let's build a simple logger using a Writable stream:
import { Writable, WritableOptions } from 'stream';
import { createWriteStream, WriteStream } from 'fs';
// Custom Writable stream for logging to a file
class Logger extends Writable {
private file: WriteStream;
constructor(options?: WritableOptions) {
super(options);
// Create a write stream to the log file, appending to existing content
this.file = createWriteStream('app.log', { flags: 'a' });
}
// Override _write method to handle incoming data
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
// Format the log entry with a timestamp
const log = `[${new Date().toISOString()}] ${chunk}\n`;
// Write to the file and call the callback when done
this.file.write(log, (err) => {
if (err) {
console.error('Error writing to log:', err);
}
callback();
});
}
// Implement _final for cleanup when stream is ending
_final(callback: (error?: Error | null) => void): void {
this.file.end(callback);
}
}
// Async function to demonstrate usage of the Logger
async function runLogger(): Promise<void> {
const logger = new Logger();
// Helper function to write to logger and handle backpressure
const writeLog = (message: string): Promise<void> => {
return new Promise((resolve) => {
if (!logger.write(message)) {
logger.once('drain', resolve);
} else {
process.nextTick(resolve);
}
});
};
try {
await writeLog('Application started');
await writeLog('User logged in: john_doe');
await writeLog('Application shutting down');
} catch (error) {
console.error('Error during logging:', error);
} finally {
// Ensure the logger is properly closed
await new Promise<void>((resolve) => logger.end(resolve));
}
}
// Run the logger
runLogger().catch(console.error);
This Logger class demonstrates how Writable streams can be used for tasks like logging, where you're constantly writing data to a destination.
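Because Logger is just a Writable, any Readable can be piped into it. As a quick sketch (hypothetical usage, not part of the example above), this one-liner logs everything typed on stdin:

// Pipe whatever arrives on stdin straight into app.log via the Logger
process.stdin.pipe(new Logger());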
Duplex Streams: The Two-Way Streets
Think of a chat application. Here's a simplified version using Duplex streams:
import { Duplex, DuplexOptions } from 'stream';
import { createServer, Socket } from 'net';
// Custom Duplex stream for handling chat communication
class ChatStream extends Duplex {
private socket: Socket;
constructor(socket: Socket, options?: DuplexOptions) {
super(options);
this.socket = socket;
// Handle incoming data from the socket
this.socket.on('data', (data: Buffer) => {
// Push the data into the readable side of the Duplex stream
this.push(data);
});
// Handle socket closure
this.socket.on('end', () => {
// Signal the end of the readable stream
this.push(null);
});
}
// Implementation of _read method (required by Duplex)
_read(size: number): void {
// Reading is handled by the 'data' event listener
}
// Implementation of _write method (required by Duplex)
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
// Write the data to the socket
this.socket.write(chunk, callback);
}
}
// Function to start the chat server
function startChatServer(port: number): void {
// Create a TCP server
const server = createServer((socket: Socket) => {
console.log('Client connected');
const chat = new ChatStream(socket);
// Echo back any received data
chat.pipe(chat);
chat.on('end', () => {
console.log('Client disconnected');
});
});
// Start listening on the specified port
server.listen(port, () => {
console.log(`Chat server running on port ${port}`);
});
// Handle server errors
server.on('error', (err: Error) => {
console.error('Server error:', err);
});
}
// Start the chat server on port 3000
startChatServer(3000);
This ChatStream demonstrates how Duplex streams can handle bidirectional communication, perfect for networked applications.
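To try it out, here's a minimal client sketch (the port matches the server above; the message text is made up):

import { connect } from 'net';

// Connect to the chat server started above and send a single message
const client = connect({ port: 3000 }, () => {
  client.write('Hello from the client!\n');
});

// The server echoes everything back through the ChatStream
client.on('data', (data: Buffer) => {
  console.log('Echoed back:', data.toString().trim());
  client.end();
});

client.on('error', (err: Error) => {
  console.error('Client error:', err.message);
});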
Transform Streams: The Data Alchemists
Let's build a stream that converts markdown to HTML on the fly:
import { Transform, TransformCallback } from 'stream';
import { pipeline } from 'stream/promises';
import { marked } from 'marked';
/**
* Transforms Markdown input into HTML output.
*/
class MarkdownTransformer extends Transform {
// Accumulate the full Markdown document before converting
#markdown = '';
constructor() {
super({ objectMode: true }); // objectMode: work with whole strings rather than raw buffers
}
// Buffer incoming chunks until the stream ends
_transform(
chunk: Buffer | string,
encoding: BufferEncoding,
callback: TransformCallback
): void {
this.#markdown += `${chunk}`;
callback();
}
// Convert the accumulated Markdown to HTML once all input has arrived
_flush(callback: TransformCallback): void {
const html = marked(this.#markdown.trim());
this.push(html);
callback();
}
}
// Prompt first, then stream stdin through the transformer to stdout
console.info('Type your markdown (Ctrl+D to end):');
(async () => {
try {
const transformer = new MarkdownTransformer();
// pipeline gives us automatic error propagation and cleanup
await pipeline(process.stdin, transformer, process.stdout);
} catch (error) {
console.error('Transformation error:', (error as Error).message);
}
})();
This MarkdownTransformer shows how Transform streams can modify data on-the-fly, great for processing pipelines.
3. Advanced Concepts: Leveling Up
Backpressure: Don't Flood the System
Imagine you're streaming video. If you send frames faster than they can be displayed, you'll eat up memory. Here's how to handle backpressure:
import { createServer, IncomingMessage, ServerResponse } from 'http';
import { createReadStream } from 'fs';
const server = createServer();
server.on('request', (req: IncomingMessage, res: ServerResponse) => {
// Create a readable stream for the video file
const video = createReadStream('bigbuck.webm');
video.on('data', (chunk: Buffer) => {
// Attempt to write the chunk to the response
if (!res.write(chunk)) {
// If the response buffer is full, pause the video stream
video.pause();
console.log('Paused video stream due to backpressure');
}
});
res.on('drain', () => {
// When the response buffer is emptied, resume the video stream
video.resume();
console.log('Resumed video stream');
});
video.on('end', () => {
// End the response when the video stream ends
res.end();
console.log('Video stream ended');
});
video.on('error', (err: Error) => {
console.error('Video stream error:', err);
res.statusCode = 500;
res.end('Internal Server Error');
});
});
server.listen(3000, () => {
console.log('Server running on port 3000');
});
// Listen failures (e.g. port already in use) surface as 'error' events, not rejections
server.on('error', (err: Error) => {
console.error('Server failed to start:', err.message);
});
This example shows how to implement backpressure handling in a video streaming scenario: when the client can't keep up, the file stream pauses instead of piling chunks up in the server's memory.
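In practice you rarely need to wire pause/resume by hand. Here's a minimal sketch of the same route using pipeline, which propagates backpressure and errors automatically (the port 3001 and the autoServer name are just placeholders so it doesn't clash with the server above):

import { createServer } from 'http';
import { createReadStream } from 'fs';
import { pipeline } from 'stream/promises';

// pipeline pauses the file stream whenever the response buffer fills up,
// so backpressure is handled for us
const autoServer = createServer(async (req, res) => {
  try {
    await pipeline(createReadStream('bigbuck.webm'), res);
  } catch (err) {
    console.error('Streaming failed:', (err as Error).message);
    if (!res.headersSent) res.statusCode = 500;
    res.end();
  }
});

autoServer.listen(3001, () => console.log('Auto-backpressure server on port 3001'));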
Piping: LEGO for Streams
Piping is like connecting LEGO blocks. You can build complex data flows easily:
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { scrypt, createCipheriv } from 'crypto';
import { pipeline } from 'stream/promises';
// Use async IIFE to allow top-level await
(async () => {
try {
// Create read stream from the input file
const readStream = createReadStream('bigfile.txt');
// Create gzip compression stream
const gzipStream = createGzip();
// Generate a secure key from the password
const key = await new Promise<Buffer>((resolve, reject) => {
scrypt('secret-key', 'salt', 32, (err, derivedKey) => {
if (err) reject(err);
else resolve(derivedKey);
});
});
// Static all-zero IV to keep the demo simple (in production, use crypto.randomBytes(16) and store the IV alongside the ciphertext)
const iv = Buffer.alloc(16, 0);
// Create encryption stream (using createCipheriv for better security)
const encryptionStream = createCipheriv('aes-256-cbc', key, iv);
// Create write stream to the output file
const writeStream = createWriteStream('bigfile.txt.gz.enc');
// Use pipeline for better error handling and cleanup
await pipeline(
readStream,
gzipStream,
encryptionStream,
writeStream
);
console.log('Pipeline succeeded');
} catch (error) {
console.error('Pipeline failed:', (error as Error).message);
}
})();
This pipeline reads a file, compresses it, encrypts it, and saves it. All in one smooth operation!
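And the same building blocks run just as well in reverse. Here's a minimal sketch of the matching decrypt-and-decompress pipeline (the output filename is made up; it reuses the same password, salt, and demo-only zero IV as above):

import { createReadStream, createWriteStream } from 'fs';
import { createGunzip } from 'zlib';
import { scrypt, createDecipheriv } from 'crypto';
import { pipeline } from 'stream/promises';

(async () => {
  try {
    // Derive the same key and reuse the same demo-only zero IV as above
    const key = await new Promise<Buffer>((resolve, reject) => {
      scrypt('secret-key', 'salt', 32, (err, derivedKey) =>
        err ? reject(err) : resolve(derivedKey)
      );
    });
    const iv = Buffer.alloc(16, 0);

    await pipeline(
      createReadStream('bigfile.txt.gz.enc'),
      createDecipheriv('aes-256-cbc', key, iv), // Undo the encryption first...
      createGunzip(), // ...then undo the compression
      createWriteStream('bigfile.decrypted.txt')
    );
    console.log('Reverse pipeline succeeded');
  } catch (error) {
    console.error('Reverse pipeline failed:', (error as Error).message);
  }
})();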
4. Real-World Applications: Streams in Action
Building a Log Analyzer
Here's a stream-based log analyzer that can handle gigabytes of logs:
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
async function analyzeLog(filename: string): Promise<void> {
// Create a readable stream from the file
const fileStream = createReadStream(filename);
// Create a readline interface
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity // Recognize all instances of CR LF as a single line break
});
// Use a Map instead of an object for better performance with large datasets
const errors = new Map<string, number>();
// Iterate over each line in the file
for await (const line of rl) {
if (line.includes('ERROR')) {
// Extract the error type from the line
const errorType = line.split('ERROR:')[1]?.trim() ?? 'Unknown Error';
// Increment the error count
errors.set(errorType, (errors.get(errorType) ?? 0) + 1);
}
}
console.log('Error Analysis:', Object.fromEntries(errors));
}
// Use an async IIFE to allow top-level await
(async () => {
try {
await analyzeLog('server.log');
} catch (error) {
console.error('Log analysis failed:', (error as Error).message);
}
})();
This analyzer can crunch through massive log files without breaking a sweat, perfect for real-world log analysis tasks.
Real-time Data Processing Pipeline
Let's build a pipeline that processes sensor data in real-time:
import { Transform, TransformCallback } from 'stream';
import { createServer, Socket } from 'net';
interface TemperatureData {
temperature: number;
[key: string]: any;
}
class TemperatureNormalizer extends Transform {
_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
try {
// Parse the incoming data
const data: TemperatureData = JSON.parse(chunk.toString());
// Convert temperature to Fahrenheit
data.temperature = (data.temperature * 9/5) + 32;
// Stringify and push the transformed data
this.push(JSON.stringify(data) + '\n');
callback();
} catch (error) {
callback(error as Error);
}
}
}
class AnomalyDetector extends Transform {
_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
try {
// Parse the incoming data
const data: TemperatureData = JSON.parse(chunk.toString());
// Check for high temperature
if (data.temperature > 100) {
this.push(`ALERT: High temperature detected: ${data.temperature}°F\n`);
}
callback();
} catch (error) {
callback(error as Error);
}
}
}
// Create the server
const server = createServer((socket: Socket) => {
socket
.pipe(new TemperatureNormalizer())
.pipe(new AnomalyDetector())
.pipe(process.stdout);
});
// Start the server
server.listen(3000, () => {
console.log('Server listening on port 3000');
});
// Handle server errors
server.on('error', (error: Error) => {
console.error('Server error:', error.message);
});
This setup can handle a constant stream of sensor data, normalize it, and detect anomalies in real-time. Perfect for IoT applications!
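One caveat: TCP gives no guarantee that each data chunk lines up with exactly one JSON message. A minimal sketch of a line-splitting Transform you could slot in front of the normalizer (assuming each sensor sends one JSON object per newline-terminated line):

import { Transform, TransformCallback } from 'stream';

// Buffers partial lines and pushes one complete newline-delimited message at a time
class LineSplitter extends Transform {
  private buffered = '';

  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
    this.buffered += chunk.toString();
    const lines = this.buffered.split('\n');
    this.buffered = lines.pop() ?? ''; // Keep the trailing partial line for the next chunk
    for (const line of lines) {
      if (line.trim()) this.push(line);
    }
    callback();
  }

  _flush(callback: TransformCallback): void {
    if (this.buffered.trim()) this.push(this.buffered);
    callback();
  }
}

// Usage: socket.pipe(new LineSplitter()).pipe(new TemperatureNormalizer())...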
5. Performance Optimization: Squeezing Every Bit
Use Buffer pools: For frequently created buffers, use Buffer.allocUnsafe() and manage a pool (a sketch follows the cork/uncork example below).
Adjust highWaterMark: Fine-tune the buffer size for your specific use case (see the sketch just below).
Use Cork/Uncork: Batch small writes for better performance.
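The highWaterMark tweak from the second tip is a one-line change. A minimal sketch (the 1 MiB value is an arbitrary number you'd tune for your workload):

import { createReadStream } from 'fs';

// Larger internal buffer: fewer, bigger reads for high-throughput file processing
const bigChunks = createReadStream('bigfile.txt', {
  highWaterMark: 1024 * 1024, // 1 MiB instead of the 64 KiB default for file streams
});

bigChunks.on('data', (chunk: Buffer) => {
  console.log(`Received ${chunk.length} bytes`);
});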
Here's an example of using cork and uncork:
import { createWriteStream } from 'fs';
// Create a writable stream to 'output.txt'
const writableStream = createWriteStream('output.txt');
// Cork the stream to buffer writes
writableStream.cork();
// Write data to the stream (these writes will be buffered)
writableStream.write('Hello');
writableStream.write('World');
// Schedule uncorking on the next tick of the event loop
process.nextTick(() => {
// Uncork the stream, flushing the buffered writes in one batch
writableStream.uncork();
// Close the stream now that we're done writing
writableStream.end();
});
// Handle potential errors
writableStream.on('error', (error: Error) => {
console.error('Write stream error:', error.message);
});
// Log when the stream is finished
writableStream.on('finish', () => {
console.log('Write operation completed');
});
This batches the writes, improving performance for small, frequent writes.
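And for the buffer-pool tip from the start of this section, here's a minimal sketch of what "manage a pool" can look like (the class name, API, and sizes are illustrative assumptions):

import { Buffer } from 'buffer';

// A tiny fixed-size pool: reuse pre-allocated buffers instead of allocating per chunk
class BufferPool {
  private pool: Buffer[] = [];

  constructor(private size: number, count: number) {
    // allocUnsafe skips zero-filling, so always overwrite a buffer before reading it
    for (let i = 0; i < count; i++) {
      this.pool.push(Buffer.allocUnsafe(size));
    }
  }

  // Hand out a buffer, falling back to a fresh allocation if the pool is empty
  acquire(): Buffer {
    return this.pool.pop() ?? Buffer.allocUnsafe(this.size);
  }

  // Return a buffer so it can be reused (the caller must not touch it afterwards)
  release(buffer: Buffer): void {
    this.pool.push(buffer);
  }
}

// Example: reuse 64 KiB buffers across reads
const pool = new BufferPool(64 * 1024, 10);
const buf = pool.acquire();
// ... fill and consume buf ...
pool.release(buf);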
Wrapping Up
Streams are the secret sauce for building scalable, efficient Node.js applications. They're not just a feature; they're a mindset. Once you start thinking in streams, you'll see opportunities to use them everywhere.
Remember:
Use streams for large datasets or real-time data.
Understand backpressure to prevent memory issues.
Leverage piping for clean, efficient code.
Always handle errors in your streams.
Now go forth and stream all the things! Happy coding!