Node.js Streams and its Type , Back Pressure, Pipe and Pipeline in Nodejs
Node.js Streams
Node.js Streams are incredibly powerful and are fundamental building-block of efficient and fast data-processing and transformation application. When working with large amount of data or data that’s coming from an external source one chunk at a time. streams provide a way to read or write data in chunks, making it possible to work with large amounts of data without loading the entire dataset into memory. This is particularly useful when dealing with files, network communication, or any other I/O operations.
Streams helps to control or handle Back Pressure.
What is backpressure.
Lets understand back pressure by example
Transformation of data from HDD to SSD is Normal . The flow of data is smooth and efficient. If you transfer data from SSD to HDD, there may be pressure to HDD because SSD has higher speed and HDD is slower than SSD. It may cause memory leak and data corruption.
This is actually Backpressure. In order to handle this situation, streams are introduced. Streams converts data into small size chunk of buffer and transfer or manipulate data.
Node.js has several types of streams, and they can be categorized into four main types:
Readable Streams
Writable Streams
Transform Streams
Duplex Streams
Readable Streams
These streams represent a source of data that you can read from. Examples include reading data from a file, receiving data from a network, or reading from a buffer.
Lets learn using example
I have salaries.csv that contain information of employee like Name,Age,Job, Salary. Now we can read that data using readStream .
const fs=require('fs');
const main=async()=>{
const readStream= fs.createReadStream('./salaries.csv',{highWaterMark:100})
readStream.on('data',(buffer)=>{
console.log("data>>>",buffer.toString());
})
readStream.on('end',()=>{
console.log("Stream Ended")
})
}
main();
OUTPUT
$ node readStream.js data>>> Name,Age,Job,Salary Kevin Sanders,24,Software Engineer,7300 Lisa Mills,26,High School Teacher,6100 Donna Allison,27,Dentist,12700 Michael Schmitt,43,Dentist,17500 Lisa Shaffer,31,Accountant,7400 Jeffrey Heath,25,Dentist,11600
Writable Streams
These streams represent a destination to which you can write data. Examples include writing data to a file, sending data over a network, or writing to a buffer.
Lets learn using example
I have salaries.csv that contain information of employee like Name,Age,Job, Salary. Now we can now move data and write to destination file.const fs=require('fs'); const main=async()=>{ const readStream= fs.createReadStream('./salaries.csv'); const writeStream=fs.createWriteStream('./salary.csv'); readStream.on('data',(buffer)=>{ console.log("data>>>",buffer.toString()); writeStream.write(buffer); }) readStream.on('end',()=>{ console.log("Stream Ended") }) } main();
Output in another salary.scv file
Name,Age,Job,Salary
Kevin Sanders,24,Software Engineer,7300 Lisa Mills,26,High School Teacher,6100 Donna Allison,27,Dentist,12700 Michael Schmitt,43,Dentist,17500 Lisa Shaffer,31,Accountant,7400Transform Streams
These are a special type of duplex stream where the output is computed based on the input. Transform streams are often used to modify or transform data as it is being read from a source and written to a destination.
```javascript const fs=require('fs'); const csv= require('csvtojson'); const{Transform}= require('node:stream'); const {pipeline}= require('stream/promises');
const main=async()=>{ const readStream= fs.createReadStream('./salaries.csv');
const myTransform=new Transform( { objectMode:true, transform(chunck,enc,callback){ const user={ Name:chunck.Name, Age:Number(chunck.Age), Job:chunck.Job, Salary:chunck.Salary } console.log("user===",user)
callback(null,user); },
} )
const convertToJson= new Transform({ objectMode:true, transform(user,enc,callback){ const value= JSON.stringify(user) +'\n'; callback(null,value);
}, }) try { await pipeline( readStream, csv({ delimiter:',', },{objectMode:true}), myTransform, convertToJson, fs.createWriteStream('./salary.ndjson')
) } catch (error) { console.error("some stream error=",error) }
} main();
Here, the `fs` module is used for file system operations, `csvtojson` is used to convert CSV data to JSON, `Transform` is a class from the `stream` module for creating custom transform streams, and `pipeline` is used to simplify the process of piping streams together as below.
```javascript
const fs=require('fs');
const csv= require('csvtojson');
const{Transform}= require('node:stream');
const {pipeline}= require('stream/promises');
Creating Readable Stream
const readStream= fs.createReadStream('./salaries.csv');
This line creates a readable stream from a CSV file (salaries.csv
).
const myTransform=new Transform(
{
objectMode:true,
transform(chunck,enc,callback){
const user={
Name:chunck.Name,
Age:Number(chunck.Age),
Job:chunck.Job,
Salary:chunck.Salary
}
console.log("user===",user)
callback(null,user);
},
}
)
This transform stream (myTransform
) takes CSV data chunks, converts them into a user object, and logs the user object.
Another Transform Stream (convertToJson
):
const convertToJson= new Transform({
objectMode:true,
transform(user,enc,callback){
const value= JSON.stringify(user) +'\n';
callback(null,value);
},
})
This transform stream (convertToJson
) takes user objects and converts them into JSON strings.
Using pipeline
to Combine Streams:
try {
await pipeline(
readStream,
csv({
delimiter:',',
},{objectMode:true}),
myTransform,
convertToJson,
fs.createWriteStream('./salary.ndjson')
)
} catch (error) {
console.error("some stream error=",error)
}
The pipeline
function is used to combine multiple streams (readStream
, CSV parser, myTransform
, convertToJson
) into a pipeline, and the result is written to a new file (salary.ndjson
). If any error occurs during the pipeline, it is caught and logged.
Here is complete code
const fs=require('fs');
const csv= require('csvtojson');
const{Transform}= require('node:stream');
const {pipeline}= require('stream/promises');
const main=async()=>{
const readStream= fs.createReadStream('./salaries.csv');
const myTransform=new Transform(
{
objectMode:true,
transform(chunck,enc,callback){
const user={
Name:chunck.Name,
Age:Number(chunck.Age),
Job:chunck.Job,
Salary:chunck.Salary
}
console.log("user===",user)
callback(null,user);
},
}
)
const convertToJson= new Transform({
objectMode:true,
transform(user,enc,callback){
const value= JSON.stringify(user) +'\n';
callback(null,value);
},
})
try {
await pipeline(
readStream,
csv({
delimiter:',',
},{objectMode:true}),
myTransform,
convertToJson,
fs.createWriteStream('./salary.ndjson')
)
} catch (error) {
console.error("some stream error=",error)
}
}
main();
In this example, a readable stream generates CSV-like data, a transform stream parses it into user objects, another transform stream converts them to JSON strings, and finally, the data is written to the console. The pipeline
function handles error propagation and cleanup.
Subscribe to my newsletter
Read articles from Bhanubhakta Regmi directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by