Basic Concepts of Node.js Streams

Yuanyu Liang
6 min read

Outline

This article explains how the stream API in Node.js provides the following benefits:

  1. Large amounts of data can be processed more efficiently, without large files blocking the process
  2. Data of the same scale can be processed with a limited amount of resources

What is Stream?

A stream represents a flow of data.

It usually refers to processing large data by splitting it into many small chunks and handling them one after another.

Why use Stream?

  1. It saves resources (memory usage)

  2. Buffering makes data I/O more efficient

Stream Types

Based on the direction of the data flow and how the data is handled, streams fall into the following four categories:

1. Writable Stream

A Writable Stream is a stream object used for writing data out to a destination.

It has a property called writableHighWaterMark, which is the threshold (in bytes) at which the internal buffer is considered full.

The default writableHighWaterMark is 16384 bytes (16 KB).

Internally the stream keeps a buffer of up to writableHighWaterMark bytes that temporarily holds the data waiting to be written.

Calling the write method on a Writable Stream places data into this internal buffer, and the stream flushes it to the underlying resource as fast as it can.

However, if you keep calling write after the buffer has reached writableHighWaterMark, the excess data still gets queued in memory.

This can push the memory usage of the Node.js process well beyond what was expected, until the internal buffer has been worked through.

A better approach is to check the return value of each write call.

If stream.write returns true, the internal buffer still has room for more data.

If stream.write returns false, the internal buffer is full and the stream needs time to drain it.

When you get false, stop writing for the moment (in the example below, the write loop simply breaks out).

Once the stream emits the 'drain' event, the buffered data has been flushed and you can continue writing.

Example:

import * as fs from 'node:fs/promises';
import * as path from 'node:path';

(async () => {
  console.time("writeMany");
  const fileHandle = await fs.open(path.join(__dirname, "../test.txt"), "w");
  const stream = fileHandle.createWriteStream();
  console.log(stream.writableHighWaterMark);
  let i = 0;
  const numberOfWrites = 10000000;
  const writeMany = () => {
    while (i < numberOfWrites) {
      const buff = Buffer.from(` ${i} `, 'utf-8');
      // last write: end the stream with the final chunk
      if (i === numberOfWrites - 1) {
        return stream.end(buff);
      }
      // write() always buffers the chunk; a false return value only means the
      // internal buffer is full, so advance i first and then stop the loop
      const canContinue = stream.write(buff);
      i++;
      if (!canContinue) {
        break;
      }
    }
  };
  writeMany();
  // resume our loop once the stream's internal buffer has been drained
  stream.on('drain', () => {
    writeMany();
  });

  stream.on('finish', () => {
    console.timeEnd("writeMany");
    fileHandle.close();
  });
})();

2. Readable Stream

A Readable Stream is a stream object used for reading data in from a source.

It has a property called highWaterMark, which is the threshold (in bytes) at which the internal buffer is considered full and the stream stops reading ahead from the source.

The default highWaterMark is 65536 bytes (64 KB).

Internally the stream keeps a buffer of up to highWaterMark bytes that temporarily holds the data that has been read.

A Readable implementation fills this internal buffer by calling the push method; pushing null signals the end of the data (EOF).

Consumers then receive the buffered chunks through the stream's 'data' events as the buffer fills up or EOF is reached.
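As a minimal sketch of the push() / push(null) mechanism described above (the Counter class below is a hypothetical illustration, not something provided by the Node.js API), a custom Readable can look like this:

import { Readable } from 'node:stream';

// Hypothetical example: a Readable that produces the numbers 0..9 and then ends.
class Counter extends Readable {
  private current = 0;

  _read(): void {
    if (this.current < 10) {
      // push() places the chunk in the internal buffer until a consumer reads it
      this.push(Buffer.from(` ${this.current++} `, 'utf-8'));
    } else {
      // pushing null signals EOF
      this.push(null);
    }
  }
}

const counter = new Counter();
counter.on('data', (chunk: Buffer) => process.stdout.write(chunk.toString('utf-8')));
counter.on('end', () => console.log('\ndone'));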

The following example consumes a Readable created from a large file and filters its contents into another file:

import * as fs from 'node:fs/promises';
import * as path from 'node:path';
(async () => {
  console.time('readBig');
  const fileHandleRead = await fs.open(path.join(__dirname, "../src.txt"), "r");
  const fileHandleWrite = await fs.open(path.join(__dirname, "../dest.txt"), "w");
  const streamRead = fileHandleRead.createReadStream({ highWaterMark: 64 * 1024 });
  const streamWrite = fileHandleWrite.createWriteStream();
  // holds a number that was cut in half at the end of the previous chunk
  let split = '';
  streamRead.on('data', (chunk) => {
    const numbers = chunk.toString('utf-8').split('  ');
    // if the first number is not consecutive with the second one, it was split
    // across chunk boundaries: prepend the leftover piece from the last chunk
    if (Number(numbers[0]) !== Number(numbers[1]) - 1) {
      if (split) numbers[0] = split + numbers[0].trim();
    }
    // if the last number is not consecutive with the one before it, it is
    // incomplete: keep it aside for the next chunk
    if (Number(numbers[numbers.length - 2]) + 1 !== Number(numbers[numbers.length - 1])) {
      split = numbers.pop();
    }

    numbers.forEach((number) => {
      const n = Number(number);
      if (n % 10 === 0) {
        // respect backpressure: pause reading while the write buffer is full
        if (!streamWrite.write(" " + n + " ")) {
          streamRead.pause();
        }
      }
    });
  });
  streamWrite.on("drain", () => {
    streamRead.resume();
  });
  streamRead.on('end', () => {
    // flush and close the write stream once reading is finished
    streamWrite.end();
    console.log("Done Reading");
    console.timeEnd('readBig');
  });
})();

2.1 Connecting a Readable Stream to a Writable Stream

  1. Using pipe

The pipe method connects a Readable Stream to a Writable Stream.

Example:

import * as fs from 'node:fs/promises';
import * as path from 'node:path';
(async () => {
  console.time("copyStream");
  const srcFile = await fs.open(path.join(__dirname, '../dest.txt'), 'r');
  const destFile = await fs.open(path.join(__dirname, '../dest-copy-stream.txt'), 'w');
  const readStream = srcFile.createReadStream();
  const writeStream = destFile.createWriteStream();
  readStream.pipe(writeStream);
  readStream.on('end', () => {
    console.timeEnd("copyStream");
  });
})();

However, pipe on its own is not a great approach.

The main reason is that it does not give you a good hook for error handling.

You would need to listen for error events on each of the two streams separately and close the stream that failed.

Instead, you can use pipeline to connect the streams.

Example:

import { pipeline } from 'node:stream';
import * as fs from 'node:fs/promises';
import * as path from 'path';
(async () => {
  console.time("copyStream");
  const srcFile = await fs.open(path.join(__dirname, '../dest.txt'), 'r');
  const destFile = await fs.open(path.join(__dirname, '../dest-copy-stream.txt'), 'w');
  const readStream = srcFile.createReadStream();
  const writeStream = destFile.createWriteStream();
  pipeline(readStream, writeStream, (err) => {
    if (err) console.error(err);
    console.timeEnd("copyStream");
  });
  // readStream.pipe(writeStream);
  // readStream.on('end', () => {
  //   console.timeEnd("copyStream");
  // });
})();

3. Duplex Stream

A Duplex stream is a stream object that is both a Writable Stream and a Readable Stream.

It has the properties of both a Writable and a Readable Stream.

It also has both a writable internal buffer and a readable internal buffer.

The readable side and the writable side operate on separate data flows, so you can read from one source and write to another independently.

Example:

import { Duplex } from 'node:stream';
import * as fs from 'node:fs';
import * as path from 'node:path';

class DuplexStream extends Duplex {
  private readFileName: string = '';
  private writeFileName: string = '';
  private readFd: number =  -1;
  private writeFd: number = -1;
  private chunks: Uint8Array[] = [];
  private chunksSize = 0;
  constructor({ writableHighWaterMark, readableHighWaterMark, readFileName, writeFileName }: 
    { writableHighWaterMark?: number, readableHighWaterMark?: number, readFileName: string, writeFileName: string }) {
    super({writableHighWaterMark, readableHighWaterMark })
    this.readFileName = readFileName;
    this.writeFileName = writeFileName;

  }
  _construct(callback: (error?: Error) => void): void {
    fs.open(this.readFileName, 'r', (err, readFd) => {
      if (err) return callback(err);

      this.readFd = readFd;
      fs.open(this.writeFileName, 'w', (err, writeFd) => {
        if (err) return callback(err);

        this.writeFd = writeFd;
        callback();
      });
    });
  }
  _write(chunk: Buffer, encoding: BufferEncoding, callback: (error?: Error) => void): void {
    this.chunks.push(chunk);
    this.chunksSize += chunk.length;
    // do our write operation...
    if (this.chunksSize > this.writableHighWaterMark) {
      fs.write(this.writeFd, Buffer.concat(this.chunks), (err) => {
        if (err) {
          return callback(err);
        }
        this.chunks = [];
        this.chunksSize = 0;
        callback();
      });
    } else {
      // when we're done, we should call the callback function
      callback();
    }
  }
  _read(size: number): void {
    const buff = Buffer.alloc(size);
    fs.read(this.readFd, buff, 0, size, null, (err, bytesRead) => {
      if (err) return this.destroy(err);
      // pushing null indicates the end of the stream
      this.push(bytesRead > 0 ? buff.subarray(0, bytesRead): null);
    });
  }
  _final(callback: (error?: Error) => void): void {
    fs.write(this.writeFd, Buffer.concat(this.chunks), (err) => {
      if (err) return callback(err);

      this.chunks = [];
      callback();
    });
  }
  _destroy(error: Error, callback: (error: Error) => void): void {
    callback(error);
  }
}

const duplex = new DuplexStream({ 
  readFileName: path.join(__dirname, '../read.txt'), 
  writeFileName: path.join(__dirname, '../write.txt') });

duplex.write(Buffer.from("this is a string 0\n"));
duplex.write(Buffer.from("this is a string 1\n"));
duplex.write(Buffer.from("this is a string 2\n"));
duplex.write(Buffer.from("this is a string 3\n"));
duplex.end(Buffer.from("end of write"));
duplex.on('data', (chunk: Buffer) => {
  console.log(chunk.toString('utf-8'));
});

4. Transform Stream

A Transform stream is a Duplex stream that sits between a Readable source and a Writable destination and computes its output from the data written to it.

It has the properties of both a Writable and a Readable Stream.

It also has both a writable internal buffer and a readable internal buffer.

It is designed specifically for data flows that need to be transformed along the way.

Example:

encrypt.ts adds 1 to every byte:

import { Transform } from 'node:stream';
import { TransformCallback } from 'stream';
import * as fs from 'node:fs/promises';
import * as path from 'node:path';

class Encrypt extends Transform {
  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
    for (let i = 0; i < chunk.length; ++i) {
      // shift every byte by +1 (bytes equal to 255 are left untouched to avoid overflow)
      if (chunk[i] !== 255) {
        chunk[i] = chunk[i] + 1;
      }
    }
    callback(null, chunk);
  }
}

(async () => {
  const readFileHandle = await fs.open(path.join(__dirname, '../test.txt'), 'r');
  const writeFileHandle = await fs.open(path.join(__dirname, '../write.txt'), 'w');

  const readStream = readFileHandle.createReadStream();
  const writeStream = writeFileHandle.createWriteStream();
  const encrypt = new Encrypt();

  readStream.pipe(encrypt).pipe(writeStream);
})();

decrypt.ts subtracts 1 from every byte:

import { Transform } from 'node:stream';
import { TransformCallback } from 'stream';
import * as fs from 'node:fs/promises';
import * as path from 'node:path';
class Decrypt extends Transform {
  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
    // shift every byte back by -1 (mirrors the Encrypt transform above)
    for (let i = 0; i < chunk.length; ++i) {
      if (chunk[i] !== 255) {
        chunk[i] = chunk[i] - 1;
      }
    }
    callback(null, chunk);
  }
}
(async () => {
  const readFileHandle = await fs.open(path.join(__dirname, '../write.txt'), 'r');
  const writeFileHandle = await fs.open(path.join(__dirname, '../decrypted.txt'), 'w');

  const readStream = readFileHandle.createReadStream();
  const writeStream = writeFileHandle.createWriteStream();
  const decrypt = new Decrypt();

  readStream.pipe(decrypt).pipe(writeStream);
})()

Node packages that use streams

For the node pg module, the pg-query-stream package is provided for handling large query results.

It lets you deliver a huge result set gradually as a stream instead of loading it all at once: https://github.com/brianc/node-postgres/tree/master/packages/pg-query-stream
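A minimal sketch of how pg-query-stream can be used (this assumes a reachable PostgreSQL database configured through the standard PG* environment variables; the query itself is only an illustration):

import { Pool } from 'pg';
import QueryStream from 'pg-query-stream';

(async () => {
  // connection settings are taken from the standard PG* environment variables
  const pool = new Pool();
  const client = await pool.connect();
  try {
    // QueryStream makes the driver pull rows gradually instead of buffering the whole result
    const query = new QueryStream('SELECT * FROM generate_series(0, $1) AS num', [1000000]);
    const stream = client.query(query);
    stream.on('data', (row) => {
      // each 'data' event delivers a single row object
      process.stdout.write(`${JSON.stringify(row)}\n`);
    });
    await new Promise<void>((resolve, reject) => {
      stream.on('end', resolve);
      stream.on('error', reject);
    });
  } finally {
    client.release();
    await pool.end();
  }
})();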

Web Stream API

With the Web Streams API, you can now use the same streaming approach directly in JavaScript to process huge files more efficiently: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API
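As a minimal sketch, the snippet below consumes a fetch response chunk by chunk with the Web Streams API (the URL is just a placeholder):

(async () => {
  const response = await fetch('https://example.com/big-file');
  if (!response.body) throw new Error('response has no body stream');

  // response.body is a ReadableStream; getReader() lets us pull one chunk at a time
  const reader = response.body.getReader();
  let received = 0;
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    received += value.length;
    console.log(`received ${received} bytes so far`);
  }
  console.log('stream finished');
})();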

