作者选择了 开源精神疾病以作为 写给捐赠计划的一部分获得捐赠。
介绍
Node.js在单个线程中运行JavaScript代码,这意味着您的代码一次只能执行一项任务。然而,Node.js本身是多线程的,通过 libuv
库提供隐藏的线程,它处理像从磁盘或网络请求读取文件这样的I/O操作。
虽然 Node.js 具有隐藏的线程,但您无法使用它们来卸载复杂的计算、图像调整或视频压缩等 CPU 密集的任务,因为当 CPU 密集的任务运行时,JavaScript 是单线程的,所以它会阻止主线程,而不会执行任何其他代码,直到任务完成。
然而,近年来,CPU并没有变得更快,相反,计算机正在增加额外的内核,现在计算机更常见的是有8个或更多内核,尽管这种趋势,您的代码不会利用计算机上的额外的内核来加速CPU相关的任务或避免破坏主线,因为JavaScript是单线程的。
为了解决这一问题,Node.js 引入了 worker-threads
模块,允许您创建线程并并行执行多个JavaScript任务. 一旦线程完成任务,它会向主线程发送一个消息,其中包含操作的结果,以便它可以与代码的其他部分一起使用。
在本教程中,您将创建一个 Node.js 应用程序,该应用程序具有 CPU 密集的任务,它会阻止主线程。接下来,您将使用工作线程
模块将 CPU 密集的任务卸载到另一个线程,以避免阻止主线程。
前提条件
要完成本教程,您将需要:
- 具有四个或更多核心的多核系统. 您仍然可以按照双核系统的步骤 1 到 6 的教程进行操作。 然而,步骤 7 需要四个核心才能看到性能改进。
- A Node.js 开发环境. 如果您在 Ubuntu 22.04,则可以按照 How To Install Node.js on Ubuntu 22.04的步骤 3 安装最新版本的 Node.js。 如果您在其他操作系统上,请参阅 How to Install Node.js and Create a Local Development Environment。
- 在 JavaScript 中对事件循环,呼叫回应和承诺的良好理解,您可以在我们的教程中找到 [How to Understand the Event Loop, Callbacks, Promises, and Async
建立项目并安装依赖性
在此步骤中,您将创建项目目录,初始化npm
,并安装所有必要的依赖。
要开始、创建和移动到项目目录:
1mkdir multi-threading_demo
2cd multi-threading_demo
mkdir
命令创建一个目录,而cd
命令将工作目录更改为新创建的目录。
接下来,使用npm init
命令将项目目录初始化为 npm:
1npm init -y
y
选项接受所有默认选项。
当命令运行时,您的输出将看起来像这样:
1Wrote to /home/sammy/multi-threading_demo/package.json:
2
3{
4 "name": "multi-threading_demo",
5 "version": "1.0.0",
6 "description": "",
7 "main": "index.js",
8 "scripts": {
9 "test": "echo \"Error: no test specified\" && exit 1"
10 },
11 "keywords": [],
12 "author": "",
13 "license": "ISC"
14}
接下来,安装 express
,一个 Node.js 网页框架:
1npm install express
您将使用 Express 创建一个具有阻止和非阻止终端的服务器应用程序。
默认情况下,Node.js 配备了worker-threads
模块,因此您不需要安装它。
您现在已经安装了必要的包,接下来,您将了解更多关于流程和线程以及它们如何在计算机上执行的信息。
了解流程和线程
在开始编写CPU相关的任务并将其卸载到单独的线程之前,您需要首先了解流程和线程是什么,以及它们之间的差异,最重要的是,您将审查流程和线程如何在单核或多核计算机系统上执行。
过程
一个进程是操作系统中的一个运行程序,它有自己的内存,不能看到或访问其他运行程序的内存,它也有指令指示器,该指令表明程序目前正在执行的命令,一次只能执行一个任务。
要了解这一点,您将创建一个 Node.js 程序,具有无限循环,以便它在运行时不会退出。
使用nano
或您喜爱的文本编辑器,创建并打开process.js
文件:
1nano process.js
在您的process.js
文件中,输入以下代码:
1[label multi-threading_demo/process.js]
2const process_name = process.argv.slice(2)[0];
3
4count = 0;
5while (true) {
6 count++;
7 if (count == 2000 || count == 4000) {
8 console.log(`${process_name}: ${count}`);
9 }
10}
在第一行中,process.argv
属性返回一个包含程序命令行参数的数组,然后将JavaScript的slice()
方法附加到一个2
参数中,以便从索引2向前创建一个浅副本的数组。这样做会跳过前两个参数,即Node.js路径和程序文件名。
然后,您定义一个同时
循环,并通过一个真实
条件,使循环永久运行。循环中,数
变量在每个迭代过程中增加为1
。接着有一个如果
声明,检查数
是否等于2000
或4000
。
使用CTRL+X
保存和关闭文件,然后按Y
保存更改。
使用node
命令运行程序:
1node process.js A &
A
是一个命令行参数,传递给程序并存储在 process_name
变量中. 最后的 &
允许节点程序在背景中运行,允许您在壳中输入更多的命令。
当您运行该程序时,您将看到类似于以下的输出:
1[secondary_label Output]
2[1] 7754
3
4A: 2000
5A: 4000
编号7754
是操作系统分配给它的进程 ID. A: 2000
和A: 4000
是程序的输出。
当您使用节点
命令运行程序时,您创建了一个过程。操作系统为该程序分配了内存,在计算机的磁盘上找到可执行的程序,然后将该程序加载到内存中。
当该过程运行时,其进程 ID 会被添加到操作系统的进程列表中,并可通过诸如 htop
、 top
、或 ps
等工具查看。
要获得一个节点过程的快速总结,请在终端中按ENTER
,以恢复提示。
1ps |grep node
ps
命令列出与系统上的当前用户相关的所有流程. 管道操作员``将所有ps
输出传输到grep
(https://man7.org/linux/man-pages/man1/egrep.1.html)中,将流程过滤到仅列出节点流程。
运行该命令将产生类似于以下的输出:
1[secondary_label Output]
27754 pts/0 00:21:49 node
例如,使用以下命令创建三个不同的进程,并将其放置在后台:
1node process.js B & node process.js C & node process.js D &
在命令中,您创建了process.js
程序的另外三个实例.&
符号将每个进程放置在后台。
当运行命令时,输出将看起来如下(尽管顺序可能不同):
1[secondary_label Output]
2[2] 7821
3[3] 7822
4[4] 7823
5
6D: 2000
7D: 4000
8B: 2000
9B: 4000
10C: 2000
11C: 4000
正如您在输出中所看到的,每一个进程在计算达到2000
和4000
时将进程名称登录到终端中。
如果你仔细研究输出,你会看到输出的顺序不是你创建三个进程时的顺序。当运行命令时,进程参数以B
,C
和D
的顺序。
在一个单一的核心机器上,流程执行 concurrently。 也就是说,操作系统在定期间隔内切换过程之间。 例如,过程D
在有限的时间内执行,然后其状态保存在某个地方,而操作系统计划在有限的时间内执行过程B
等等。 这种情况发生在所有任务完成之前。 从输出中,它看起来像每个过程必须完成,但实际上,操作系统编程器在它们之间不断切换。
在多核系统中,假设你有四个核心,操作系统会安排每个进程同时在每个核心上执行,这被称为 parallelism。
线索
线程类似于流程:它们有自己的指示指示器,并且可以同时执行一个JavaScript任务。与流程不同,线程没有自己的内存。相反,它们居住在一个流程的内存中。当你创建一个流程时,它可以使用worker_threads
模块创建多个线程并行执行JavaScript代码。此外,线程可以通过传递消息或共享进程内存中的数据相互通信。
当涉及到执行线程时,它们的行为与流程相似. 如果您在单个核心系统上运行多个线程,操作系统会定期切换它们之间,给每个线程一个机会直接在单个CPU上执行。在多核系统中,操作系统会安排所有核心的线程并同时执行JavaScript代码。
按ENTER
,然后用杀死
命令停止当前运行的所有节点流程:
1sudo kill -9 `pgrep node`
pgrep
将所有四个节点进程的进程 ID 返回至 kill
命令. -9
选项命令 kill
发送 SIGKILL 信号。
当您运行该命令时,您将看到类似于以下的输出:
1[secondary_label Output]
2[1] Killed node process.js A
3[2] Killed node process.js B
4[3] Killed node process.js C
5[4] Killed node process.js D
有时,输出可能会延迟,并在稍后运行另一个命令时出现。
现在你知道流程和线程之间的区别,你将在下一节中使用 Node.js 隐藏的线程。
在 Node.js 中理解隐藏的字符串
Node.js确实提供了额外的线程,这就是为什么它被认为是多线程的。
正如介绍中提到的,JavaScript是单线程的,所有的JavaScript代码都执行在一个线程中,包括你的程序的源代码和你在你的程序中包含的第三方库。
然而,Node.js 实施了libuv
库,该库为 Node.js 流程提供四个额外的线索. 通过这些线索, I/O 操作被单独处理,完成后,事件循环在微任务排列中添加与 I/O 任务相关的呼叫。当主线程中的呼叫堆清晰时,呼叫堆被推到呼叫堆,然后执行。 为了澄清这一点,与给定 I/O 任务相关的呼叫循环不会并行执行;然而,读取文件或网络请求的任务本身与线程的帮助并行发生。
除了这些四个线程之外,V8引擎(LINK0)还提供两种线程来处理自动垃圾收集等事情,从而将过程中的总线程数增加到七个:一个主线程,四个Node.js线程和两个V8线程。
要确认每个 Node.js 流程都有七个线程,请重新运行process.js
文件并将其放到背景中:
1node process.js A &
终端将登录过程ID,以及来自程序的输出:
1[secondary_label Output]
2[1] 9933
3
4A: 2000
5A: 4000
注意某个地方的进程ID,然后按ENTER
,以便您可以再次使用提示。
要查看线程,运行顶部
命令并将其传递到输出中显示的进程 ID:
1top -H -p 9933
-H
指示顶部
显示进程中的线程;-p
旗指示顶部
仅监控进程ID中的活动。
当您运行该命令时,您的输出将看起来如下:
1[secondary_label Output]
2top - 09:21:11 up 15:00, 1 user, load average: 0.99, 0.60, 0.26
3Threads: 7 total, 1 running, 6 sleeping, 0 stopped, 0 zombie
4%Cpu(s): 24.8 us, 0.3 sy, 0.0 ni, 75.0 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
5MiB Mem : 7951.2 total, 6756.1 free, 248.4 used, 946.7 buff/cache
6MiB Swap: 0.0 total, 0.0 free, 0.0 used. 7457.4 avail Mem
7
8 PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
9 9933 node-us+ 20 0 597936 51864 33956 R 99.9 0.6 4:19.64 node
10 9934 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
11 9935 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.84 node
12 9936 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
13 9937 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.93 node
14 9938 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.83 node
15 9939 node-us+ 20 0 597936 51864 33956 S 0.0 0.6 0:00.00 node
正如您在输出中所看到的,Node.js 流程共有七个线程:一个执行 JavaScript 的主要线程,四个 Node.js 线程和两个 V8 线程。
如前所述,四个 Node.js 线程用于 I/O 操作,使其不受阻挡。它们对该任务非常有效,并且为 I/O 操作自己创建线程甚至可能使应用程序性能恶化。
现在按q
来退出顶部
并用以下命令停止节点过程:
1kill -9 9933
现在你已经知道 Node.js 流程中的线程了,你将在下一节写出一个与 CPU 相关的任务,并观察它如何影响主线程。
创建一个没有工人线条的 CPU-Bound 任务
在本节中,您将构建一个Express应用程序,该应用程序具有非封锁路线和执行CPU相关任务的封锁路线。
首先,在您喜爱的编辑器中打开index.js
:
1nano index.js
在您的 index.js 文件中,添加以下代码来创建基本服务器:
1[label multi-threading_demo/index.js]
2const express = require("express");
3
4const app = express();
5const port = process.env.PORT || 3000;
6
7app.get("/non-blocking/", (req, res) => {
8 res.status(200).send("This page is non-blocking");
9});
10
11app.listen(port, () => {
12 console.log(`App listening on port ${port}`);
13});
在接下来的代码块中,您使用 Express 创建了一个 HTTP 服务器. 在第一行中,您导入了express
模块. 接下来,您将app
变量设置为 Express 的实例。
接下来,您使用app.get('/non-blocking')
来定义应该发送的GET
请求路径,最后,您调用app.listen()
方法来指示服务器在端口3000
上开始聆听。
接下来,定义另一个路径,即 /blocking/
,该路径将包含一个 CPU 密集任务:
1[label multi-threading_demo/index.js]
2...
3app.get("/blocking", async (req, res) => {
4 let counter = 0;
5 for (let i = 0; i < 20_000_000_000; i++) {
6 counter++;
7 }
8 res.status(200).send(`result is ${counter}`);
9});
10
11app.listen(port, () => {
12 console.log(`App listening on port ${port}`);
13});
您使用app.get(
/blocking)
来定义/blocking
路径,该路径采用与async
关键字前缀的非同步调用作为运行 CPU 密集任务的第二个参数。在调用中,您创建一个for
循环,重复 20 亿次,每次重复时,它会增加counter
变量为1
。
此时,你的 index.js 文件将看起来像这样:
1[label multi-threading_demo/index.js]
2const express = require("express");
3
4const app = express();
5const port = process.env.PORT || 3000;
6
7app.get("/non-blocking/", (req, res) => {
8 res.status(200).send("This page is non-blocking");
9});
10
11app.get("/blocking", async (req, res) => {
12 let counter = 0;
13 for (let i = 0; i < 20_000_000_000; i++) {
14 counter++;
15 }
16 res.status(200).send(`result is ${counter}`);
17});
18
19app.listen(port, () => {
20 console.log(`App listening on port ${port}`);
21});
保存和退出您的文件,然后使用以下命令启动服务器:
1node index.js
当您运行该命令时,您将看到类似于以下的输出:
1[secondary_label Output]
2App listening on port 3000
这表明服务器正在运行并准备服务。
现在,请在您喜欢的浏览器中访问http://localhost:3000/non-blocking
。
<$>[注] 注: 如果您正在远程服务器上遵循教程,您可以使用端口转发在浏览器中测试应用程序。
当 Express 服务器仍在运行时,请在本地计算机上打开另一个终端,然后输入以下命令:
1ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip
当您连接到服务器时,请在本地计算机的网页浏览器上导航到http://localhost:3000/non-blocking
。
接下来,打开一个新选项卡,然后访问http://localhost:3000/blocking
。当页面加载时,快速打开两个选项卡,然后再次访问http://localhost:3000/non-blocking
。你会看到你不会得到即时响应,页面将继续尝试加载。只有在/blocking
路线完成加载并返回响应结果是20000000000
,其余路径才会返回响应。
所有/非阻止
路径不像/阻止
路径负载一样工作的原因是由于CPU绑定的for
循环,它阻止了主线条。当主线条被阻止时,Node.js无法服务任何请求,直到CPU绑定的任务完成。
正如您所看到的,阻止主线条可能会损害用户的应用体验. 要解决此问题,您需要将 CPU 相关任务卸载到另一个线条,以便主线条继续处理其他 HTTP 请求。
您将在下一节中重新启动服务器,然后对 index.js 文件进行更多更改. 服务器停止的原因是 Node.js 不会在对文件进行新更改时自动更新。
现在你已经了解了CPU密集型任务对你的应用程序的负面影响,你现在将试图通过使用承诺来避免阻止主线。
使用承诺卸载CPU绑定任务
通常,当开发人员了解CPU绑定任务的阻止效果时,他们会转向承诺使代码非封锁。这种本能源于使用非封锁基于承诺的I/O方法的知识,如readFile()
和writeFile()
。但是,正如你所了解的那样,I/O操作使用Node.js隐藏的线索,而CPU绑定任务则不使用它。
在编辑器中再次打开 index.js 文件:
1nano index.js
在您的 index.js 文件中,删除包含 CPU 密集任务的突出代码:
1[label multi-threading_demo/index.js]
2...
3app.get("/blocking", async (req, res) => {
4 let counter = 0;
5 for (let i = 0; i < 20_000_000_000; i++) {
6 counter++;
7 }
8 res.status(200).send(`result is ${counter}`);
9});
10...
接下来,添加包含返回承诺的函数的以下突出代码:
1[label multi-threading_demo/index.js]
2...
3function calculateCount() {
4 return new Promise((resolve, reject) => {
5 let counter = 0;
6 for (let i = 0; i < 20_000_000_000; i++) {
7 counter++;
8 }
9 resolve(counter);
10 });
11}
12
13app.get("/blocking", async (req, res) => {
14 res.status(200).send(`result is ${counter}`);
15}
calculateCount()
函数现在包含了您在/blocking
处理函数中所做的计算。该函数返回了一个承诺,该函数是用新承诺
语法初始化的。
接下来,请在 index.js 文件中的 /blocking/
处理函数中调用 calculateCount()
函数:
1[label multi-threading_demo/index.js]
2app.get("/blocking", async (req, res) => {
3 const counter = await calculateCount();
4 res.status(200).send(`result is ${counter}`);
5});
在这里,您呼叫calculateCount()
函数,前缀为等待
的关键字,以等待承诺解决。
您的完整代码现在将看起来如下:
1[label multi-threading_demo/index.js]
2const express = require("express");
3
4const app = express();
5const port = process.env.PORT || 3000;
6
7app.get("/non-blocking/", (req, res) => {
8 res.status(200).send("This page is non-blocking");
9});
10
11function calculateCount() {
12 return new Promise((resolve, reject) => {
13 let counter = 0;
14 for (let i = 0; i < 20_000_000_000; i++) {
15 counter++;
16 }
17 resolve(counter);
18 });
19}
20
21app.get("/blocking", async (req, res) => {
22 const counter = await calculateCount();
23 res.status(200).send(`result is ${counter}`);
24});
25
26app.listen(port, () => {
27 console.log(`App listening on port ${port}`);
28});
保存并退出您的文件,然后重新启动服务器:
1node index.js
在您的 Web 浏览器中,请访问 http://localhost:3000/blocking
,当它加载时,快速重新加载 http://localhost:3000/non-blocking
选项卡。正如您所看到的那样, non-blocking
路径仍然受到影响,它们都将等待 /blocking
路径完成加载。由于路径仍然受到影响,承诺不会使 JavaScript 代码并行执行,并且不能用来使 CPU 相关的任务不受阻止。
然后,用CTRL+C
关闭应用程序服务器。
现在你知道承诺没有提供任何机制来使CPU相关任务不被阻挡,你将使用Node.js的工作线程
模块将CPU相关任务卸载到一个单独的线程中。
使用worker-threads
模块卸载 CPU-Bound 任务
在本节中,您将使用worker-threads
模块将 CPU 密集的任务卸载到另一个线程,以避免阻止主线程。 要做到这一点,您将创建一个包含 CPU 密集的任务的worker.js
文件。 在index.js
文件中,您将使用worker-threads
模块来初始化线程,并在worker.js
文件中启动与主线程并行运行的任务。
首先,请使用nproc
命令确认您有 2 个或多个内核:
1nproc
1[secondary_label Output]
24
如果显示两个或多个核心,您可以继续执行此步骤。
接下来,在文本编辑器中创建并打开worker.js
文件:
1nano worker.js
在您的worker.js
文件中,添加以下代码来导入worker-threads
模块,并执行CPU密集的任务:
1[label multi-threading_demo/worker.js]
2const { parentPort } = require("worker_threads");
3
4let counter = 0;
5for (let i = 0; i < 20_000_000_000; i++) {
6 counter++;
7}
第一行加载了worker_threads
模块并提取了parentPort
类. 该类提供了您可以用来发送消息到主线的方法. 接下来,您将有当前在index.js
文件中的calculateCount()
函数中的CPU密集任务。
接下来,添加下面的突出代码:
1[label multi-threading_demo/worker.js]
2const { parentPort } = require("worker_threads");
3
4let counter = 0;
5for (let i = 0; i < 20_000_000_000; i++) {
6 counter++;
7}
8
9parentPort.postMessage(counter);
在这里,您调用了parentPort
类的postMessage()
方法,该方法将向包含在counter
变量中存储的CPU绑定任务的结果的主线发送一个消息。
保存并退出您的文件. 在文本编辑器中打开 index.js
:
1nano index.js
由于您已经在worker.js
中具有 CPU 绑定任务,请从index.js
中删除突出代码:
1[label multi-threading_demo/index.js]
2const express = require("express");
3
4const app = express();
5const port = process.env.PORT || 3000;
6
7app.get("/non-blocking/", (req, res) => {
8 res.status(200).send("This page is non-blocking");
9});
10
11function calculateCount() {
12 return new Promise((resolve, reject) => {
13 let counter = 0;
14 for (let i = 0; i < 20_000_000_000; i++) {
15 counter++;
16 }
17 resolve(counter);
18 });
19}
20
21app.get("/blocking", async (req, res) => {
22 const counter = await calculateCount();
23 res.status(200).send(`result is ${counter}`);
24});
25
26app.listen(port, () => {
27 console.log(`App listening on port ${port}`);
28});
接下来,在app.get(/blocking
)的回调中,添加以下代码来初始化线程:
1[label multi-threading_demo/index.js]
2const express = require("express");
3const { Worker } = require("worker_threads");
4...
5app.get("/blocking", async (req, res) => {
6 const worker = new Worker("./worker.js");
7 worker.on("message", (data) => {
8 res.status(200).send(`result is ${data}`);
9 });
10 worker.on("error", (msg) => {
11 res.status(404).send(`An error occurred: ${msg}`);
12 });
13});
14...
首先,您导入worker_threads
模块并解包Worker
类别。在app.get(
/blocking)
调用中,您使用新
关键字创建了Worker
的实例,其次是Worker
的调用,以worker.js
文件路径作为其参数。
接下来,您将一个事件附加到worker
实例中,使用on(
消息)
方法来收听消息事件. 当收到包含来自worker.js
文件的结果的消息时,它作为参数传递到方法的回调,该方法将返回包含CPU相关任务的结果的用户回复。
接下来,您使用on(
error)
方法将另一个事件附加到 worker 实例中,以便听取错误事件。
您的完整文件现在将看起来如下:
1[label multi-threading_demo/index.js]
2const express = require("express");
3const { Worker } = require("worker_threads");
4
5const app = express();
6const port = process.env.PORT || 3000;
7
8app.get("/non-blocking/", (req, res) => {
9 res.status(200).send("This page is non-blocking");
10});
11
12app.get("/blocking", async (req, res) => {
13 const worker = new Worker("./worker.js");
14 worker.on("message", (data) => {
15 res.status(200).send(`result is ${data}`);
16 });
17 worker.on("error", (msg) => {
18 res.status(404).send(`An error occurred: ${msg}`);
19 });
20});
21
22app.listen(port, () => {
23 console.log(`App listening on port ${port}`);
24});
保存和退出您的文件,然后运行服务器:
1node index.js
在您的 Web 浏览器中再次访问 http://localhost:3000/blocking
选项卡之前,请重新加载所有 http://localhost:3000/non-blocking
选项卡。您现在应该注意到它们正在即时加载,而不会等待 /blocking
路线完成加载。 这是因为CPU 绑定的任务被下载到另一个线程,而主线程处理所有传入的请求。
现在,停止您的服务器使用CTRL+C
。
现在,您可以使用工人线程进行 CPU 密集任务非阻止,您将使用四个工人线程来提高 CPU 密集任务的性能。
使用四个工人线程优化 CPU 密集任务
在本节中,您将 CPU 密集的任务分为四个工人线程,以便他们能够更快地完成任务并缩短 `/blocking’ 路径的加载时间。
要在同一任务上工作更多的工人线程,你需要将任务分开,因为该任务涉及循环 20 亿次,你将与你想要使用的线程的数量分为 20 亿。在这种情况下,它是 4。计算20_000_000_000 / 4
将导致5_000_000_000
所以每个线程将从0
到5_000_000_000
循环,并增加计数
到1
。当每个线程结束时,它将向主线程发送一个消息,其中包含结果。
例如,如果你想在目录中调整800个图像大小,你可以创建一个包含所有图像文件路径的阵列。接下来,将800
分为4
(线程数),并让每个线程在一个范围内工作。
首先,请确保您有四种或更多颜色:
1nproc
1[secondary_label Output]
24
使用cp
命令创建worker.js
文件的副本:
1cp worker.js four_workers.js
当前的index.js 和worker.js 文件将不受影响,因此您可以重新运行它们,以便稍后将其性能与本节中的更改进行比较。
接下来,在文本编辑器中打开four_workers.js
文件:
1nano four_workers.js
在four_workers.js
文件中,添加突出的代码来导入workerData
对象:
1[label multi-threading_demo/four_workers.js]
2const { workerData, parentPort } = require("worker_threads");
3
4let counter = 0;
5for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
6 counter++;
7}
8
9parentPort.postMessage(counter);
首先,您将提取WorkerData
对象,该对象将包含从主线传递的数据,当线程初始化时(您将很快在index.js
文件中进行)。
保存并关闭您的文件,然后复制index.js
文件:
1cp index.js index_four_workers.js
在您的编辑器中打开 index_four_workers.js 文件:
1nano index_four_workers.js
在index_four_workers.js
文件中,添加突出的代码来创建一个线程实例:
1[label multi-threading_demo/index_four_workers.js]
2...
3const app = express();
4const port = process.env.PORT || 3000;
5const THREAD_COUNT = 4;
6...
7function createWorker() {
8 return new Promise(function (resolve, reject) {
9 const worker = new Worker("./four_workers.js", {
10 workerData: { thread_count: THREAD_COUNT },
11 });
12 });
13}
14
15app.get("/blocking", async (req, res) => {
16 ...
17})
18...
首先,您定义了包含您要创建的线程数量的THREAD_COUNT
常数,随后,当您在服务器上有更多的内核时,扩展将涉及将THREAD_COUNT
的值更改为您想要使用的线程数量。
接下来,createWorker()
函数创建并返回一个承诺。在承诺回调中,你通过将Worker
类的文件路径传递到four_workers.js
文件中作为第一个参数来初始化一个新线程,然后将一个对象传递为第二个参数。接下来,你将对象分配给具有其值另一个对象的workerData
属性。最后,你将对象分配给thread_count
属性,其值是THREAD_COUNT
常数中的线程数。
若要确保承诺解决或丢失错误,请添加以下突出的行:
1[label multi-threading_demo/index_four_workers.js]
2...
3function createWorker() {
4 return new Promise(function (resolve, reject) {
5 const worker = new Worker("./four_workers.js", {
6 workerData: { thread_count: THREAD_COUNT },
7 });
8 worker.on("message", (data) => {
9 resolve(data);
10 });
11 worker.on("error", (msg) => {
12 reject(`An error ocurred: ${msg}`);
13 });
14 });
15}
16...
当工人线向主线发送消息时,承诺与返回的数据一起解决,但是,如果出现错误,承诺会返回错误消息。
现在您已经定义了初始化新线程并从线程中返回数据的函数,您将使用app.get(
/blocking)
中的函数生成新线程。
但首先,请删除以下突出代码,因为您已经在createWorker()
函数中定义了此功能:
1[label multi-threading_demo/index_four_workers.js]
2...
3app.get("/blocking", async (req, res) => {
4 const worker = new Worker("./worker.js");
5 worker.on("message", (data) => {
6 res.status(200).send(`result is ${data}`);
7 });
8 worker.on("error", (msg) => {
9 res.status(404).send(`An error ocurred: ${msg}`);
10 });
11});
12...
删除代码后,添加以下代码以初始化四个工作线程:
1[label multi-threading_demo/index_four_workers.js]
2...
3app.get("/blocking", async (req, res) => {
4 const workerPromises = [];
5 for (let i = 0; i < THREAD_COUNT; i++) {
6 workerPromises.push(createWorker());
7 }
8});
9...
首先,你创建一个WorkerPromises
变量,其中包含一个空的数组。接下来,你重复到THREAD_COUNT
中的值,即4
的次数。在每个重复过程中,你调用createWorker()
函数来创建一个新线程,然后按下承诺对象,使该函数返回workerPromises
的数组,使用JavaScript的推
方法。当循环结束时,workerPromises
将有四个承诺对象,每个从调用createWorker()
函数返回四次。
现在,添加下面的突出代码,等待承诺解决并返回用户的回复:
1[label multi-threading_demo/index_four_workers.js]
2app.get("/blocking", async (req, res) => {
3 const workerPromises = [];
4 for (let i = 0; i < THREAD_COUNT; i++) {
5 workerPromises.push(createWorker());
6 }
7
8 const thread_results = await Promise.all(workerPromises);
9 const total =
10 thread_results[0] +
11 thread_results[1] +
12 thread_results[2] +
13 thread_results[3];
14 res.status(200).send(`result is ${total}`);
15});
由于workerPromises
数组包含呼叫createWorker()
的返回承诺,所以您将Promise.all()
方法以Wait
语法为前缀,并将all()
方法以workerPromises
为参数。Promise.all()
方法等待数组中的所有承诺得到解决。当发生这种情况时,thread_results
变量包含承诺得到解决的值。由于计算分为四个工人,您将它们全部添加到一起,使用thread_results
语法从thread_results
中获取每个值。
您的完整文件现在应该是这样的:
1[label multi-threading_demo/index_four_workers.js]
2const express = require("express");
3const { Worker } = require("worker_threads");
4
5const app = express();
6const port = process.env.PORT || 3000;
7const THREAD_COUNT = 4;
8
9app.get("/non-blocking/", (req, res) => {
10 res.status(200).send("This page is non-blocking");
11});
12
13function createWorker() {
14 return new Promise(function (resolve, reject) {
15 const worker = new Worker("./four_workers.js", {
16 workerData: { thread_count: THREAD_COUNT },
17 });
18 worker.on("message", (data) => {
19 resolve(data);
20 });
21 worker.on("error", (msg) => {
22 reject(`An error ocurred: ${msg}`);
23 });
24 });
25}
26
27app.get("/blocking", async (req, res) => {
28 const workerPromises = [];
29 for (let i = 0; i < THREAD_COUNT; i++) {
30 workerPromises.push(createWorker());
31 }
32 const thread_results = await Promise.all(workerPromises);
33 const total =
34 thread_results[0] +
35 thread_results[1] +
36 thread_results[2] +
37 thread_results[3];
38 res.status(200).send(`result is ${total}`);
39});
40
41app.listen(port, () => {
42 console.log(`App listening on port ${port}`);
43});
在运行此文件之前,先运行index.js
以测量其响应时间:
1node index.js
接下来,在本地计算机上打开一个新的终端,然后输入以下弯曲
命令,测量从/blocking
路径获得响应需要多长时间:
1[environment local]
2time curl --get http://localhost:3000/blocking
curl
命令将HTTP请求发送到给定URL,而--get
选项则指示curl
发送GET
请求。
当命令运行时,您的输出将看起来像这样:
1[secondary_label Output]
2real 0m28.882s
3user 0m0.018s
4sys 0m0.000s
突出显示的输出显示,需要大约28秒才能获得响应,这可能在您的计算机上有所不同。
接下来,用CTRL+C
关闭服务器并运行index_four_workers.js
文件:
1node index_four_workers.js
在您的第二个终端中再次访问‘/blocking’路线:
1[environment local]
2time curl --get http://localhost:3000/blocking
您将看到与以下相一致的输出:
1[secondary_label Output]
2real 0m8.491s
3user 0m0.011s
4sys 0m0.005s
输出显示,它需要大约8秒,这意味着您将加载时间缩短约70%。
您成功地使用四个工人线程优化了与CPU相关的任务. 如果您有一个具有四个以上内核的机器,请将THREAD_COUNT
更新到这个数字,您将进一步缩短加载时间。
结论
在本文中,你建立了一個 Node 應用程式,用一個 CPU 連結的任務來阻止主線. 然後,你試圖使用承諾使該任務不封鎖,但失敗了。 之後,你使用「worker_threads」模塊將 CPU 連結的任務卸載到另一個線程,使其不封鎖。
作为下一步,请参阅 Node.js Worker threads 文档,以了解更多有关选项的信息。 此外,您还可以参阅 piscina
库,该库允许您为 CPU 密集任务创建一个工作组。