模拟EventProxy_Node异步流程控制1

写在前面

异步控制是理解Node的深水区,当然,如果只是要搞定异步编程消除回调金字塔的话,会用async模块就足够应付一切了

Node异步流程控制专题希望贴近各个异步控制库(EventProxyStepWindasync以及ES6 PromiseES7 async/await),近距离了解其实现,而不只是使用它们。其实还有一个目的是:向Wind致敬

零.EventEmitter

Node的主旋律是异步与事件,许多核心模块都是基于events模块实现的,因而出现了复杂的异步场景和回调金字塔。所以,先看根源。示例如下:

var EventEmitter = require('events');
var util = require('util');

function MyEmitter() {
    EventEmitter.call(this);
}
// 通过继承机制创建自定义EventEmitter
util.inherits(MyEmitter, EventEmitter);

// 创建实例
var myEmitter = new MyEmitter();
// 添加事件监听器
myEmitter.on('myEvent', function(arg1, arg2) {
    console.log('myEvent occurs');
    console.log(arg1, arg2, this);
    // 1 3 { domain: null,
    //   _events: { myEvent: [Function] },
    //   _maxListeners: undefined }
});
// 触发事件
myEmitter.emit('myEvent', 1, 3);

// 添加一次性事件监听器
myEmitter.once('onceEvent', function() {
    console.log(myEmitter.listenerCount('onceEvent'));
    console.log(EventEmitter.defaultMaxListeners);
    console.log(MyEmitter.defaultMaxListeners);
    console.log(myEmitter.listenerCount('onceEvent'));
    console.log(myEmitter.getMaxListeners());
});
// console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));    // 废弃
console.log(myEmitter.listenerCount('onceEvent'));
// 添加监听器,关注监听被移除的事件(验证once)
myEmitter.on('removeListener', function(event, listener) {
    console.log(event);
    console.log(EventEmitter.listenerCount(myEmitter, 'onceEvent'));
});
myEmitter.emit('onceEvent');

P.S.其中util模块提供的继承机制是寄生组合式继承

一.异步控制的关键

任何异步控制库或者方法都要解决2个问题:

  • 业务支持:处理异步多级依赖,处理异步请求同时发出以及回调顺序有依赖的情况

  • 异常处理:1.必须执行且只执行一次回调函数 2.正确回传异常供调用者判断

在为业务提供支持,尽可能利用并发优势的同时,提供友好的异常控制方式

此外,并发控制也是需要关注的问题,避免异步请求因并发而大量堆积,在确保系统稳定的前提下充分利用并发优势

比如async模块大而全,把这3个问题都解决了,而EventProxy模块解决了前两个问题,并发控制由BagPipe模块负责。StepWind等模块也没有考虑并发控制的问题

二.模拟EventProxy

EventProxy虽然没有依赖内置的events模块,但其实现确实是基于事件的(事件订阅/发布模式)。因此,我们利用内置的events模块来模拟实现EventProxy

P.S.自定义事件机制不难实现,也不是本文的关注点,感兴趣可以查看JS学习笔记11_高级技巧 8.观察者模式

1.原理

提供一些接口接收外部传入的异步任务,内部管理这些任务的执行流程(顺序/并发/依赖),并收集结果,最后把结果传出去。此外,还需要管理任务执行过程中的异常

比如EventProxy提供的alltailafter是3种任务管理方式,而faildone负责处理异常

2.结构

首先自定义EventEmitter,并进行简单封装,如下:

var EventEmitter = require('events');
var util = require('util');

function MyEmitter() {
    EventEmitter.call(this);
}
util.inherits(MyEmitter, EventEmitter);

// event proxy
var EP = function() {
    this.emitter = new MyEmitter();
};
EP.prototype.emit = function(event, data) {
    this.emitter.emit(event, event, data);
};
EP.prototype.on = function(event, callback) {
    this.emitter.on(event, (ev, data) => {
        if (ev === event) {
            callback(data);
        }
    });
};

自定义EventEmitter是为了便于扩展事件机制(虽然这里体现不出来),把emitter包进EP并提供基础接口on/emit,事件机制就基本完整了

3.核心部分

接下来要提供各种异步控制方式(任务管理方式),例如,all

// 所有依赖事件都触发后,执行回调
EP.prototype.all = function() {
    // 分解参数
    var args = args2arr(arguments);
    var callback = args.pop();  // 最后一个参数是callback
    var times = args.length;    // 其余参数是事件名(回调函数的形参名)

    var _data = [];

    var _callback = after(times, data => {
        for (var event of args) {
            // 按顺序排列实参
            _data.push(data[event]);
            // 解绑事件
            //! this指向EP实例,因为在箭头函数中
            this.emitter.removeListener(event, _callback);
        }
        callback.apply(null, _data);
    });
    for (var event of args) {
        this.emitter.on(event, _callback);
    }
};

其中用到的工具函数为:

// utils
// after返回的函数在times次调用后才会真正执行fn
var after = function(times, fn) {
    var data = {};

    if (times <= 0) {
        return fn();
    }
    return function(key, val) {
        times--;
        if (data.hasOwnProperty(key)) {
            if (!Array.isArray(data[key])) {
                data[key] = [data[key]];
            }
            data[key].push(val);
        }
        else {
            data[key] = val;
        }

        if (times === 0) {
            return fn(data);
        }
    };
};
var args2arr = function(args) {
    return Array.prototype.slice.call(args);
};

模拟的allEventProxyall功能完全一致(不考虑异常的话),用法如下:

var asyncTask = function(name, delay, fn) {
    setTimeout(function() {
        console.log('get ' + name + ' at ' + new Date().getTime());
        if (typeof fn === 'function') {
            fn();
        }
    }, delay);
};

// 利用自定义EP实现
var EP = require('./ep.js');
var ep = new EP();

var task = function(res, data) {
    data = data || 'this is ' + res;
    return function() {
        ep.emit(res, data);
    };
};

//- all
ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
    console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));
asyncTask('res2', 350, task('res2'));
asyncTask('res3', 280, task('res3'));
// 结束之后再次触发res1事件
asyncTask('res1', 400, task('res1'));

使用的秘密在于task中的ep.emit(res, data),先通过all告知EP记录将要执行的异步任务以及大回调函数,然后外部每执行完毕一个异步任务,都通过ep.emit(res, data)通知EP,所有任务执行完毕时,EP内部调用all当初记录的大回调函数

类似的,可以实现tailafter

// 与all类似,但能更新数据执行后续回调
EP.prototype.tail = function() {
    // 分解参数
    var args = args2arr(arguments);
    var callback = args.pop();  // 最后一个参数是callback
    var times = args.length;    // 其余参数是事件名(回调函数的形参名)

    var _data = [];

    var _callback = after(times, data => {
        for (var event of args) {
            // 按顺序排列实参
            _data.push(data[event]);
            // 解绑事件
            //! this指向EP实例,因为在箭头函数中
            this.emitter.removeListener(event, _callback);
        }
        callback.apply(null, _data);
        // 绑定后续回调
        var tailData = _data.slice();
        var tailCallback = function(key, val) {
            for (var i = 0; i < args.length; i++) {
                if (key === args[i]) {
                    tailData[i] = val;
                    break;
                }
            }
            callback.apply(null, tailData);
        };
        for (var event of args) {
            this.emitter.on(event, tailCallback);
        }
    });
    for (var event of args) {
        this.emitter.on(event, _callback);
    }
};
// 多次调用同一接口,最后返回数组
EP.prototype.after = function(event, times, callback) {
    var _callback = after(times, data => {
        if (Array.isArray(data[event])) {
            // 解绑事件
            this.emitter.removeListener(event, _callback);
            callback.call(null, data[event].slice());
        }
    });
    this.emitter.on(event, _callback);
};

tail只在all的基础上做了一点点改动,在每次新数据到来时用新数据执行旧回调函数

after非常简单,只是对after工具函数的简单应用

4.异常处理

异常处理规则比较简单:一旦发生异常,就卸载所有处理函数并调用error事件监听器。如下:

// 异常处理
EP.prototype.fail = function(callback) {
    this.emitter.on('error', (err) => {
        // 卸载所有处理函数
        this.emitter.removeAllListeners();
        // 执行异常回调
        callback(err);
    });
};
EP.prototype.done = function(event) {
    return (err, result) => {
        if (err) {
            // 异常统一交由error事件处理
            return this.emitter.emit('error', err);
        }
        this.emitter.emit(event, result);
    };
};

done是之前taskep.emit(res, data)的翻版,加入了异常处理

done其实是EventProxy中比较精巧的部分,避免了if (err) {...}这样繁琐的异常处理操作,简化了业务代码,同时隐藏了异常处理,用起来更清爽也更安全

至此,模拟EventProxy结束,它提供的其它异步控制方式的实现与allafter类似,只是更复杂的控制方式可能需要更多的代码,如果继续实现下去的话,最终结果就是类似于async模块的异步控制方式大全,此处不再深究

三.EventProxy到底做了什么

EventProxy其实没做什么,它最大的特点就是侵入性小,像插件一样

展开之前使用模拟的all的例子,如下:

ep.all('res1', 'res2', 'res3', function(res1, res2, res3) {
    console.log(res1, res2, res3);
});
asyncTask('res1', 300, task('res1'));

// 展开asyncTask('res1', 300, task('res1'))
// asyncTask
setTimeout(function() {
    console.log('get ' + 'res1' + ' at ' + new Date().getTime());

    // task('res1')
    var data = 'this is ' + 'res1';
    ep.emit('res1', data);
}, 300);

可以看到,使用EP的过程是大片业务代码中偶尔穿插1句EP代码(ep.allep.emit),对业务块本身几乎没有影响(只是可能要在业务块出口插入一条EP代码),更不用重构业务代码去迎合框架,对比Promise的话,这一点很明显

回到问题,EventProxy到底做了什么?

EventProxy简化了用事件机制管理异步任务的过程。用内置的events模块也很容易实现类似的功能,而且代码量不会比EventProxy源码多很多。对业务代码的侵入性很小,因此更像是util,而不是大只的框架

四.总结

EventProxy是一个精巧的工具库(侵入性小),提供了基于事件机制的异步控制方法

对比async模块,EventProxy用起来更麻烦,功能也不够全面,但其小巧与灵活性是亮点,插件式的工具,可以随时选择用或者不用,而对于稍微“强势”一点的框架,弃用需要勇气

对比Promise,事件订阅/发布机制(EventProxy)的缺点是必须预先确定分支,否则事件发生后再指定分支就无效了。Promise最大的特点是:

  • 分离了正向用例和反向用例(p.then(onFulfilled, onRejected)

  • 延迟逻辑处理

延迟逻辑处理,即不用预先指定分支,先执行异步调用,延迟指定分支处理。但Promise的缺点是要为不同的场景封装不同的API,存在包装成本

P.S.关于Promise的详细信息,请查看完全理解Promise

参考资料

  • 《深入浅出NodeJS》

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*

code