Nodejs server 與 CPU 密集的任務 -- worker thread

Yuanyu LiangYuanyu Liang
2 min read

簡介

本篇文章將會說明兩大主題:

  1. Nodejs Server 遇到 CPU 密集的任務 可能會遇到的 Blocking 問題

  2. Nodejs 使用 worker thread 解決上述問題的做法

概念說明

以下為了比較好說明 worker thread

將逐一簡介以下概念

Server

把 Server 簡化為一個可以處理 Web Request 的服務

每一個 Request 進來 , Server 就會回應一個 response

舉例來說: 網頁服務器會根據傳入網址會應對應的頁面

Thread

Thread 可以想像是一個最小單位處理 Request 的服務

一個 Server 裏面可以有一個或是多個 Thread 來處理 Request

Single Thread Server: 使用單一 Thread 來服務 Request 的 Server

Multi Thread Server: 使用多個 Thread 來服務 Request 的 Server

為了更加好懂

可以想像 Server 相當於一個餐廳

而 Thread 就相當於是服務生

假設因為不是很受歡迎

餐廳只請了一個服務生來服務同時舉辦3個 Party 的客人

因為只需要處理接待登記的部份

所以當接待完一個 party 的登記後,馬上服務生又可以去服務下一個進入的客人

可以發現服務生接待時間與3個 party 客人活躍時間做一個關係圖如下:

上圖所示:

  1. Party 所進行的時間不需要 Waiter 一直 standby
  2. Waiter 只有需要登記新的客人才會需要出現

Request 也可以分為 CPU active Time 還有 不需要 CPU 運算的時間如下圖

Non Blocking I/O

所謂的 Non Blocking I/O 是指: 單一執行緒處理 Request I/O 可以併發處理,處理I/O 的執行緒並不會被某一個 Request 卡住所有其他 I/O

Blocking I/O

所謂的 Blocking I/O 是指: 單一執行緒處理 Request I/O 無法併發處理,必須處理完當下處理的 Request I/O 才能繼續往下執行其他 Request

Nodejs 的 single thread

一個 Nodejs 所啟動的服務會使用單一執行緒來執行所有任務

透過 libuv 的 Event loop 實踐 Non Blocking I/O

當 CPU 消耗很多的 Request 出現

因為 Nodejs 執行是使用單一執行緒來執行所有任務

因此當有一個,需要耗時 CPU 很長時間的 Request 出現時

所有其他 Request 都需要等待這個任務執行完 才能使用 request 資源

範例

假設寫一個很長的迴圈 如下

export const heavilyJob = (totalCount = 20_000_000_000): number => {
  let count = 0;
  console.log({ totalCount });
  for (let i = 0; i < totalCount; i++) {
    count++;
  }
  return count;
};

heavily-job-sample

當執行完

time curl http://localhost:3000/blocking-route

會發現

time curl http://localhost:3000/

都被上一個 heavily-job 卡住

解法之一 worker thread

要避免 cpu 資源被佔住的一個作法是啟用 worker_thread

讓 nodejs 開啟另一個 thread 把 cpu 資源與 main Thread 分開

作法如下:

import { parentPort } from 'worker_threads';
import { heavilyJob } from './heavily-job';
import { HEAVY_COUNT } from './constant';

parentPort.postMessage(heavilyJob(HEAVY_COUNT));
const workerPromise: Promise<number> = new Promise<number>(
      (resolve, reject) => {
        const worker = new Worker(path.join(__dirname, './worker.js'));
        worker.on('message', (data: number) => {
          resolve(data);
        });
        worker.on('error', (error) => {
          reject(error);
        });
      },
    );
 const count = await workerPromise;

single-worker-thrread-sample

優化

以目前的範例

可以發現 counter 是可以並行運算的

因此,可以透過把 heavy-job 分化給4個 worker 來做優化

前提是該CPU 需要多個 Core

否則也是循序去執行

import { heavilyJob } from './heavily-job';
import { HEAVY_COUNT } from './constant';

parentPort.postMessage(heavilyJob(HEAVY_COUNT / workerData.thread_count));
const workerPromises: Promise<number>[] = [];
    for (let i = 0; i < THREAD_COUNT; i++) {
      workerPromises.push(this.fourWorkerService.createWorker());
    }
    const thread_results = await Promise.all(workerPromises);
    const total =
      thread_results[0] +
      thread_results[1] +
      thread_results[2] +
      thread_results[3];
    return {
      data: total,
      message: 'this is block service',
    };

four-worker-thread-sample

0
Subscribe to my newsletter

Read articles from Yuanyu Liang directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Yuanyu Liang
Yuanyu Liang