如何實(shí)現(xiàn)Nodejs進(jìn)程間通信
本文轉(zhuǎn)載自微信公眾號(hào)「編程雜技」,作者theanarkh 。轉(zhuǎn)載本文請(qǐng)聯(lián)系編程雜技公眾號(hào)。
對(duì)于有繼承關(guān)系的進(jìn)程,nodejs本身為我們提供了進(jìn)程間通信的方式,但是對(duì)于沒有繼承關(guān)系的進(jìn)程,比如兄弟進(jìn)程,想要通信最簡(jiǎn)單的方式就是通過主進(jìn)程中轉(zhuǎn),類似前端框架中子組件通過更新父組件的數(shù)據(jù),然后父通知其他子組件。因?yàn)閚odejs內(nèi)置的進(jìn)程間通信需要經(jīng)過序列化和反序列化,所以這種方式可能會(huì)帶來一定的性能損耗,而且在實(shí)現(xiàn)上也比較麻煩。今天介紹的是實(shí)現(xiàn)兄弟進(jìn)程通信的另外一種方式,在windows上使用命名管道,在非windows上使用unix域,另外本文還會(huì)介紹基于tcp的遠(yuǎn)程進(jìn)程通信的實(shí)現(xiàn)。下面具體介紹一下設(shè)計(jì)和實(shí)現(xiàn)。
1 IPC的實(shí)現(xiàn)
ipc的實(shí)現(xiàn)比較簡(jiǎn)單,主要是對(duì)nodejs提供的功能進(jìn)行封裝。首先我們需要處理一下path,因?yàn)樵诿艿篮蛈nix域中他的格式是不一樣的。
- const os = require('os');
 - module.exports = {
 - path: os.platform() === 'win32' ? '\\\\?\\pipe\\ipc' : '/tmp/unix.sock',
 - };
 
接著我們看看客戶端和服務(wù)器的實(shí)現(xiàn)。
1.1 IPCClient的實(shí)現(xiàn)
- const net = require('net');
 - const { EventEmitter } = require('events');
 - const { path } = require('../config');
 - class Client extends EventEmitter {
 - constructor(options) {
 - super();
 - this.options = { path, ...options };
 - const socket = net.connect(this.options);
 - socket.on('error', (error) => {
 - console.error(error);
 - });
 - return socket;
 - }
 - }
 - module.exports = {
 - Client,
 - };
 
1.2 IPCServer的實(shí)現(xiàn)
- const fs = require('fs');
 - const net = require('net');
 - const { EventEmitter } = require('events');
 - const { path } = require('../config');
 - class Server extends EventEmitter {
 - constructor(options, connectionListener) {
 - super();
 - if (typeof options === 'function') {
 - options = {
 - connectionListener: options,
 - };
 - } else {
 - options = { ...options, connectionListener };
 - }
 - try {
 - fs.existsSync(options.path) && fs.unlinkSync(options.path);
 - } catch(e) {
 - }
 - this.options = { path, ...options };
 - return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => {
 - client.on('error', (error) => {
 - console.error(error);
 - });
 - typeof this.options.connectionListener === 'function' && this.options.connectionListener(client);
 - }).listen(this.options);
 - }
 - }
 - module.exports = {
 - Server,
 - };
 
2 RPC的實(shí)現(xiàn)
我們知道tcp是面向流的服務(wù),他本身只負(fù)責(zé)傳輸數(shù)據(jù),不負(fù)責(zé)數(shù)據(jù)的解析和解釋。通過tcp傳輸數(shù)據(jù)時(shí),需要自己解析數(shù)據(jù),我們需要從一串字節(jié)流中解析出一個(gè)個(gè)數(shù)據(jù)包。這就涉及到協(xié)議的設(shè)計(jì)。所以首先我們要定義一個(gè)應(yīng)用層協(xié)議。
2.1 應(yīng)用層協(xié)議的設(shè)計(jì)和實(shí)現(xiàn)
null應(yīng)用層協(xié)議的設(shè)計(jì)非常簡(jiǎn)單
1 總長(zhǎng)度是除了開頭標(biāo)記之外的其他數(shù)據(jù)長(zhǎng)度。因?yàn)閿?shù)據(jù)部分是變長(zhǎng)的,所以我們需要一個(gè)總長(zhǎng)度來判斷后續(xù)的數(shù)據(jù)長(zhǎng)度是多少。
2 序列號(hào)是用于關(guān)聯(lián)請(qǐng)求和響應(yīng),因?yàn)槲覀冊(cè)谝粋€(gè)連接上可能會(huì)串行發(fā)送多個(gè)數(shù)據(jù)包,當(dāng)我們收到一個(gè)回包的時(shí)候,我們不知道是來自哪個(gè)請(qǐng)求的響應(yīng),通過響應(yīng)體中的seq,我們就知道是來自哪個(gè)請(qǐng)求的響應(yīng)。設(shè)計(jì)了通信協(xié)議后,我們就需要對(duì)協(xié)議進(jìn)行封包解包。首先我們看一下封包邏輯。
- function seq() {
 - return ~~(Math.random() * Math.pow(2, 31))
 - }
 - function packet(data, sequnce) {
 - // 轉(zhuǎn)成buffer
 - const bufferData = Buffer.from(data, 'utf-8');
 - // 開始標(biāo)記長(zhǎng)度
 - const startFlagLength = Buffer.from([PACKET_START]).byteLength;
 - // 序列號(hào)
 - const _seq = sequnce || seq();
 - // 分配一個(gè)buffer存儲(chǔ)數(shù)據(jù)
 - let buffer = Buffer.allocUnsafe(startFlagLength + TOTAL_LENGTH + SEQ_LEN);
 - // 設(shè)計(jì)開始標(biāo)記
 - buffer[0] = 0x3;
 - // 寫入總長(zhǎng)度字段的值
 - buffer.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + bufferData.byteLength, 1, TOTAL_LENGTH);
 - // 寫入序列號(hào)的值
 - buffer.writeUIntBE(_seq, startFlagLength + TOTAL_LENGTH, SEQ_LEN);
 - // 把協(xié)議元數(shù)據(jù)和數(shù)據(jù)組裝到一起
 - buffer = Buffer.concat([buffer, bufferData], buffer.byteLength + bufferData.byteLength);
 - return buffer;
 - }
 
接著我們看一下解包的邏輯,因?yàn)閿?shù)據(jù)的傳輸是字節(jié)流,所以有可能多個(gè)數(shù)據(jù)包的數(shù)據(jù)會(huì)粘在一起,所以我們第一步首先要根據(jù)協(xié)議解析出一個(gè)個(gè)數(shù)據(jù)包,然后再解析每一個(gè)數(shù)據(jù)包。我們通過有限狀態(tài)機(jī)實(shí)現(xiàn)數(shù)據(jù)的解析。下面是狀態(tài)機(jī)的狀態(tài)集。
- const PARSE_STATE = {
 - PARSE_INIT: 0,
 - PARSE_HEADER: 1,
 - PARSE_DATA: 2,
 - PARSE_END: 3,
 - };
 
接著我們定義狀態(tài)集的轉(zhuǎn)換規(guī)則。
- class StateSwitcher {
 - constructor(options) {
 - this.options = options;
 - }
 - [PARSE_STATE.PARSE_INIT](data) {
 - // 數(shù)據(jù)不符合預(yù)期
 - if (data[0] !== PACKET_START) {
 - // 跳過部分?jǐn)?shù)據(jù),找到開始標(biāo)記
 - const position = data.indexOf(PACKET_START);
 - // 沒有開始標(biāo)記,說明這部分?jǐn)?shù)據(jù)無效,丟棄
 - if (position === -1) {
 - return [NEED_MORE_DATA, null];
 - }
 - // 否則返回有效數(shù)據(jù)部分,繼續(xù)解析
 - return [PARSE_STATE.PACKET_START, data.slice(position)];
 - }
 - // 保存當(dāng)前正在解析的數(shù)據(jù)包
 - this.packet = new Packet();
 - // 跳過開始標(biāo)記的字節(jié)數(shù),進(jìn)入解析協(xié)議頭階段
 - return [PARSE_STATE.PARSE_HEADER, data.slice(Buffer.from([PACKET_START]).byteLength)];
 - }
 - [PARSE_STATE.PARSE_HEADER](data) {
 - // 數(shù)據(jù)不夠頭部的大小則等待數(shù)據(jù)到來
 - if (data.length < TOTAL_LENGTH + SEQ_LEN) {
 - return [NEED_MORE_DATA, data];
 - }
 - // 有效數(shù)據(jù)包的長(zhǎng)度 = 整個(gè)數(shù)據(jù)包長(zhǎng)度 - 頭部長(zhǎng)度
 - this.packet.set('length', data.readUInt32BE() - (TOTAL_LENGTH + SEQ_LEN));
 - // 序列號(hào)
 - this.packet.set('seq', data.readUInt32BE(TOTAL_LENGTH));
 - // 解析完頭部了,跳過去
 - data = data.slice(TOTAL_LENGTH + SEQ_LEN);
 - // 進(jìn)入解析數(shù)據(jù)階段
 - return [PARSE_STATE.PARSE_DATA, data];
 - }
 - [PARSE_STATE.PARSE_DATA](data) {
 - const len = this.packet.get('length');
 - // 數(shù)據(jù)部分的長(zhǎng)度小于協(xié)議頭中定義的長(zhǎng)度,則繼續(xù)等待
 - if (data.length < len) {
 - return [NEED_MORE_DATA, data];
 - }
 - // 截取數(shù)據(jù)部分
 - this.packet.set('data', data.slice(0, len));
 - // 解析完數(shù)據(jù)了,完成一個(gè)包的解析,跳過數(shù)據(jù)部分
 - data = data.slice(len);
 - typeof this.options.cb === 'function' && this.options.cb(this.packet);
 - this.packet = null;
 - // 解析完一個(gè)數(shù)據(jù)包,進(jìn)入結(jié)束標(biāo)記階段
 - return [PARSE_STATE.PARSE_INIT, data];
 - }
 - }
 
我們?cè)倏匆幌聽顟B(tài)機(jī)的實(shí)現(xiàn)
- class FSM {
 - constructor(options) {
 - this.options = options;
 - // 狀態(tài)處理機(jī),定義了狀態(tài)轉(zhuǎn)移集合
 - this.stateSwitcher = new StateSwitcher({cb: options.cb});
 - // 當(dāng)前狀態(tài)
 - this.state = PARSE_STATE.PARSE_INIT;
 - // 結(jié)束狀態(tài)
 - this.endState = PARSE_STATE.PARSE_END;
 - // 當(dāng)前待解析的數(shù)據(jù)
 - this.buffer = null;
 - }
 - run(data) {
 - // 沒有數(shù)據(jù)或者解析結(jié)束了直接返回
 - if (this.state === this.endState || !data || !data.length) {
 - return;
 - }
 - // 保存待解析的數(shù)據(jù)
 - this.buffer = this.buffer ? Buffer.concat([this.buffer, data]) : data;
 - // 還沒結(jié)束,并且還有數(shù)據(jù)可以處理則繼續(xù)執(zhí)行
 - while(this.state !== this.endState && this.buffer && this.buffer.length) {
 - // 執(zhí)行狀態(tài)處理函數(shù),返回[下一個(gè)狀態(tài), 剩下的數(shù)據(jù)]
 - const result = this.stateSwitcher[this.state](this.buffer);
 - // 如果下一個(gè)狀態(tài)是NEED_MORE_DATA則說明需要更多的數(shù)據(jù)才能繼續(xù)解析,并保持當(dāng)前狀態(tài)
 - if (result[0] === NEED_MORE_DATA) {
 - return;
 - }
 - // 記錄下一個(gè)狀態(tài)和數(shù)據(jù)
 - [this.state, this.buffer] = result;
 - }
 - }
 - }
 
狀態(tài)機(jī)就是對(duì)開始狀態(tài)、結(jié)束狀態(tài)、狀態(tài)轉(zhuǎn)換集的封裝。實(shí)現(xiàn)了協(xié)議的封包和解析后我們看一下如何使用。
2.2 RPC客戶端實(shí)現(xiàn)
- const net = require('net');
 - const { EventEmitter } = require('events');
 - const { FSM } = require('tiny-application-layer-protocol');
 - class Client extends EventEmitter {
 - constructor(options) {
 - super();
 - this.options = { ...options };
 - const socket = net.connect(this.options);
 - socket.on('error', (error) => {
 - console.error(error);
 - });
 - const fsm = new FSM({
 - cb: (packet) => {
 - socket.emit('message', packet);
 - }
 - });
 - socket.on('data', fsm.run.bind(fsm));
 - return socket;
 - }
 - }
 - module.exports = {
 - Client,
 - };
 
我們做的事情主要是負(fù)責(zé)數(shù)據(jù)的解析。
2.3 RPC服務(wù)器實(shí)現(xiàn)
- const fs = require('fs');
 - const net = require('net');
 - const { EventEmitter } = require('events')
 - const { FSM } = require('tiny-application-layer-protocol');
 - class Server extends EventEmitter {
 - constructor(options, connectionListener) {
 - super();
 - if (typeof options === 'function') {
 - options = {
 - connectionListener: options,
 - };
 - } else {
 - options = { ...options, connectionListener };
 - }
 - this.options = { ...options };
 - return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => {
 - const fsm = new FSM({
 - cb: function(packet) {
 - client.emit('message', packet);
 - }
 - })
 - client.on('data', fsm.run.bind(fsm));
 - client.on('error', (error) => {
 - console.error(error);
 - });
 - typeof this.options.connectionListener === 'function' && this.options.connectionListener(client);
 - }).listen(this.options);
 - }
 - }
 - module.exports = {
 - Server,
 - };
 
同樣,服務(wù)器也是負(fù)責(zé)數(shù)據(jù)的解析
3 使用
接下來我們看一下如何使用。
3.1 ipc的使用
server.js
- const { IPCServer } = require('../../src');
 - const { packet } = require('tiny-application-layer-protocol');
 - new IPCServer(function(client) {
 - console.log(1)
 - client.on('data', (data) => {
 - console.log('receive', data);
 - client.write(packet('world', data.seq));
 - });
 - });
 
client.js
- const { IPCClient } = require('../../src');
 - const { packet, seq } = require('tiny-application-layer-protocol');
 - const client = new IPCClient();
 - client.write(packet('hello', seq()));
 - client.on('data', function(res) {
 - console.log('receive', res);
 - })
 
服務(wù)器輸出
客戶端輸出
3.2 RPC的使用
server.js
- const { RPCServer } = require('../../src');
 - const { packet } = require('tiny-application-layer-protocol');
 - new RPCServer({host: '127.0.0.1', port: 80}, function(client) {
 - client.on('message', (data) => {
 - console.log('receive', data);
 - client.write(packet('world', data.seq));
 - });
 - });
 
client.js
- const { RPCClient } = require('../../src');
 - const { packet, seq } = require('tiny-application-layer-protocol');
 - const client = new RPCClient({host: '127.0.0.1', port: 80});
 - client.write(packet('hello', seq()));
 - client.on('message', function(res) {
 - console.log('receive', res);
 - })
 
服務(wù)器輸出
客戶端輸出
4 RPC拓展
我們實(shí)現(xiàn)了數(shù)據(jù)的傳輸和解析,但是如何我們希望數(shù)據(jù)的請(qǐng)求和響應(yīng)是一一對(duì)應(yīng)的怎么辦呢?比如像http在tcp上可以并發(fā)發(fā)起多個(gè)請(qǐng)求一樣,響應(yīng)是否可以亂序返回,我們又如何知道某個(gè)響應(yīng)對(duì)應(yīng)的是哪個(gè)請(qǐng)求?接下來介紹如何解決這個(gè)問題。首先我們實(shí)現(xiàn)一個(gè)請(qǐng)求管理的類。
- class RequestManager {
 - constructor(options) {
 - this.options = { timeout: 10000, ...options };
 - this.map = {};
 - this.timerId = null;
 - this.startPollTimeout();
 - }
 - set(key, context) {
 - if (typeof context.cb !== 'function') {
 - throw new Error('cb is required');
 - }
 - this.map[key] = {
 - startTime: Date.now(),
 - ...context,
 - };
 - }
 - get(key) {
 - return this.map[key];
 - }
 - del(key) {
 - return delete this.map[key];
 - }
 - // 執(zhí)行上下文
 - exec(key, data) {
 - const context = this.get(key);
 - if (context) {
 - this.del(key);
 - context.cb(data);
 - }
 - }
 - execAll(data) {
 - for (const [key] of Object.entries(this.map)) {
 - this.exec(key, data);
 - }
 - }
 - // 定時(shí)輪詢是否超時(shí)
 - startPollTimeout() {
 - this.timerId = setTimeout(() => {
 - if (!this.timerId) {
 - return;
 - }
 - const nextMap = {};
 - for (const [key, context] of Object.entries(this.map)) {
 - if (Date.now() - context.startTime < (context.timeout || this.options.timeout)) {
 - nextMap[key] = context;
 - } else {
 - context.cb(new Error('timeout'));
 - }
 - }
 - this.map = nextMap;
 - this.startPollTimeout();
 - }, 1000);
 - }
 - }
 
該類的邏輯主要是請(qǐng)求的seq保存對(duì)應(yīng)的上下文,然后收到響應(yīng)的時(shí)候,我們根據(jù)響應(yīng)的seq拿到對(duì)應(yīng)的上下文,從而執(zhí)行對(duì)應(yīng)的回調(diào)。我們看看如何使用該類。
server.js
- const { RPCServer } = require('../../src');
 - const { packet } = require('tiny-application-layer-protocol');
 - new RPCServer({host: '127.0.0.1', port: 80}, function(client) {
 - client.on('message', (data) => {
 - console.log('receive', data);
 - client.end(packet('world', data.seq));
 - });
 - client.on('end', (data) => {
 - client.end();
 - });
 - });
 
client.js
- const { RPCClient, RequestManager } = require('../../src');
 - const { packet, seq } = require('tiny-application-layer-protocol');
 - const requestManager = new RequestManager({timeout: 3000});
 - const client = new RPCClient({host: '127.0.0.1', port: 80});
 - const _seq = seq();
 - requestManager.set(_seq, {
 - cb: function() {
 - console.log(...arguments);
 - }
 - })
 - client.write(packet('hello', _seq));
 - client.on('message', function(packet) {
 - requestManager.exec(packet.seq, packet);
 - })
 
輸出 服務(wù)器輸出
客戶端輸出null
github倉庫:https://github.com/theanarkh/nodejs-ipc
github倉庫:https://github.com/theanarkh/tiny-application-layer-protocol
npm install nodejs-i-p-c(ipc和rpc庫,依賴tiny-application-layer-protocol)
npm install tiny-application-layer-protocol(基于tcp的小型應(yīng)用層協(xié)議,包含協(xié)議的定義、封包、解包功能)






















 
 
 









 
 
 
 