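Basic Concepts of Node.js Streams
Outline
This article explains the benefits that the stream API offers in Node.js:
- Large data can be processed more efficiently, so a big file does not stall the process
- The same volume of data can be handled with a bounded amount of resources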
What is Stream?
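A stream represents a flow of data.
It usually refers to handling large data by splitting it into many small chunks and processing them one at a time.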
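The chunking idea can be sketched without any stream API at all. The helper `chunksOf` below is a hypothetical name introduced only for illustration; it walks a buffer in fixed-size slices so that the consumer only ever handles one small piece at a time. A real stream applies the same idea to data that is not yet fully in memory.

```typescript
import { Buffer } from 'node:buffer';

// Hypothetical helper: yields fixed-size slices of a buffer, one chunk at a time.
function* chunksOf(data: Buffer, chunkSize: number): Generator<Buffer> {
  for (let offset = 0; offset < data.length; offset += chunkSize) {
    // subarray returns a view on the same memory, so no bytes are copied
    yield data.subarray(offset, offset + chunkSize);
  }
}

const data = Buffer.from('0123456789');
const sizes: number[] = [];
for (const chunk of chunksOf(data, 4)) {
  sizes.push(chunk.length); // process one small piece at a time
}
console.log(sizes); // [ 4, 4, 2 ]
```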
Why use Stream?
Saves resources (memory usage)
Buffers make data I/O efficient
Stream types
Depending on the direction of the data and how it is processed, streams fall into the following four types
1. Writable Stream
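A Writable Stream is a stream object used to write data out to a destination
It has a property called writableHighWaterMark, the number of buffered bytes at which write() starts returning false
The default writableHighWaterMark is 16384 (newer Node.js releases may use a larger default)
Internally it has a buffer that holds written data until the destination accepts it; 16384 bytes is about 16 KiB, not 16 MB
Calling the write method on a Writable Stream puts data into this internal buffer
The stream flushes buffered data to the destination as fast as the destination allows; it does not wait for the buffer to fill up
When the buffered data exceeds writableHighWaterMark, the excess is still kept in memory,
so Node.js memory usage grows beyond what was expected until the process has worked through the internal buffer.
A better approach is to check the return value of every write
If stream.write returns true, the internal buffer is still below writableHighWaterMark and writing can continue
If stream.write returns false, the internal buffer has reached writableHighWaterMark and needs time to drain
A Writable Stream has no pause or resume method (those belong to Readable Streams), so on false simply stop calling write
Once the stream emits the drain event, the buffered data has been processed and writing can continue
Example: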
import * as fs from 'fs/promises';
import * as path from 'path';
(async () => {
  console.time("writeMany");
  const fileHandle = await fs.open(path.join(__dirname, "../test.txt"), "w");
  const stream = fileHandle.createWriteStream();
  console.log(stream.writableHighWaterMark);
  let i = 0;
  const numberOfWrites = 10000000;
  const writeMany = () => {
    while (i < numberOfWrites) {
      const buff = Buffer.from(` ${i} `, 'utf-8');
      // last write
      if (i === numberOfWrites - 1) {
        return stream.end(buff);
      }
      // write() still accepts the chunk when it returns false,
      // so advance i before leaving the loop to avoid writing it twice
      const canContinue = stream.write(buff);
      i++;
      // stop the loop once the internal buffer is full
      if (!canContinue) {
        break;
      }
    }
  }
  writeMany();
  // resume our loop once our stream's internal buffer is empty
  stream.on('drain', () => {
    writeMany();
  });
  stream.on('finish', () => {
    console.timeEnd("writeMany");
    fileHandle.close();
  })
})()
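The return value of write can also be observed on a small in-memory stream. The sketch below is only an illustration: the 8-byte highWaterMark and the artificially slow `sink` are assumptions chosen to make the threshold easy to hit, not values from the example above.

```typescript
import { Writable } from 'node:stream';

// Assumption for illustration: a slow in-memory sink with a tiny 8-byte highWaterMark.
const sink = new Writable({
  highWaterMark: 8,
  write(_chunk, _encoding, callback) {
    // pretend the destination is slow by completing the write later
    setImmediate(() => callback());
  },
});

const first = sink.write('abcd');  // 4 bytes pending, still below the mark
const second = sink.write('efgh'); // 8 bytes pending, the mark is reached

console.log(first, second); // true false

// 'drain' fires once the pending data has been handled
sink.once('drain', () => {
  console.log('drained, safe to write again');
  sink.end();
});
```

Note that the second chunk is not rejected: it is buffered like any other, and the false return value is only a request to stop writing until drain.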
2. Readable Stream
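A Readable Stream is a stream object used to read data in from a source
It has a property called readableHighWaterMark (set through the highWaterMark option), the number of bytes the stream buffers before it stops reading from the source
For file read streams the default highWaterMark is 65536
Internally it has a buffer that holds data read from the source until it is consumed; 65536 bytes is about 64 KiB, not 64 MB
A Readable Stream implementation puts data into the internal buffer with push and signals the end of the data with push(null); a Readable has no end method
Consumers receive the buffered data chunk by chunk through the stream.on('data') event, and the end event fires once EOF has been reached
Example: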
import * as fs from 'node:fs/promises';
import * as path from 'path';
(async () => {
  console.time('readBig');
  const fileHandleRead = await fs.open(path.join(__dirname, "../src.txt"), "r");
  const fileHandleWrite = await fs.open(path.join(__dirname, "../dest.txt"), "w");
  const streamRead = fileHandleRead.createReadStream({ highWaterMark: 64 * 1024 });
  const streamWrite = fileHandleWrite.createWriteStream();
  // holds a number that was cut in half by a chunk boundary
  let split = '';
  streamRead.on('data', (chunk: Buffer) => {
    // prepend the partial number left over from the previous chunk
    const numbers = (split + chunk.toString('utf-8')).split(' ');
    // the last piece may be incomplete, so keep it for the next chunk
    split = numbers.pop() ?? '';
    numbers.forEach((number) => {
      // consecutive spaces produce empty strings, skip them
      if (number === '') return;
      const n = Number(number);
      if (n % 10 === 0) {
        // pause reading while the writer's internal buffer is full
        if (!streamWrite.write(" " + n + " ")) {
          streamRead.pause();
        }
      }
    });
  });
  streamWrite.on("drain", () => {
    streamRead.resume();
  });
  streamRead.on('end', () => {
    // handle a final number that was not followed by a space
    if (split !== '' && Number(split) % 10 === 0) {
      streamWrite.write(" " + Number(split) + " ");
    }
    streamWrite.end();
    console.log("Done Reading");
    console.timeEnd('readBig');
  });
})();
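The producing side of a Readable Stream can be sketched with a small custom class. `Counter` and `collect` below are hypothetical names used only for illustration: the class pushes a few chunks and then push(null) to signal the end, and the consumer reads the chunks with `for await`, an alternative to the data event.

```typescript
import { Readable } from 'node:stream';

// Hypothetical source: emits the numbers 1..limit as text, then ends.
class Counter extends Readable {
  private current = 1;
  private limit: number;

  constructor(limit: number) {
    super();
    this.limit = limit;
  }

  // Called by the stream whenever it wants more data for its internal buffer.
  _read(): void {
    if (this.current > this.limit) {
      this.push(null); // null signals the end of the stream
    } else {
      this.push(`${this.current++} `);
    }
  }
}

// Reads every chunk of a stream and joins them into one string.
async function collect(source: Readable): Promise<string> {
  let text = '';
  for await (const chunk of source) {
    text += chunk.toString('utf-8');
  }
  return text;
}

collect(new Counter(3)).then((text) => console.log(JSON.stringify(text))); // "1 2 3 "
```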
2.1 Connecting a Writable Stream and a Readable Stream
- Using pipe
The pipe method connects a Readable Stream to a Writable Stream
Example:
import * as fs from 'node:fs/promises';
import * as path from 'path';
(async () => {
  console.time("copyStream");
  const srcFile = await fs.open(path.join(__dirname, '../dest.txt'), 'r');
  const destFile = await fs.open(path.join(__dirname, '../dest-copy-stream.txt'), 'w');
  const readStream = srcFile.createReadStream();
  const writeStream = destFile.createWriteStream();
  readStream.pipe(writeStream);
  readStream.on('end', () => {
    console.timeEnd("copyStream");
  });
})();
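However, pipe is not a good approach
The main reason is that it has no good interface for error handling
You have to listen for error events on both streams separately and close the streams yourself when one of them fails
pipeline can be used to connect the streams instead
Example: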
import { pipeline } from 'node:stream';
import * as fs from 'node:fs/promises';
import * as path from 'path';
(async () => {
  console.time("copyStream");
  const srcFile = await fs.open(path.join(__dirname, '../dest.txt'), 'r');
  const destFile = await fs.open(path.join(__dirname, '../dest-copy-stream.txt'), 'w');
  const readStream = srcFile.createReadStream();
  const writeStream = destFile.createWriteStream();
  // pipeline destroys both streams if either one fails and reports the error here
  pipeline(readStream, writeStream, (err) => {
    if (err) console.error(err);
    console.timeEnd("copyStream");
  })
})();
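pipeline is also available in a promise-returning form from node:stream/promises, which fits naturally into async functions. The sketch below uses in-memory streams so it can run without any files; `copyInMemory` is a hypothetical helper name, not part of Node.js.

```typescript
import { pipeline } from 'node:stream/promises';
import { Readable, Writable } from 'node:stream';

// Hypothetical helper: pipes an array of strings into an in-memory sink.
async function copyInMemory(parts: string[]): Promise<string> {
  const received: string[] = [];
  const sink = new Writable({
    write(chunk, _encoding, callback) {
      received.push(chunk.toString('utf-8'));
      callback();
    },
  });
  // Resolves once the sink has finished; rejects if either stream fails.
  await pipeline(Readable.from(parts), sink);
  return received.join('');
}

copyInMemory(['a', 'b', 'c'])
  .then((text) => console.log(text)) // abc
  .catch((err) => console.error('pipeline failed', err));
```

With this form a single try/catch (or one catch handler) covers errors from every stream in the chain.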
3. Duplex Stream
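A Duplex is a stream object that is both a Writable Stream and a Readable Stream
It has the properties of both Writable and Readable Streams
It has separate internal buffers for its Writable side and its Readable side
The two sides are independent, so reading and writing can work on different data flows
Example: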
import { Duplex } from 'node:stream';
import * as fs from 'node:fs';
import * as path from 'node:path';
class DuplexStream extends Duplex {
  private readFileName: string = '';
  private writeFileName: string = '';
  private readFd: number = -1;
  private writeFd: number = -1;
  private chunks: Uint8Array[] = [];
  private chunksSize = 0;
  constructor({ writableHighWaterMark, readableHighWaterMark, readFileName, writeFileName }:
    { writableHighWaterMark?: number, readableHighWaterMark?: number, readFileName: string, writeFileName: string }) {
    super({ writableHighWaterMark, readableHighWaterMark })
    this.readFileName = readFileName;
    this.writeFileName = writeFileName;
  }
  // open both files before any read or write is processed
  _construct(callback: (error?: Error) => void): void {
    fs.open(this.readFileName, 'r', (err, readFd) => {
      if (err) return callback(err);
      this.readFd = readFd;
      fs.open(this.writeFileName, 'w', (err, writeFd) => {
        if (err) return callback(err);
        this.writeFd = writeFd;
        callback();
      });
    });
  }
  // writable side: collect chunks and flush them to the write file in batches
  _write(chunk: Buffer, encoding: BufferEncoding, callback: (error?: Error) => void): void {
    this.chunks.push(chunk);
    this.chunksSize += chunk.length;
    // do our write operation...
    if (this.chunksSize > this.writableHighWaterMark) {
      fs.write(this.writeFd, Buffer.concat(this.chunks), (err) => {
        if (err) {
          return callback(err);
        }
        this.chunks = [];
        this.chunksSize = 0;
        callback();
      });
    } else {
      // when we're done, we should call the callback function
      callback();
    }
  }
  // readable side: read up to `size` bytes from the read file
  _read(size: number): void {
    const buff = Buffer.alloc(size);
    fs.read(this.readFd, buff, 0, size, null, (err, bytesRead) => {
      if (err) return this.destroy(err);
      // null is to indicate the end of stream
      this.push(bytesRead > 0 ? buff.subarray(0, bytesRead) : null);
    });
  }
  // flush whatever is still collected when end() is called
  _final(callback: (error?: Error) => void): void {
    fs.write(this.writeFd, Buffer.concat(this.chunks), (err) => {
      if (err) return callback(err);
      this.chunks = [];
      callback();
    });
  }
  _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    callback(error);
  }
}
const duplex = new DuplexStream({
  readFileName: path.join(__dirname, '../read.txt'),
  writeFileName: path.join(__dirname, '../write.txt')
});
duplex.write(Buffer.from("this is a string 0\n"));
duplex.write(Buffer.from("this is a string 1\n"));
duplex.write(Buffer.from("this is a string 2\n"));
duplex.write(Buffer.from("this is a string 3\n"));
duplex.end(Buffer.from("end of write"));
duplex.on('data', (chunk: Buffer) => {
  console.log(chunk.toString('utf-8'));
});
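The independence of the two sides is easier to see on a much smaller Duplex that does not touch the file system. The sketch below is only an illustration with made-up messages and a hypothetical `runDuplexDemo` function: what is written to the stream never shows up on its readable side.

```typescript
import { Duplex } from 'node:stream';
import { finished } from 'node:stream/promises';

// Hypothetical demo: the readable and writable sides carry unrelated data.
async function runDuplexDemo(): Promise<{ read: string; written: string[] }> {
  const written: string[] = [];
  const duplex = new Duplex({
    // Readable side: serves one fixed message, then ends.
    read() {
      this.push('from the readable side');
      this.push(null);
    },
    // Writable side: records whatever is written.
    write(chunk, _encoding, callback) {
      written.push(chunk.toString('utf-8'));
      callback();
    },
  });

  let read = '';
  duplex.on('data', (chunk: Buffer) => {
    read += chunk.toString('utf-8');
  });
  duplex.end('to the writable side');

  // Resolves once both sides of the stream are done.
  await finished(duplex);
  return { read, written };
}

runDuplexDemo().then((result) => console.log(result));
```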
4. Transform Stream
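A Transform is a Duplex stream whose readable output is computed from the data written to it
It has the properties of both Writable and Readable Streams
It has the buffers of both Writable and Readable Streams
It is meant for data flows that need to be converted along the way
Example:
encrypt.ts adds 1 to every byte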
import { Transform } from 'node:stream';
import { TransformCallback } from 'stream';
import * as fs from 'node:fs/promises';
import * as path from 'node:path';
class Encrypt extends Transform {
  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
    for (let i = 0; i < chunk.length; ++i) {
      // Buffer elements wrap modulo 256, so 255 becomes 0
      // (leaving 255 untouched would make it collide with 254 + 1)
      chunk[i] = chunk[i] + 1;
    }
    // passing the chunk to the callback is equivalent to this.push(chunk)
    callback(null, chunk);
  }
}
(async () => {
  const readFileHandle = await fs.open(path.join(__dirname, '../test.txt'), 'r');
  const writeFileHandle = await fs.open(path.join(__dirname, '../write.txt'), 'w');
  const readStream = readFileHandle.createReadStream();
  const writeStream = writeFileHandle.createWriteStream();
  const encrypt = new Encrypt();
  readStream.pipe(encrypt).pipe(writeStream);
})()
decrypt.ts subtracts 1 from every byte
import { Transform } from 'node:stream';
import { TransformCallback } from 'stream';
import * as fs from 'node:fs/promises';
import * as path from 'node:path';
class Decrypt extends Transform {
  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
    for (let i = 0; i < chunk.length; ++i) {
      // Buffer elements wrap modulo 256, so 0 becomes 255
      chunk[i] = chunk[i] - 1;
    }
    callback(null, chunk);
  }
}
(async () => {
  const readFileHandle = await fs.open(path.join(__dirname, '../write.txt'), 'r');
  const writeFileHandle = await fs.open(path.join(__dirname, '../decrypted.txt'), 'w');
  const readStream = readFileHandle.createReadStream();
  const writeStream = writeFileHandle.createWriteStream();
  const decrypt = new Decrypt();
  readStream.pipe(decrypt).pipe(writeStream);
})()
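Whether a byte shift round-trips correctly can be checked in memory, without files. The sketch below is an assumption-laden illustration: `shiftBytes` and `roundTrip` are hypothetical helpers, and the shift is done modulo 256 so that the edge values 0, 254 and 255 survive encrypting and decrypting. pipe is used only for brevity.

```typescript
import { Buffer } from 'node:buffer';
import { Readable, Transform } from 'node:stream';

// Hypothetical helper: a Transform that adds `delta` to every byte, modulo 256.
function shiftBytes(delta: number): Transform {
  return new Transform({
    transform(chunk: Buffer, _encoding, callback) {
      const out = Buffer.from(chunk); // copy so the incoming chunk is not mutated
      for (let i = 0; i < out.length; i++) {
        out[i] = (out[i] + delta + 256) % 256;
      }
      callback(null, out);
    },
  });
}

// Shifts every byte up by one and back down again, then collects the result.
async function roundTrip(input: Buffer): Promise<Buffer> {
  const chunks: Buffer[] = [];
  const restored = Readable.from([input]).pipe(shiftBytes(1)).pipe(shiftBytes(-1));
  for await (const chunk of restored) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}

const sample = Buffer.from([0, 65, 254, 255]);
roundTrip(sample).then((out) => console.log(out.equals(sample))); // true
```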
Node packages that use Stream
For the node pg module, the pg-query-stream package is available for queries that return a large amount of data
It delivers a huge result set step by step as a stream https://github.com/brianc/node-postgres/tree/master/packages/pg-query-stream
Web Stream API
With the Web Streams API, JavaScript can now use streams directly to process huge files more efficiently https://developer.mozilla.org/en-US/docs/Web/API/Streams_API
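A minimal sketch of the Web Streams API, assuming a Node.js version that ships the node:stream/web module (browsers expose the same classes as globals). `upperCaseAll` is a hypothetical helper: it feeds strings into a ReadableStream, sends them through a TransformStream and reads the result back chunk by chunk.

```typescript
import { ReadableStream, TransformStream } from 'node:stream/web';

// Hypothetical helper: upper-cases every chunk that flows through the stream.
async function upperCaseAll(parts: string[]): Promise<string> {
  const source = new ReadableStream<string>({
    start(controller) {
      for (const part of parts) controller.enqueue(part);
      controller.close(); // no more data
    },
  });
  const upper = new TransformStream<string, string>({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });

  let result = '';
  const reader = source.pipeThrough(upper).getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    result += value;
  }
  return result;
}

upperCaseAll(['web ', 'streams']).then((text) => console.log(text)); // WEB STREAMS
```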
Written by Yuanyu Liang