前言
异步是javascript一个标志,尤其在nodejs中,操作数据库等往往带来大量的异步调用,所谓的“callback hell”由此而来,async作为成名较早的异步流程库,API设计属于异步1.0时代,其思想非常接近主流思考方向,源码必然很有参考价值,于是找时间看了一下其源码中的几个主要方法。
async.parallel
async.parallel = function (tasks, callback) {
_parallel(async.eachOf, tasks, callback);
};
//这一步很简单,调用了 _parallel方法:
function _parallel(eachfn, tasks, callback) {
//eachfn是async.eachOf
//tasks是异步队列,callback就是平行异步的总回调
callback = callback || noop;
var results = _isArrayLike(tasks) ? [] : {};
//results保存着每个异步调用的返回结果
//这一步循环调用tasks,我们看看eachfn是怎么定义的
eachfn(tasks, function (task, key, callback) {
。。。
}, function (err) {
callback(err, results);
}
async.eachOf = function (object, iterator, callback) {
//经过once包装过的callback保证只执行一次,
//考虑到同时执行的异步有出错的时候,callback不会多次执行
callback = _once(callback || noop);
object = object || [];
var size = _isArrayLike(object) ? object.length : _ke
var completed = 0;
if (!size) {
return callback(null);
}
_each(object, function (value, key) {
iterator(object[key], key, only_once(done));
});
function done(err) {
if (err) {
callback(err);
}
else {
completed += 1;
if (completed >= size) {
callback(null);
}
}
}
};
//eachof逻辑非常简单,其实就是传递一个数组或者对象给它,然后它会
//遍历数组,执行传入的 iterator 处理方法,每执行一次+1,
//直到全部执行完,调用最终的callback。它用来控制什么时候结束所有异步调用。
//我们再回过头来看 iterator 即异步队列循环调用的时候给每个方法封装的包裹函数。
eachfn(tasks, function (task, key, callback) {
//eachfn上面我们分析了,它就是用来控制结束的,所以 iterator
//也就是这个匿名函数必须调用它的done方法,它来做迭代器的控制判断
//什么时候结束,这个done方法在这个匿名函数中就是 callback 参数。
//什么时候调用callback是业务代码控制的,所以必须传给task
//同时它再在这个done方法上面通过_restParam封装了一个方法,
//用于保存结果到results
//_restParam的方法是把业务代码的数据除了err以为都存到一个数组里面。
task(_restParam(function (err, args) {
if (args.length <= 1)="" {="" args="args[0];" }="" results[key]="args;" callback(err);="" }));="" },="" function="" (err)="" callback(err,="" results);="" });="" <="" pre="">
async.waterfall
async.waterfall = function (tasks, callback) {
callback = _once(callback || noop);
if (!_isArray(tasks)) {
var err = new Error('First argument to waterfall must be an array
return callback(err);
}
if (!tasks.length) {
return callback();
}
function wrapIterator(iterator) {
...
}
wrapIterator(async.iterator(tasks))();
//我们先看 async.iterator,这方法和 parallel 里面的eachof
//有点神似,目的是把 tasks 包装成链状的,有一个next方法,指向下一个task。
async.iterator = function (tasks) {
function makeCallback(index) {
function fn() {
if (tasks.length) {
tasks[index].apply(null, arguments);
}
return fn.next();
}
fn.next = function () {
return (index < tasks.length - 1) ? makeCallback(index + 1): null;
};
return fn;
}
return makeCallback(0);
};
};
//再回过头来看 wrapIterator
function wrapIterator(iterator) {
//_restParam照样可以不用管,就是把一个方法的实参封装成err和一个数组
return _restParam(function (err, args) {
if (err) {
//错误处理,出错直接结束,因为没有调用next了
callback.apply(null, [err].concat(args));
}
else {
var next = iterator.next();
if (next) {
// 把下一个task作为最后一个形参传给当前的task
// 业务代码结束调用 callback(err, data); 时:
// callback就是下一个task。
args.push(wrapIterator(next));
}
else {
args.push(callback);
}
// 这个task的形参是由上一个task在业务代码中调用而来,最后一个固定是callback,前面为数据
ensureAsync(iterator).apply(null, args);
}
});
}
=>