写在前面
异步控制是理解Node的深水区,当然,如果只是要搞定异步编程消除回调金字塔的话,会用async
模块就足够应付一切了
Node异步流程控制专题希望贴近各个异步控制库(EventProxy
、Step
、Wind
、async
以及ES6 Promise
和ES7 async/await
),近距离了解其实现,而不只是使用它们。其实还有一个目的是:向Wind致敬
零.EventEmitter
Node的主旋律是异步与事件,许多核心模块都是基于events
模块实现的,因而出现了复杂的异步场景和回调金字塔。所以,先看根源。示例如下:
var EventEmitter = require('events');
var util = require('util');
function MyEmitter() {
EventEmitter.call(this);
}
// 通过继承机制创建自定义EventEmitter
util.inherits(MyEmitter, EventEmitter);
// 创建实例
var myEmitter = new MyEmitter();
// 添加事件监听器
myEmitter.on('myEvent', function(arg1, arg2) {
console.log('myEvent occurs');
console.log(arg1, arg2, this);
// 1 3 { domain: null,
// _events: { myEvent: [Function] },
// _maxListeners: undefined }
});
// 触发事件
myEmitter.emit('myEvent', 1, 3);
// 添加一次性事件监听器
myEmitter.once('onceEvent', function() {
console.log(myEmitter.listenerCount('onceEvent'));
console.log(EventEmitter.defaultMaxListeners);
console.log(MyEmitter.defaultMaxListeners);
console.log(myEmitter.listenerCount('onceEvent'));
console.log(myEmitter.getMaxListeners());
});
// console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent')); // 废弃
console.log(myEmitter.listenerCount('onceEvent'));
// 添加监听器,关注监听被移除的事件(验证once)
myEmitter.on('removeListener', function(event, listener) {
console.log(event);
console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));
});
myEmitter.emit('onceEvent');
P.S.其中util
模块提供的继承机制是寄生组合式继承
一.异步控制的关键
任何异步控制库或者方法都要解决2个问题:
业务支持:处理异步多级依赖,处理异步请求同时发出以及回调顺序有依赖的情况
异常处理:1.必须执行且只执行一次回调函数 2.正确回传异常供调用者判断
在为业务提供支持,尽可能利用并发优势的同时,提供友好的异常控制方式
此外,并发控制也是需要关注的问题,避免异步请求因并发而大量堆积,在确保系统稳定的前提下充分利用并发优势
比如async
模块大而全,把这3个问题都解决了,而EventProxy
模块解决了前两个问题,并发控制由BagPipe
模块负责。Step
、Wind
等模块也没有考虑并发控制的问题
二.模拟EventProxy
EventProxy
虽然没有依赖内置的events
模块,但其实现确实是基于事件的(事件订阅/发布模式)。因此,我们利用内置的events
模块来模拟实现EventProxy
P.S.自定义事件机制不难实现,也不是本文的关注点,感兴趣可以查看JS学习笔记11_高级技巧 8.观察者模式
1.原理
提供一些接口接收外部传入的异步任务,内部管理这些任务的执行流程(顺序/并发/依赖),并收集结果,最后把结果传出去。此外,还需要管理任务执行过程中的异常
比如EventProxy
提供的all
、tail
、after
是3种任务管理方式,而fail
和done
负责处理异常
2.结构
首先自定义EventEmitter
,并进行简单封装,如下:
var EventEmitter = require('events');
var util = require('util');
function MyEmitter() {
EventEmitter.call(this);
}
util.inherits(MyEmitter, EventEmitter);
// event proxy
var EP = function() {
this.emitter = new MyEmitter();
};
EP.prototype.emit = function(event, data) {
this.emitter.emit(event, event, data);
};
EP.prototype.on = function(event, callback) {
this.emitter.on(event, (ev, data) => {
if (ev === event) {
callback(data);
}
});
};
自定义EventEmitter
是为了便于扩展事件机制(虽然这里体现不出来),把emitter
包进EP
并提供基础接口on/emit
,事件机制就基本完整了
3.核心部分
接下来要提供各种异步控制方式(任务管理方式),例如,all
:
// 所有依赖事件都触发后,执行回调
EP.prototype.all = function() {
// 分解参数
var args = args2arr(arguments);
var callback = args.pop(); // 最后一个参数是callback
var times = args.length; // 其余参数是事件名(回调函数的形参名)
var _data = [];
var _callback = after(times, data => {
for (var event of args) {
// 按顺序排列实参
_data.push(data[event]);
// 解绑事件
//! this指向EP实例,因为在箭头函数中
this.emitter.removeListener(event, _callback);
}
callback.apply(null, _data);
});
for (var event of args) {
this.emitter.on(event, _callback);
}
};
其中用到的工具函数为:
// utils
// after返回的函数在times次调用后才会真正执行fn
var after = function(times, fn) {
var data = {};
if (times <= 0) {
return fn();
}
return function(key, val) {
times--;
if (data.hasOwnProperty(key)) {
if (!Array.isArray(data[key])) {
data[key] = [data[key]];
}
data[key].push(val);
}
else {
data[key] = val;
}
if (times === 0) {
return fn(data);
}
};
};
var args2arr = function(args) {
return Array.prototype.slice.call(args);
};
模拟的all
与EventProxy
的all
功能完全一致(不考虑异常的话),用法如下:
var asyncTask = function(name, delay, fn) {
setTimeout(function() {
console.log('get ' + name + ' at ' + new Date().getTime());
if (typeof fn === 'function') {
fn();
}
}, delay);
};
// 利用自定义EP实现
var EP = require('./ep.js');
var ep = new EP();
var task = function(res, data) {
data = data || 'this is ' + res;
return function() {
ep.emit(res, data);
};
};
//- all
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
asyncTask('res2', 350, task('res2'));
asyncTask('res3', 280, task('res3'));
// 结束之后再次触发res1事件
asyncTask('res1', 400, task('res1'));
使用的秘密在于task
中的ep.emit(res, data)
,先通过all
告知EP
记录将要执行的异步任务以及大回调函数,然后外部每执行完毕一个异步任务,都通过ep.emit(res, data)
通知EP
,所有任务执行完毕时,EP
内部调用all
当初记录的大回调函数
类似的,可以实现tail
和after
:
// 与all类似,但能更新数据执行后续回调
EP.prototype.tail = function() {
// 分解参数
var args = args2arr(arguments);
var callback = args.pop(); // 最后一个参数是callback
var times = args.length; // 其余参数是事件名(回调函数的形参名)
var _data = [];
var _callback = after(times, data => {
for (var event of args) {
// 按顺序排列实参
_data.push(data[event]);
// 解绑事件
//! this指向EP实例,因为在箭头函数中
this.emitter.removeListener(event, _callback);
}
callback.apply(null, _data);
// 绑定后续回调
var tailData = _data.slice();
var tailCallback = function(key, val) {
for (var i = 0; i < args.length; i++) {
if (key === args[i]) {
tailData[i] = val;
break;
}
}
callback.apply(null, tailData);
};
for (var event of args) {
this.emitter.on(event, tailCallback);
}
});
for (var event of args) {
this.emitter.on(event, _callback);
}
};
// 多次调用同一接口,最后返回数组
EP.prototype.after = function(event, times, callback) {
var _callback = after(times, data => {
if (Array.isArray(data[event])) {
// 解绑事件
this.emitter.removeListener(event, _callback);
callback.call(null, data[event].slice());
}
});
this.emitter.on(event, _callback);
};
tail
只在all
的基础上做了一点点改动,在每次新数据到来时用新数据执行旧回调函数
after
非常简单,只是对after
工具函数的简单应用
4.异常处理
异常处理规则比较简单:一旦发生异常,就卸载所有处理函数并调用error
事件监听器。如下:
// 异常处理
EP.prototype.fail = function(callback) {
this.emitter.on('error', (err) => {
// 卸载所有处理函数
this.emitter.removeAllListeners();
// 执行异常回调
callback(err);
});
};
EP.prototype.done = function(event) {
return (err, result) => {
if (err) {
// 异常统一交由error事件处理
return this.emitter.emit('error', err);
}
this.emitter.emit(event, result);
};
};
done
是之前task
中ep.emit(res, data)
的翻版,加入了异常处理
done
其实是EventProxy
中比较精巧的部分,避免了if (err) {...}
这样繁琐的异常处理操作,简化了业务代码,同时隐藏了异常处理,用起来更清爽也更安全
至此,模拟EventProxy
结束,它提供的其它异步控制方式的实现与all
、after
类似,只是更复杂的控制方式可能需要更多的代码,如果继续实现下去的话,最终结果就是类似于async
模块的异步控制方式大全,此处不再深究
三.EventProxy到底做了什么
EventProxy
其实没做什么,它最大的特点就是侵入性小,像插件一样
展开之前使用模拟的all
的例子,如下:
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
// 展开asyncTask('res1', 300, task('res1'))
// asyncTask
setTimeout(function() {
console.log('get ' + 'res1' + ' at ' + new Date().getTime());
// task('res1')
var data = 'this is ' + 'res1';
ep.emit('res1', data);
}, 300);
可以看到,使用EP的过程是大片业务代码中偶尔穿插1句EP代码(ep.all
和ep.emit
),对业务块本身几乎没有影响(只是可能要在业务块出口插入一条EP代码),更不用重构业务代码去迎合框架,对比Promise
的话,这一点很明显
回到问题,EventProxy
到底做了什么?
EventProxy
简化了用事件机制管理异步任务的过程。用内置的events
模块也很容易实现类似的功能,而且代码量不会比EventProxy
源码多很多。对业务代码的侵入性很小,因此更像是util
,而不是大只的框架
四.总结
EventProxy
是一个精巧的工具库(侵入性小),提供了基于事件机制的异步控制方法
对比async
模块,EventProxy
用起来更麻烦,功能也不够全面,但其小巧与灵活性是亮点,插件式的工具,可以随时选择用或者不用,而对于稍微“强势”一点的框架,弃用需要勇气
对比Promise
,事件订阅/发布机制(EventProxy
)的缺点是必须预先确定分支,否则事件发生后再指定分支就无效了。Promise
最大的特点是:
分离了正向用例和反向用例(
p.then(onFulfilled, onRejected)
)延迟逻辑处理
延迟逻辑处理,即不用预先指定分支,先执行异步调用,延迟指定分支处理。但Promise
的缺点是要为不同的场景封装不同的API,存在包装成本
P.S.关于Promise
的详细信息,请查看完全理解Promise
参考资料
- 《深入浅出NodeJS》