如何在 Node.js 中使用多线程

作者选择了 开源精神疾病以作为 写给捐赠计划的一部分获得捐赠。

介绍

Node.js在单个线程中运行JavaScript代码,这意味着您的代码一次只能执行一项任务。然而,Node.js本身是多线程的,通过 libuv库提供隐藏的线程,它处理像从磁盘或网络请求读取文件这样的I/O操作。

虽然 Node.js 具有隐藏的线程,但您无法使用它们来卸载复杂的计算、图像调整或视频压缩等 CPU 密集的任务,因为当 CPU 密集的任务运行时,JavaScript 是单线程的,所以它会阻止主线程,而不会执行任何其他代码,直到任务完成。

然而,近年来,CPU并没有变得更快,相反,计算机正在增加额外的内核,现在计算机更常见的是有8个或更多内核,尽管这种趋势,您的代码不会利用计算机上的额外的内核来加速CPU相关的任务或避免破坏主线,因为JavaScript是单线程的。

为了解决这一问题,Node.js 引入了 worker-threads 模块,允许您创建线程并并行执行多个JavaScript任务. 一旦线程完成任务,它会向主线程发送一个消息,其中包含操作的结果,以便它可以与代码的其他部分一起使用。

在本教程中,您将创建一个 Node.js 应用程序,该应用程序具有 CPU 密集的任务,它会阻止主线程。接下来,您将使用工作线程模块将 CPU 密集的任务卸载到另一个线程,以避免阻止主线程。

前提条件

要完成本教程,您将需要:

  • 具有四个或更多核心的多核系统. 您仍然可以按照双核系统的步骤 1 到 6 的教程进行操作。 然而,步骤 7 需要四个核心才能看到性能改进。
  • A Node.js 开发环境. 如果您在 Ubuntu 22.04,则可以按照 How To Install Node.js on Ubuntu 22.04的步骤 3 安装最新版本的 Node.js。 如果您在其他操作系统上,请参阅 How to Install Node.js and Create a Local Development Environment
  • 在 JavaScript 中对事件循环,呼叫回应和承诺的良好理解,您可以在我们的教程中找到 [How to Understand the Event Loop, Callbacks, Promises, and Async

建立项目并安装依赖性

在此步骤中,您将创建项目目录,初始化npm,并安装所有必要的依赖。

要开始、创建和移动到项目目录:

1mkdir multi-threading_demo
2cd multi-threading_demo

mkdir命令创建一个目录,而cd命令将工作目录更改为新创建的目录。

接下来,使用npm init命令将项目目录初始化为 npm:

1npm init -y

y 选项接受所有默认选项。

当命令运行时,您的输出将看起来像这样:

 1Wrote to /home/sammy/multi-threading_demo/package.json:
 2
 3{
 4  "name": "multi-threading_demo",
 5  "version": "1.0.0",
 6  "description": "",
 7  "main": "index.js",
 8  "scripts": {
 9    "test": "echo \"Error: no test specified\" && exit 1"
10  },
11  "keywords": [],
12  "author": "",
13  "license": "ISC"
14}

接下来,安装 express,一个 Node.js 网页框架:

1npm install express

您将使用 Express 创建一个具有阻止和非阻止终端的服务器应用程序。

默认情况下,Node.js 配备了worker-threads模块,因此您不需要安装它。

您现在已经安装了必要的包,接下来,您将了解更多关于流程和线程以及它们如何在计算机上执行的信息。

了解流程和线程

在开始编写CPU相关的任务并将其卸载到单独的线程之前,您需要首先了解流程和线程是什么,以及它们之间的差异,最重要的是,您将审查流程和线程如何在单核或多核计算机系统上执行。

过程

一个进程是操作系统中的一个运行程序,它有自己的内存,不能看到或访问其他运行程序的内存,它也有指令指示器,该指令表明程序目前正在执行的命令,一次只能执行一个任务。

要了解这一点,您将创建一个 Node.js 程序,具有无限循环,以便它在运行时不会退出。

使用nano或您喜爱的文本编辑器,创建并打开process.js文件:

1nano process.js

在您的process.js文件中,输入以下代码:

 1[label multi-threading_demo/process.js]
 2const process_name = process.argv.slice(2)[0];
 3
 4count = 0;
 5while (true) {
 6  count++;
 7  if (count == 2000 || count == 4000) {
 8    console.log(`${process_name}: ${count}`);
 9  }
10}

在第一行中,process.argv属性返回一个包含程序命令行参数的数组,然后将JavaScript的slice()方法附加到一个2参数中,以便从索引2向前创建一个浅副本的数组。这样做会跳过前两个参数,即Node.js路径和程序文件名。

然后,您定义一个同时循环,并通过一个真实条件,使循环永久运行。循环中,变量在每个迭代过程中增加为1。接着有一个如果声明,检查是否等于20004000

使用CTRL+X保存和关闭文件,然后按Y保存更改。

使用node命令运行程序:

1node process.js A &

A 是一个命令行参数,传递给程序并存储在 process_name 变量中. 最后的 & 允许节点程序在背景中运行,允许您在壳中输入更多的命令。

当您运行该程序时,您将看到类似于以下的输出:

1[secondary_label Output]
2[1] 7754
3
4A: 2000
5A: 4000

编号7754是操作系统分配给它的进程 ID. A: 2000A: 4000是程序的输出。

当您使用节点命令运行程序时,您创建了一个过程。操作系统为该程序分配了内存,在计算机的磁盘上找到可执行的程序,然后将该程序加载到内存中。

当该过程运行时,其进程 ID 会被添加到操作系统的进程列表中,并可通过诸如 htoptop、或 ps等工具查看。

要获得一个节点过程的快速总结,请在终端中按ENTER,以恢复提示。

1ps |grep node

ps命令列出与系统上的当前用户相关的所有流程. 管道操作员``将所有ps输出传输到grep(https://man7.org/linux/man-pages/man1/egrep.1.html)中,将流程过滤到仅列出节点流程。

运行该命令将产生类似于以下的输出:

1[secondary_label Output]
27754 pts/0 00:21:49 node

例如,使用以下命令创建三个不同的进程,并将其放置在后台:

1node process.js B & node process.js C & node process.js D &

在命令中,您创建了process.js程序的另外三个实例.&符号将每个进程放置在后台。

当运行命令时,输出将看起来如下(尽管顺序可能不同):

 1[secondary_label Output]
 2[2] 7821
 3[3] 7822
 4[4] 7823
 5
 6D: 2000
 7D: 4000
 8B: 2000
 9B: 4000
10C: 2000
11C: 4000

正如您在输出中所看到的,每一个进程在计算达到20004000时将进程名称登录到终端中。

如果你仔细研究输出,你会看到输出的顺序不是你创建三个进程时的顺序。当运行命令时,进程参数以B,CD的顺序。

在一个单一的核心机器上,流程执行 concurrently。 也就是说,操作系统在定期间隔内切换过程之间。 例如,过程D在有限的时间内执行,然后其状态保存在某个地方,而操作系统计划在有限的时间内执行过程B等等。 这种情况发生在所有任务完成之前。 从输出中,它看起来像每个过程必须完成,但实际上,操作系统编程器在它们之间不断切换。

在多核系统中,假设你有四个核心,操作系统会安排每个进程同时在每个核心上执行,这被称为 parallelism

线索

线程类似于流程:它们有自己的指示指示器,并且可以同时执行一个JavaScript任务。与流程不同,线程没有自己的内存。相反,它们居住在一个流程的内存中。当你创建一个流程时,它可以使用worker_threads模块创建多个线程并行执行JavaScript代码。此外,线程可以通过传递消息或共享进程内存中的数据相互通信。

当涉及到执行线程时,它们的行为与流程相似. 如果您在单个核心系统上运行多个线程,操作系统会定期切换它们之间,给每个线程一个机会直接在单个CPU上执行。在多核系统中,操作系统会安排所有核心的线程并同时执行JavaScript代码。

ENTER,然后用杀死命令停止当前运行的所有节点流程:

1sudo kill -9 `pgrep node`

pgrep 将所有四个节点进程的进程 ID 返回至 kill 命令. -9 选项命令 kill 发送 SIGKILL 信号

当您运行该命令时,您将看到类似于以下的输出:

1[secondary_label Output]
2[1]   Killed node process.js A
3[2]   Killed node process.js B
4[3]   Killed node process.js C
5[4]   Killed node process.js D

有时,输出可能会延迟,并在稍后运行另一个命令时出现。

现在你知道流程和线程之间的区别,你将在下一节中使用 Node.js 隐藏的线程。

在 Node.js 中理解隐藏的字符串

Node.js确实提供了额外的线程,这就是为什么它被认为是多线程的。

正如介绍中提到的,JavaScript是单线程的,所有的JavaScript代码都执行在一个线程中,包括你的程序的源代码和你在你的程序中包含的第三方库。

然而,Node.js 实施了libuv库,该库为 Node.js 流程提供四个额外的线索. 通过这些线索, I/O 操作被单独处理,完成后,事件循环在微任务排列中添加与 I/O 任务相关的呼叫。当主线程中的呼叫堆清晰时,呼叫堆被推到呼叫堆,然后执行。 为了澄清这一点,与给定 I/O 任务相关的呼叫循环不会并行执行;然而,读取文件或网络请求的任务本身与线程的帮助并行发生。

除了这些四个线程之外,V8引擎(LINK0)还提供两种线程来处理自动垃圾收集等事情,从而将过程中的总线程数增加到七个:一个主线程,四个Node.js线程和两个V8线程。

要确认每个 Node.js 流程都有七个线程,请重新运行process.js文件并将其放到背景中:

1node process.js A &

终端将登录过程ID,以及来自程序的输出:

1[secondary_label Output]
2[1] 9933
3
4A: 2000
5A: 4000

注意某个地方的进程ID,然后按ENTER,以便您可以再次使用提示。

要查看线程,运行顶部命令并将其传递到输出中显示的进程 ID:

1top -H -p 9933

-H指示顶部显示进程中的线程;-p旗指示顶部仅监控进程ID中的活动。

当您运行该命令时,您的输出将看起来如下:

 1[secondary_label Output]
 2top - 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26
 3Threads:   7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie
 4%Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
 5MiB Mem :   7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache
 6MiB Swap:      0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem
 7
 8    PID USER PR NI VIRT RES SHR S  %CPU  %MEM TIME+ COMMAND
 9   9933 node-us+  20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node
10   9934 node-us+  20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
11   9935 node-us+  20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node
12   9936 node-us+  20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
13   9937 node-us+  20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node
14   9938 node-us+  20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
15   9939 node-us+  20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node

正如您在输出中所看到的,Node.js 流程共有七个线程:一个执行 JavaScript 的主要线程,四个 Node.js 线程和两个 V8 线程。

如前所述,四个 Node.js 线程用于 I/O 操作,使其不受阻挡。它们对该任务非常有效,并且为 I/O 操作自己创建线程甚至可能使应用程序性能恶化。

现在按q来退出顶部并用以下命令停止节点过程:

1kill -9 9933

现在你已经知道 Node.js 流程中的线程了,你将在下一节写出一个与 CPU 相关的任务,并观察它如何影响主线程。

创建一个没有工人线条的 CPU-Bound 任务

在本节中,您将构建一个Express应用程序,该应用程序具有非封锁路线和执行CPU相关任务的封锁路线。

首先,在您喜爱的编辑器中打开index.js:

1nano index.js

在您的 index.js 文件中,添加以下代码来创建基本服务器:

 1[label multi-threading_demo/index.js]
 2const express = require("express");
 3
 4const app = express();
 5const port = process.env.PORT || 3000;
 6
 7app.get("/non-blocking/", (req, res) => {
 8  res.status(200).send("This page is non-blocking");
 9});
10
11app.listen(port, () => {
12  console.log(`App listening on port ${port}`);
13});

在接下来的代码块中,您使用 Express 创建了一个 HTTP 服务器. 在第一行中,您导入了express模块. 接下来,您将app变量设置为 Express 的实例。

接下来,您使用app.get('/non-blocking')来定义应该发送的GET请求路径,最后,您调用app.listen()方法来指示服务器在端口3000上开始聆听。

接下来,定义另一个路径,即 /blocking/,该路径将包含一个 CPU 密集任务:

 1[label multi-threading_demo/index.js]
 2...
 3app.get("/blocking", async (req, res) => {
 4  let counter = 0;
 5  for (let i = 0; i < 20_000_000_000; i++) {
 6    counter++;
 7  }
 8  res.status(200).send(`result is ${counter}`);
 9});
10
11app.listen(port, () => {
12  console.log(`App listening on port ${port}`);
13});

您使用app.get(/blocking)来定义/blocking路径,该路径采用与async关键字前缀的非同步调用作为运行 CPU 密集任务的第二个参数。在调用中,您创建一个for循环,重复 20 亿次,每次重复时,它会增加counter变量为1

此时,你的 index.js 文件将看起来像这样:

 1[label multi-threading_demo/index.js]
 2const express = require("express");
 3
 4const app = express();
 5const port = process.env.PORT || 3000;
 6
 7app.get("/non-blocking/", (req, res) => {
 8  res.status(200).send("This page is non-blocking");
 9});
10
11app.get("/blocking", async (req, res) => {
12  let counter = 0;
13  for (let i = 0; i < 20_000_000_000; i++) {
14    counter++;
15  }
16  res.status(200).send(`result is ${counter}`);
17});
18
19app.listen(port, () => {
20  console.log(`App listening on port ${port}`);
21});

保存和退出您的文件,然后使用以下命令启动服务器:

1node index.js

当您运行该命令时,您将看到类似于以下的输出:

1[secondary_label Output]
2App listening on port 3000

这表明服务器正在运行并准备服务。

现在,请在您喜欢的浏览器中访问http://localhost:3000/non-blocking

<$>[注] 注: 如果您正在远程服务器上遵循教程,您可以使用端口转发在浏览器中测试应用程序。

当 Express 服务器仍在运行时,请在本地计算机上打开另一个终端,然后输入以下命令:

1ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

当您连接到服务器时,请在本地计算机的网页浏览器上导航到http://localhost:3000/non-blocking

接下来,打开一个新选项卡,然后访问http://localhost:3000/blocking。当页面加载时,快速打开两个选项卡,然后再次访问http://localhost:3000/non-blocking。你会看到你不会得到即时响应,页面将继续尝试加载。只有在/blocking路线完成加载并返回响应结果是20000000000,其余路径才会返回响应。

所有/非阻止路径不像/阻止路径负载一样工作的原因是由于CPU绑定的for循环,它阻止了主线条。当主线条被阻止时,Node.js无法服务任何请求,直到CPU绑定的任务完成。

正如您所看到的,阻止主线条可能会损害用户的应用体验. 要解决此问题,您需要将 CPU 相关任务卸载到另一个线条,以便主线条继续处理其他 HTTP 请求。

您将在下一节中重新启动服务器,然后对 index.js 文件进行更多更改. 服务器停止的原因是 Node.js 不会在对文件进行新更改时自动更新。

现在你已经了解了CPU密集型任务对你的应用程序的负面影响,你现在将试图通过使用承诺来避免阻止主线。

使用承诺卸载CPU绑定任务

通常,当开发人员了解CPU绑定任务的阻止效果时,他们会转向承诺使代码非封锁。这种本能源于使用非封锁基于承诺的I/O方法的知识,如readFile()writeFile()。但是,正如你所了解的那样,I/O操作使用Node.js隐藏的线索,而CPU绑定任务则不使用它。

在编辑器中再次打开 index.js 文件:

1nano index.js

在您的 index.js 文件中,删除包含 CPU 密集任务的突出代码:

 1[label multi-threading_demo/index.js]
 2...
 3app.get("/blocking", async (req, res) => {
 4  let counter = 0;
 5  for (let i = 0; i < 20_000_000_000; i++) {
 6    counter++;
 7  }
 8  res.status(200).send(`result is ${counter}`);
 9});
10...

接下来,添加包含返回承诺的函数的以下突出代码:

 1[label multi-threading_demo/index.js]
 2...
 3function calculateCount() {
 4  return new Promise((resolve, reject) => {
 5    let counter = 0;
 6    for (let i = 0; i < 20_000_000_000; i++) {
 7      counter++;
 8    }
 9    resolve(counter);
10  });
11}
12
13app.get("/blocking", async (req, res) => {
14  res.status(200).send(`result is ${counter}`);
15}

calculateCount()函数现在包含了您在/blocking处理函数中所做的计算。该函数返回了一个承诺,该函数是用新承诺语法初始化的。

接下来,请在 index.js 文件中的 /blocking/ 处理函数中调用 calculateCount() 函数:

1[label multi-threading_demo/index.js]
2app.get("/blocking", async (req, res) => {
3  const counter = await calculateCount();
4  res.status(200).send(`result is ${counter}`);
5});

在这里,您呼叫calculateCount()函数,前缀为等待的关键字,以等待承诺解决。

您的完整代码现在将看起来如下:

 1[label multi-threading_demo/index.js]
 2const express = require("express");
 3
 4const app = express();
 5const port = process.env.PORT || 3000;
 6
 7app.get("/non-blocking/", (req, res) => {
 8  res.status(200).send("This page is non-blocking");
 9});
10
11function calculateCount() {
12  return new Promise((resolve, reject) => {
13    let counter = 0;
14    for (let i = 0; i < 20_000_000_000; i++) {
15      counter++;
16    }
17    resolve(counter);
18  });
19}
20
21app.get("/blocking", async (req, res) => {
22  const counter = await calculateCount();
23  res.status(200).send(`result is ${counter}`);
24});
25
26app.listen(port, () => {
27  console.log(`App listening on port ${port}`);
28});

保存并退出您的文件,然后重新启动服务器:

1node index.js

在您的 Web 浏览器中,请访问 http://localhost:3000/blocking,当它加载时,快速重新加载 http://localhost:3000/non-blocking 选项卡。正如您所看到的那样, non-blocking 路径仍然受到影响,它们都将等待 /blocking 路径完成加载。由于路径仍然受到影响,承诺不会使 JavaScript 代码并行执行,并且不能用来使 CPU 相关的任务不受阻止。

然后,用CTRL+C关闭应用程序服务器。

现在你知道承诺没有提供任何机制来使CPU相关任务不被阻挡,你将使用Node.js的工作线程模块将CPU相关任务卸载到一个单独的线程中。

使用worker-threads模块卸载 CPU-Bound 任务

在本节中,您将使用worker-threads模块将 CPU 密集的任务卸载到另一个线程,以避免阻止主线程。 要做到这一点,您将创建一个包含 CPU 密集的任务的worker.js文件。 在index.js文件中,您将使用worker-threads模块来初始化线程,并在worker.js文件中启动与主线程并行运行的任务。

首先,请使用nproc命令确认您有 2 个或多个内核:

1nproc
1[secondary_label Output]
24

如果显示两个或多个核心,您可以继续执行此步骤。

接下来,在文本编辑器中创建并打开worker.js文件:

1nano worker.js

在您的worker.js文件中,添加以下代码来导入worker-threads模块,并执行CPU密集的任务:

1[label multi-threading_demo/worker.js]
2const { parentPort } = require("worker_threads");
3
4let counter = 0;
5for (let i = 0; i < 20_000_000_000; i++) {
6  counter++;
7}

第一行加载了worker_threads模块并提取了parentPort类. 该类提供了您可以用来发送消息到主线的方法. 接下来,您将有当前在index.js文件中的calculateCount()函数中的CPU密集任务。

接下来,添加下面的突出代码:

1[label multi-threading_demo/worker.js]
2const { parentPort } = require("worker_threads");
3
4let counter = 0;
5for (let i = 0; i < 20_000_000_000; i++) {
6  counter++;
7}
8
9parentPort.postMessage(counter);

在这里,您调用了parentPort类的postMessage()方法,该方法将向包含在counter变量中存储的CPU绑定任务的结果的主线发送一个消息。

保存并退出您的文件. 在文本编辑器中打开 index.js:

1nano index.js

由于您已经在worker.js中具有 CPU 绑定任务,请从index.js中删除突出代码:

 1[label multi-threading_demo/index.js]
 2const express = require("express");
 3
 4const app = express();
 5const port = process.env.PORT || 3000;
 6
 7app.get("/non-blocking/", (req, res) => {
 8  res.status(200).send("This page is non-blocking");
 9});
10
11function calculateCount() {
12  return new Promise((resolve, reject) => {
13    let counter = 0;
14    for (let i = 0; i < 20_000_000_000; i++) {
15      counter++;
16    }
17    resolve(counter);
18  });
19}
20
21app.get("/blocking", async (req, res) => {
22  const counter = await calculateCount();
23  res.status(200).send(`result is ${counter}`);
24});
25
26app.listen(port, () => {
27  console.log(`App listening on port ${port}`);
28});

接下来,在app.get(/blocking)的回调中,添加以下代码来初始化线程:

 1[label multi-threading_demo/index.js]
 2const express = require("express");
 3const { Worker } = require("worker_threads");
 4...
 5app.get("/blocking", async (req, res) => {
 6  const worker = new Worker("./worker.js");
 7  worker.on("message", (data) => {
 8    res.status(200).send(`result is ${data}`);
 9  });
10  worker.on("error", (msg) => {
11    res.status(404).send(`An error occurred: ${msg}`);
12  });
13});
14...

首先,您导入worker_threads模块并解包Worker类别。在app.get(/blocking)调用中,您使用关键字创建了Worker的实例,其次是Worker的调用,以worker.js文件路径作为其参数。

接下来,您将一个事件附加到worker实例中,使用on(消息)方法来收听消息事件. 当收到包含来自worker.js文件的结果的消息时,它作为参数传递到方法的回调,该方法将返回包含CPU相关任务的结果的用户回复。

接下来,您使用on(error)方法将另一个事件附加到 worker 实例中,以便听取错误事件。

您的完整文件现在将看起来如下:

 1[label multi-threading_demo/index.js]
 2const express = require("express");
 3const { Worker } = require("worker_threads");
 4
 5const app = express();
 6const port = process.env.PORT || 3000;
 7
 8app.get("/non-blocking/", (req, res) => {
 9  res.status(200).send("This page is non-blocking");
10});
11
12app.get("/blocking", async (req, res) => {
13  const worker = new Worker("./worker.js");
14  worker.on("message", (data) => {
15    res.status(200).send(`result is ${data}`);
16  });
17  worker.on("error", (msg) => {
18    res.status(404).send(`An error occurred: ${msg}`);
19  });
20});
21
22app.listen(port, () => {
23  console.log(`App listening on port ${port}`);
24});

保存和退出您的文件,然后运行服务器:

1node index.js

在您的 Web 浏览器中再次访问 http://localhost:3000/blocking 选项卡之前,请重新加载所有 http://localhost:3000/non-blocking 选项卡。您现在应该注意到它们正在即时加载,而不会等待 /blocking 路线完成加载。 这是因为CPU 绑定的任务被下载到另一个线程,而主线程处理所有传入的请求。

现在,停止您的服务器使用CTRL+C

现在,您可以使用工人线程进行 CPU 密集任务非阻止,您将使用四个工人线程来提高 CPU 密集任务的性能。

使用四个工人线程优化 CPU 密集任务

在本节中,您将 CPU 密集的任务分为四个工人线程,以便他们能够更快地完成任务并缩短 `/blocking’ 路径的加载时间。

要在同一任务上工作更多的工人线程,你需要将任务分开,因为该任务涉及循环 20 亿次,你将与你想要使用的线程的数量分为 20 亿。在这种情况下,它是 4。计算20_000_000_000 / 4将导致5_000_000_000所以每个线程将从05_000_000_000循环,并增加计数1。当每个线程结束时,它将向主线程发送一个消息,其中包含结果。

例如,如果你想在目录中调整800个图像大小,你可以创建一个包含所有图像文件路径的阵列。接下来,将800分为4(线程数),并让每个线程在一个范围内工作。

首先,请确保您有四种或更多颜色:

1nproc
1[secondary_label Output]
24

使用cp命令创建worker.js文件的副本:

1cp worker.js four_workers.js

当前的index.js 和worker.js 文件将不受影响,因此您可以重新运行它们,以便稍后将其性能与本节中的更改进行比较。

接下来,在文本编辑器中打开four_workers.js文件:

1nano four_workers.js

four_workers.js文件中,添加突出的代码来导入workerData对象:

1[label multi-threading_demo/four_workers.js]
2const { workerData, parentPort } = require("worker_threads");
3
4let counter = 0;
5for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
6  counter++;
7}
8
9parentPort.postMessage(counter);

首先,您将提取WorkerData对象,该对象将包含从主线传递的数据,当线程初始化时(您将很快在index.js文件中进行)。

保存并关闭您的文件,然后复制index.js文件:

1cp index.js index_four_workers.js

在您的编辑器中打开 index_four_workers.js 文件:

1nano index_four_workers.js

index_four_workers.js文件中,添加突出的代码来创建一个线程实例:

 1[label multi-threading_demo/index_four_workers.js]
 2...
 3const app = express();
 4const port = process.env.PORT || 3000;
 5const THREAD_COUNT = 4;
 6...
 7function createWorker() {
 8  return new Promise(function (resolve, reject) {
 9    const worker = new Worker("./four_workers.js", {
10      workerData: { thread_count: THREAD_COUNT },
11    });
12  });
13}
14
15app.get("/blocking", async (req, res) => {
16  ...
17})
18...

首先,您定义了包含您要创建的线程数量的THREAD_COUNT常数,随后,当您在服务器上有更多的内核时,扩展将涉及将THREAD_COUNT的值更改为您想要使用的线程数量。

接下来,createWorker()函数创建并返回一个承诺。在承诺回调中,你通过将Worker类的文件路径传递到four_workers.js文件中作为第一个参数来初始化一个新线程,然后将一个对象传递为第二个参数。接下来,你将对象分配给具有其值另一个对象的workerData属性。最后,你将对象分配给thread_count属性,其值是THREAD_COUNT常数中的线程数。

若要确保承诺解决或丢失错误,请添加以下突出的行:

 1[label multi-threading_demo/index_four_workers.js]
 2...
 3function createWorker() {
 4  return new Promise(function (resolve, reject) {
 5    const worker = new Worker("./four_workers.js", {
 6      workerData: { thread_count: THREAD_COUNT },
 7    });
 8    worker.on("message", (data) => {
 9      resolve(data);
10    });
11    worker.on("error", (msg) => {
12      reject(`An error ocurred: ${msg}`);
13    });
14  });
15}
16...

当工人线向主线发送消息时,承诺与返回的数据一起解决,但是,如果出现错误,承诺会返回错误消息。

现在您已经定义了初始化新线程并从线程中返回数据的函数,您将使用app.get(/blocking)中的函数生成新线程。

但首先,请删除以下突出代码,因为您已经在createWorker()函数中定义了此功能:

 1[label multi-threading_demo/index_four_workers.js]
 2...
 3app.get("/blocking", async (req, res) => {
 4  const worker = new Worker("./worker.js");
 5  worker.on("message", (data) => {
 6    res.status(200).send(`result is ${data}`);
 7  });
 8  worker.on("error", (msg) => {
 9    res.status(404).send(`An error ocurred: ${msg}`);
10  });
11});
12...

删除代码后,添加以下代码以初始化四个工作线程:

1[label multi-threading_demo/index_four_workers.js]
2...
3app.get("/blocking", async (req, res) => {
4  const workerPromises = [];
5  for (let i = 0; i < THREAD_COUNT; i++) {
6    workerPromises.push(createWorker());
7  }
8});
9...

首先,你创建一个WorkerPromises变量,其中包含一个空的数组。接下来,你重复到THREAD_COUNT中的值,即4的次数。在每个重复过程中,你调用createWorker()函数来创建一个新线程,然后按下承诺对象,使该函数返回workerPromises的数组,使用JavaScript的方法。当循环结束时,workerPromises将有四个承诺对象,每个从调用createWorker()函数返回四次。

现在,添加下面的突出代码,等待承诺解决并返回用户的回复:

 1[label multi-threading_demo/index_four_workers.js]
 2app.get("/blocking", async (req, res) => {
 3  const workerPromises = [];
 4  for (let i = 0; i < THREAD_COUNT; i++) {
 5    workerPromises.push(createWorker());
 6  }
 7
 8  const thread_results = await Promise.all(workerPromises);
 9  const total =
10    thread_results[0] +
11    thread_results[1] +
12    thread_results[2] +
13    thread_results[3];
14  res.status(200).send(`result is ${total}`);
15});

由于workerPromises数组包含呼叫createWorker()的返回承诺,所以您将Promise.all()方法以Wait语法为前缀,并将all()方法以workerPromises为参数。Promise.all()方法等待数组中的所有承诺得到解决。当发生这种情况时,thread_results变量包含承诺得到解决的值。由于计算分为四个工人,您将它们全部添加到一起,使用thread_results语法从thread_results中获取每个值。

您的完整文件现在应该是这样的:

 1[label multi-threading_demo/index_four_workers.js]
 2const express = require("express");
 3const { Worker } = require("worker_threads");
 4
 5const app = express();
 6const port = process.env.PORT || 3000;
 7const THREAD_COUNT = 4;
 8
 9app.get("/non-blocking/", (req, res) => {
10  res.status(200).send("This page is non-blocking");
11});
12
13function createWorker() {
14  return new Promise(function (resolve, reject) {
15    const worker = new Worker("./four_workers.js", {
16      workerData: { thread_count: THREAD_COUNT },
17    });
18    worker.on("message", (data) => {
19      resolve(data);
20    });
21    worker.on("error", (msg) => {
22      reject(`An error ocurred: ${msg}`);
23    });
24  });
25}
26
27app.get("/blocking", async (req, res) => {
28  const workerPromises = [];
29  for (let i = 0; i < THREAD_COUNT; i++) {
30    workerPromises.push(createWorker());
31  }
32  const thread_results = await Promise.all(workerPromises);
33  const total =
34    thread_results[0] +
35    thread_results[1] +
36    thread_results[2] +
37    thread_results[3];
38  res.status(200).send(`result is ${total}`);
39});
40
41app.listen(port, () => {
42  console.log(`App listening on port ${port}`);
43});

在运行此文件之前,先运行index.js以测量其响应时间:

1node index.js

接下来,在本地计算机上打开一个新的终端,然后输入以下弯曲命令,测量从/blocking路径获得响应需要多长时间:

1[environment local]
2time curl --get http://localhost:3000/blocking

curl命令将HTTP请求发送到给定URL,而--get选项则指示curl发送GET请求。

当命令运行时,您的输出将看起来像这样:

1[secondary_label Output]
2real	0m28.882s
3user	0m0.018s
4sys	0m0.000s

突出显示的输出显示,需要大约28秒才能获得响应,这可能在您的计算机上有所不同。

接下来,用CTRL+C关闭服务器并运行index_four_workers.js文件:

1node index_four_workers.js

在您的第二个终端中再次访问‘/blocking’路线:

1[environment local]
2time curl --get http://localhost:3000/blocking

您将看到与以下相一致的输出:

1[secondary_label Output]
2real	0m8.491s
3user	0m0.011s
4sys	0m0.005s

输出显示,它需要大约8秒,这意味着您将加载时间缩短约70%。

您成功地使用四个工人线程优化了与CPU相关的任务. 如果您有一个具有四个以上内核的机器,请将THREAD_COUNT更新到这个数字,您将进一步缩短加载时间。

结论

在本文中,你建立了一個 Node 應用程式,用一個 CPU 連結的任務來阻止主線. 然後,你試圖使用承諾使該任務不封鎖,但失敗了。 之後,你使用「worker_threads」模塊將 CPU 連結的任務卸載到另一個線程,使其不封鎖。

作为下一步,请参阅 Node.js Worker threads 文档,以了解更多有关选项的信息。 此外,您还可以参阅 piscina 库,该库允许您为 CPU 密集任务创建一个工作组。

Published At
Categories with 技术
comments powered by Disqus