Nodejs进程间通信

一.场景

Node运行在单线程下,但这并不意味着无法利用多核/多机下多进程的优势

事实上,Node最初从设计上就考虑了分布式网络场景:

Node is a single-threaded, single-process system which enforces shared-nothing design with OS process boundaries. It has rather good libraries for networking. I believe this to be a basis for designing very large distributed programs. The “nodes” need to be organized: given a communication protocol, told how to connect to each other. In the next couple months we are working on libraries for Node that allow these networks.

P.S.关于Node之所以叫Node,见Why is Node.js named Node.js?

二.创建进程

通信方式与进程产生方式有关,而Node有4种创建进程的方式:spawn()exec()execFile()fork()

spawn

const { spawn } = require('child_process');
const child = spawn('pwd');
// 带参数的形式
// const child = spawn('find', ['.', '-type', 'f']);

spawn()返回ChildProcess实例,ChildProcess同样基于事件机制(EventEmitter API),提供了一些事件:

  • exit:子进程退出时触发,可以得知进程退出状态(codesignal

  • disconnect:父进程调用child.disconnect()时触发

  • error:子进程创建失败,或被kill时触发

  • close:子进程的stdio流(标准输入输出流)关闭时触发

  • message:子进程通过process.send()发送消息时触发,父子进程之间可以通过这种内置的消息机制通信

可以通过child.stdinchild.stdoutchild.stderr访问子进程的stdio流,这些流被关闭的时,子进程会触发close事件

P.S.closeexit的区别主要体现在多进程共享同一stdio流的场景,某个进程退出了并不意味着stdio流被关闭了

在子进程中,stdout/stderr具有Readable特性,而stdin具有Writable特性,与主进程的情况正好相反

child.stdout.on('data', (data) => {
  console.log(`child stdout:\n${data}`);
});

child.stderr.on('data', (data) => {
  console.error(`child stderr:\n${data}`);
});

利用进程stdio流的管道特性,就可以完成更复杂的事情,例如:

const { spawn } = require('child_process');

const find = spawn('find', ['.', '-type', 'f']);
const wc = spawn('wc', ['-l']);

find.stdout.pipe(wc.stdin);

wc.stdout.on('data', (data) => {
  console.log(`Number of files ${data}`);
});

作用等价于find . -type f | wc -l,递归统计当前目录文件数量

IPC选项

另外,通过spawn()方法的stdio选项可以建立IPC机制:

const { spawn } = require('child_process');

const child = spawn('node', ['./ipc-child.js'], { stdio: [null, null, null, 'ipc'] });
child.on('message', (m) => {
  console.log(m);
});
child.send('Here Here');

// ./ipc-child.js
process.on('message', (m) => {
  process.send(`< ${m}`);
  process.send('> 不要回答x3');
});

关于spawn()的IPC选项的详细信息,请查看options.stdio

exec

spawn()方法默认不会创建shell去执行传入的命令(所以性能上稍微好一点),而exec()方法会创建一个shell。另外,exec()不是基于stream的,而是把传入命令的执行结果暂存到buffer中,再整个传递给回调函数

exec()方法的特点是完全支持shell语法,可以直接传入任意shell脚本,例如:

const { exec } = require('child_process');

exec('find . -type f | wc -l', (err, stdout, stderr) => {
  if (err) {
    console.error(`exec error: ${err}`);
    return;
  }

  console.log(`Number of files ${stdout}`);
});

exec()方法也因此存在命令注入的安全风险,在含有用户输入等动态内容的场景要特别注意。所以,exec()方法的适用场景是:希望直接使用shell语法,并且预期输出数据量不大(不存在内存压力)

那么,有没有既支持shell语法,还具有stream IO优势的方式?

有。两全其美的方式如下:

const { spawn } = require('child_process');
const child = spawn('find . -type f | wc -l', {
  shell: true
});
child.stdout.pipe(process.stdout);

开启spawn()shell选项,并通过pipe()方法把子进程的标准输出简单地接到当前进程的标准输入上,以便看到命令执行结果。实际上还有更容易的方式:

const { spawn } = require('child_process');
process.stdout.on('data', (data) => {
  console.log(data);
});
const child = spawn('find . -type f | wc -l', {
  shell: true,
  stdio: 'inherit'
});

stdio: 'inherit'允许子进程继承当前进程的标准输入输出(共享stdinstdoutstderr),所以上例能够通过监听当前进程process.stdoutdata事件拿到子进程的输出结果

另外,除了stdioshell选项,spawn()还支持一些其它选项,如:

const child = spawn('find . -type f | wc -l', {
  stdio: 'inherit',
  shell: true,
  // 修改环境变量,默认process.env
  env: { HOME: '/tmp/xxx' },
  // 改变当前工作目录
  cwd: '/tmp',
  // 作为独立进程存在
  detached: true
});

注意env选项除了以环境变量形式向子进程传递数据外,还可以用来实现沙箱式的环境变量隔离,默认把process.env作为子进程的环境变量集,子进程与当前进程一样能够访问所有环境变量,如果像上例中指定自定义对象作为子进程的环境变量集,子进程就无法访问其它环境变量

所以,想要增/删环境变量的话,需要这样做:

var spawn_env = JSON.parse(JSON.stringify(process.env));

// remove those env vars
delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE;
delete spawn_env.ELECTRON_RUN_AS_NODE;

var sp = spawn(command, ['.'], {cwd: cwd, env: spawn_env});

detached选项更有意思:

const { spawn } = require('child_process');

const child = spawn('node', ['stuff.js'], {
  detached: true,
  stdio: 'ignore'
});

child.unref();

以这种方式创建的独立进程行为取决于操作系统,Windows上detached子进程将拥有自己的console窗口,而Linux上该进程会创建新的process group(这个特性可以用来管理子进程族,实现类似于tree-kill的特性)

unref()方法用来断绝关系,这样“父”进程可以独立退出(不会导致子进程跟着退出),但要注意这时子进程的stdio也应该独立于“父”进程,否则“父”进程退出后子进程仍会受到影响

execFile

const { execFile } = require('child_process');
const child = execFile('node', ['--version'], (error, stdout, stderr) => {
  if (error) {
    throw error;
  }
  console.log(stdout);
});

exec()方法类似,但不通过shell来执行(所以性能稍好一点),所以要求传入可执行文件。Windows下某些文件无法直接执行,比如.bat.cmd,这些文件就不能用execFile()来执行,只能借助exec()或开启了shell选项的spawn()

P.S.与exec()一样也不是基于stream的,同样存在输出数据量风险

xxxSync

spawnexecexecFile都有对应的同步阻塞版本,一直等到子进程退出

const { 
  spawnSync, 
  execSync, 
  execFileSync,
} = require('child_process');

同步方法用来简化脚本任务,比如启动流程,其它时候应该避免使用这些方法

fork

fork()spawn()的变体,用来创建Node进程,最大的特点是父子进程自带通信机制(IPC管道):

The child_process.fork() method is a special case of child_process.spawn() used specifically to spawn new Node.js processes. Like child_process.spawn(), a ChildProcess object is returned. The returned ChildProcess will have an additional communication channel built-in that allows messages to be passed back and forth between the parent and child. See subprocess.send() for details.

例如:

var n = child_process.fork('./child.js');
n.on('message', function(m) {
  console.log('PARENT got message:', m);
});
n.send({ hello: 'world' });

// ./child.js
process.on('message', function(m) {
  console.log('CHILD got message:', m);
});
process.send({ foo: 'bar' });

因为fork()自带通信机制的优势,尤其适合用来拆分耗时逻辑,例如:

const http = require('http');
const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
  };
  return sum;
};
const server = http.createServer();
server.on('request', (req, res) => {
  if (req.url === '/compute') {
    const sum = longComputation();
    return res.end(`Sum is ${sum}`);
  } else {
    res.end('Ok')
  }
});

server.listen(3000);

这样做的致命问题是一旦有人访问/compute,后续请求都无法及时处理,因为事件循环还被longComputation阻塞着,直到耗时计算结束才能恢复服务能力

为了避免耗时操作阻塞主进程的事件循环,可以把longComputation()拆分到子进程中:

// compute.js
const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e9; i++) {
    sum += i;
  };
  return sum;
};

// 开关,收到消息才开始做
process.on('message', (msg) => {
  const sum = longComputation();
  process.send(sum);
});

主进程开启子进程执行longComputation

const http = require('http');
const { fork } = require('child_process');

const server = http.createServer();

server.on('request', (req, res) => {
  if (req.url === '/compute') {
    const compute = fork('compute.js');
    compute.send('start');
    compute.on('message', sum => {
      res.end(`Sum is ${sum}`);
    });
  } else {
    res.end('Ok')
  }
});

server.listen(3000);

主进程的事件循环不会再被耗时计算阻塞,但进程数量还需要进一步限制,否则资源被进程消耗殆尽时服务能力仍会受到影响

P.S.实际上,cluster模块就是对多进程服务能力的封装,思路与这个简单示例类似

三.通信方式

1.通过stdin/stdout传递json

stdin/stdout and a JSON payload

最直接的通信方式,拿到子进程的handle后,可以访问其stdio流,然后约定一种message格式开始愉快地通信:

const { spawn } = require('child_process');

child = spawn('node', ['./stdio-child.js']);
child.stdout.setEncoding('utf8');
// 父进程-发
child.stdin.write(JSON.stringify({
  type: 'handshake',
  payload: '你好吖'
}));
// 父进程-收
child.stdout.on('data', function (chunk) {
  let data = chunk.toString();
  let message = JSON.parse(data);
  console.log(`${message.type} ${message.payload}`);
});

子进程与之类似:

// ./stdio-child.js
// 子进程-收
process.stdin.on('data', (chunk) => {
  let data = chunk.toString();
  let message = JSON.parse(data);
  switch (message.type) {
    case 'handshake':
      // 子进程-发
      process.stdout.write(JSON.stringify({
        type: 'message',
        payload: message.payload + ' : hoho'
      }));
      break;
    default:
      break;
  }
});

P.S.VS Code进程间通信就采用了这种方式,具体见access electron API from vscode extension

明显的限制是需要拿到“子”进程的handle,两个完全独立的进程之间无法通过这种方式来通信(比如跨应用,甚至跨机器的场景)

P.S.关于stream及pipe的详细信息,请查看Node中的流

2.原生IPC支持

spawn()fork()的例子,进程之间可以借助内置的IPC机制通信

父进程:

  • process.on('message')

  • child.send()

子进程:

  • process.on('message')

  • process.send()

限制同上,同样要有一方能够拿到另一方的handle才行

3.sockets

借助网络来完成进程间通信,不仅能跨进程,还能跨机器

node-ipc就采用这种方案,例如:

// server
const ipc=require('../../../node-ipc');

ipc.config.id = 'world';
ipc.config.retry= 1500;
ipc.config.maxConnections=1;

ipc.serveNet(
    function(){
        ipc.server.on(
            'message',
            function(data,socket){
                ipc.log('got a message : ', data);
                ipc.server.emit(
                    socket,
                    'message',
                    data+' world!'
                );
            }
        );

        ipc.server.on(
            'socket.disconnected',
            function(data,socket){
                console.log('DISCONNECTED\n\n',arguments);
            }
        );
    }
);
ipc.server.on(
    'error',
    function(err){
        ipc.log('Got an ERROR!',err);
    }
);
ipc.server.start();

// client
const ipc=require('node-ipc');

ipc.config.id = 'hello';
ipc.config.retry= 1500;

ipc.connectToNet(
    'world',
    function(){
        ipc.of.world.on(
            'connect',
            function(){
                ipc.log('## connected to world ##', ipc.config.delay);
                ipc.of.world.emit(
                    'message',
                    'hello'
                );
            }
        );
        ipc.of.world.on(
            'disconnect',
            function(){
                ipc.log('disconnected from world');
            }
        );
        ipc.of.world.on(
            'message',
            function(data){
                ipc.log('got a message from world : ', data);
            }
        );
    }
);

P.S.更多示例见RIAEvangelist/node-ipc

当然,单机场景下通过网络来完成进程间通信有些浪费性能,但网络通信的优势在于跨环境的兼容性与更进一步的RPC场景

4.message queue

父子进程都通过外部消息机制来通信,跨进程的能力取决于MQ支持

即进程间不直接通信,而是通过中间层(MQ),加一个控制层就能获得更多灵活性和优势:

  • 稳定性:消息机制提供了强大的稳定性保证,比如确认送达(消息回执ACK),失败重发/防止多发等等

  • 优先级控制:允许调整消息响应次序

  • 离线能力:消息可以被缓存

  • 事务性消息处理:把关联消息组合成事务,保证其送达顺序及完整性

P.S.不好实现?包一层能解决吗,不行就包两层……

比较受欢迎的有smrchy/rsmq,例如:

// init
RedisSMQ = require("rsmq");
rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );
// create queue
rsmq.createQueue({qname:"myqueue"}, function (err, resp) {
    if (resp===1) {
      console.log("queue created")
    }
});
// send message
rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) {
  if (resp) {
    console.log("Message sent. ID:", resp);
  }
});
// receive message
rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) {
  if (resp.id) {
    console.log("Message received.", resp)  
  }
  else {
    console.log("No messages for me...")
  }
});

会起一个Redis server,基本原理如下:

Using a shared Redis server multiple Node.js processes can send / receive messages.

消息的收/发/缓存/持久化依靠Redis提供的能力,在此基础上实现完整的队列机制

5.Redis

基本思路与message queue类似:

Use Redis as a message bus/broker.

Redis自带Pub/Sub机制(即发布-订阅模式),适用于简单的通信场景,比如一对一或一对多并且不关注消息可靠性的场景

另外,Redis有list结构,可以用作消息队列,以此提高消息可靠性。一般做法是生产者LPUSH消息,消费者BRPOP消息。适用于要求消息可靠性的简单通信场景,但缺点是消息不具状态,且没有ACK机制,无法满足复杂的通信需求

P.S.Redis的Pub/Sub示例见What’s the most efficient node.js inter-process communication library/method?

四.总结

Node进程间通信有4种方式:

  • 通过stdin/stdout传递json:最直接的方式,适用于能够拿到“子”进程handle的场景,适用于关联进程之间通信,无法跨机器

  • Node原生IPC支持:最native(地道?)的方式,比上一种“正规”一些,具有同样的局限性

  • 通过sockets:最通用的方式,有良好的跨环境能力,但存在网络的性能损耗

  • 借助message queue:最强大的方式,既然要通信,场景还复杂,不妨扩展出一层消息中间件,漂亮地解决各种通信问题

参考资料

Nodejs进程间通信》上有1条评论

  1. Troy Diao

    三、通信方式 2.原生IPC支持

    父进程应该是 child.on(‘message’)收

    回复

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*

code