# 多进程优化:Node.js cluster模块

单个 Node.js 实例运行在单个线程中。 为了充分利用多核系统,有时需要启用一组 Node.js 进程去处理负载任务。

cluster 模块可以创建共享服务器端口的子进程。

cluster 模块使我们能够充分利用多核系统的性能。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);

  // 衍生工作进程。
  for (let i = 0; i < numCPUs/2; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
  });
} else {
  // 工作进程可以共享任何 TCP 连接。
  // 在本例子中,共享的是 HTTP 服务器。
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  }).listen(8000);

  console.log(`工作进程 ${process.pid} 已启动`);
}

运行代码,则工作进程会共享 8000 端口:

$ node server.js
主进程 3596 正在运行
工作进程 4324 已启动
工作进程 4520 已启动
工作进程 6056 已启动
工作进程 5644 已启动

在 Windows 上,尚无法在工作进程中设置命名管道服务器。

注意一般情况下,不要占满所有的进程。因为,一方面 fork 进程会造成内存消耗。另一方面,Node.js底层会用到其他核CPU进行事件循环的处理,如果占满所有的进程,则会阻塞事件循环。有可能最终导致整体导致性能下降,同时却占用了大量内存。

那么,Cluster 内部又是如何工作的呢?多个进程间是如何通信的?多个进程是如何监听同一个端口的?Node.js 是如何将请求分发到各个进程上的?

# 工作原理

# 进程通信

工作进程由 child_process.fork() 方法创建,因此它们可以使用 IPC 和父进程通信,从而使各进程交替处理连接服务。

# 请求分发

cluster 模块支持两种分发连接的方法。

第一种方法(也是除 Windows 外所有平台的默认方法)是 循环法,由主进程负责监听端口,接收新连接后再将连接循环分发给工作进程,在分发中使用了一些内置技巧防止工作进程任务过载。

第二种方法是,主进程创建监听 socket 后发送给感兴趣的工作进程,由工作进程负责直接接收连接。

理论上第二种方法应该是效率最佳的。 但在实际情况下,由于操作系统调度机制的难以捉摸,会使分发变得不稳定。 可能会出现八个进程中有两个分担了 70% 的负载。

因为 server.listen() 将大部分工作交给主进程完成,因此导致普通 Node.js 进程与 cluster 工作进程差异的情况有三种:

  • server.listen({fd: 7}) 因为消息会被传给主进程,所以父进程中的文件描述符 7 将会被监听并将句柄传给工作进程,而不是监听文件描述符 7 指向的工作进程。
  • server.listen(handle) 显式地监听句柄,会导致工作进程直接使用该句柄,而不是和主进程通信。
  • server.listen(0) 正常情况下,这种调用会导致 server 在随机端口上监听。 但在 cluster 模式中,所有工作进程每次调用 listen(0) 时会收到相同的“随机”端口。 实质上,这种端口只在第一次分配时随机,之后就变得可预料。 如果要使用独立端口的话,应该根据工作进程的 ID 来生成端口号。

Node.js 不支持路由逻辑。 因此在设计应用时,不应该过分依赖内存数据对象,例如 session 和登陆等。

由于各工作进程是独立的进程,它们可以根据需要随时关闭或重新生成,而不影响其他进程的正常运行。 只要有存活的工作进程,服务器就可以继续处理连接。 如果没有存活的工作进程,现有连接会丢失,新的连接也会被拒绝。 Node.js 不会自动管理工作进程的数量,而应该由具体的应用根据实际需要来管理进程池。

虽然 cluster 模块主要用于网络相关的情况,但同样可以用于其他需要工作进程的情况。

# 事件

# fork

当新的工作进程被衍生时,cluster 模块将会触发 'fork' 事件。 可以被用来记录工作进程活动,并产生一个自定义的超时。

const timeouts = [];
function errorMsg() {
  console.error('连接出错');
}

cluster.on('fork', (worker) => {
  timeouts[worker.id] = setTimeout(errorMsg, 2000);
});
cluster.on('listening', (worker, address) => {
  clearTimeout(timeouts[worker.id]);
});
cluster.on('exit', (worker, code, signal) => {
  clearTimeout(timeouts[worker.id]);
  errorMsg();
});

# online

当一个子进程被主进程 fork 创建时,这个子进程应该回复一条 online 信息。当主进程接收到 online 信息时,将触发该事件执行。

'fork' 事件和 'online' 事件的区别在于,当主进程衍生工作进程时触发 'fork',当工作进程运行时触发 'online'

cluster.fork().on('online', () => {
  // 工作进程已上线。
});

# message

当集群主进程从任何工作进程接收到消息时触发。

# disconnect

在工作进程的 IPC 管道被断开后触发。 可能导致事件触发的原因包括:工作进程优雅地退出、被杀死、或手动断开连接(如调用 worker.disconnect())。

'disconnect''exit' 事件之间可能存在延迟。 这些事件可以用来检测进程是否在清理过程中被卡住,或是否存在长时间运行的连接。

cluster.on('disconnect', (worker) => {
  console.log(`工作进程 #${worker.id} 已断开连接`);
});

# exit

当任何一个工作进程关闭的时候,cluster 模块都将会触发 'exit' 事件。

这可以用于重启工作进程(通过再次调用 .fork())。

cluster.on('exit', (worker, code, signal) => {
  console.log('工作进程 %d 关闭 (%s). 重启中...',
              worker.process.pid, signal || code);
  cluster.fork();
});

# 方法

# cluster.fork()

衍生出一个新的工作进程。注意只能通过主进程调用。

# cluster.disconnect()

必须在 cluster.workers 的每个工作进程中调用 .disconnect()。

当所有工作进程断开连接后,所有内部句柄将会关闭,这个时候如果没有等待事件的话,运行主进程优雅地关闭。

这个方法可以选择添加一个回调参数,当结束时会调用这个回调函数。

这个方法只能由主进程调用。

# 属性

# cluster.isMaster

如果该进程是主进程,则为 true。 这是由 process.env.NODE_UNIQUE_ID 决定的。 如果 process.env.NODE_UNIQUE_ID 未定义,则 isMastertrue

# cluster.isWorker

如果该进程不是主进程,则为 true(与 cluster.isMaster 相反)。

# Worker 类

cluster 模块还提供了一个 Worker 类。

Worker 对象包含了关于工作进程的所有的公共的信息和方法。 在主进程中,可以使用 cluster.workers 来获取它。 在工作进程中,可以使用 cluster.worker 来获取它。

Worker 类,提供了一些事件,这些事件与集群事件类似。但是特定于工作进程。

# 方法

# worker.disconnect()

在一个工作进程内,调用此方法会关闭所有的 server,并等待这些 server'close' 事件执行,然后关闭 IPC 管道。

当一个 server 关闭后,它将不再接收新的连接,但新连接会被其他正在监听的工作进程接收。 已建立的连接可以正常关闭。 当所有连接都关闭后,通往该工作进程的 IPC 管道将会关闭,允许工作进程优雅地死掉,详见 server.close()

# worker.isConnected()

当工作进程通过 IPC 管道连接至主进程时,这个方法返回 true,否则返回 false。 一个工作进程在创建后会自动连接到它的主进程。 当 'disconnect' 事件被触发时才会断开连接。

# worker.isDead()

当工作进程被终止时(包括自动退出或被发送信号),这个方法返回 true。 否则,返回 false

# worker.kill()

这个方法将会杀死工作进程。

# worker.send()

发送消息给工作进程或主进程,可以选择带上句柄。

在主进程中,这会发送消息给特定的工作进程。 相当于 ChildProcess.send()

在工作进程中,这会发送消息给主进程。 相当于 process.send()

# 属性

# worker.id

每一个新衍生的工作进程都会被赋予自己独一无二的编号,这个编号就是储存在 id 里面。

当工作进程还存活时,这个编号可以作为在 cluster.workers 中的索引。

# worker.process

所有的工作进程都是通过 child_process.fork() 来创建的,这个方法返回的对象被存储为 .process。 在工作进程中, process 属于全局对象。

# 获取CPU内核数

通过 os 模块的 cpus 方法,可以获取CPU内核数。

const numCPUs = require('os').cpus().length;

# 参考

更新时间: 6/30/2020, 2:09:36 AM