Skip to content

Commit

Permalink
feat: Canceling flows (#1542)
Browse files Browse the repository at this point in the history
* cancelable foreach

* cancelable waterfall

* cancellable auto

* fix lint

* fix tests

* cancelable whilst/until/during/forever

* fix waterfall test.  It WILL get there

* docs

* auto should not start other tasks once canceled

* simplify waterfall, add test for arrays

* simplify eachOf

* cancelable retry

* cancelable eachOf for arrays

* revert test tweak
  • Loading branch information
aearly committed Jul 2, 2018
1 parent 2a135a4 commit 53f6130
Show file tree
Hide file tree
Showing 21 changed files with 414 additions and 7 deletions.
34 changes: 34 additions & 0 deletions intro.md
Expand Up @@ -173,6 +173,40 @@ async.map([1, 2, 3], AsyncSquaringLibrary.square.bind(AsyncSquaringLibrary), fun
});
```

### Subtle Memory Leaks

There are cases where you might want to exit early from async flow, when calling an Async method inside another async function:

```javascript
function myFunction (args, outerCallback) {
async.waterfall([
//...
function (arg, next) {
if (someImportantCondition()) {
return outerCallback(null)
}
},
function (arg, next) {/*...*/}
], function done (err) {
//...
})
}
```

Something happened in a waterfall where you want to skip the rest of the execution, so you call an outer callack. However, Async will still wait for that inner `next` callback to be called, leaving some closure scope allocated.

As of version 3.0, you can call any Async callback with `false` as the `error` argument, and the rest of the execution of the Async method will be stopped or ignored.

```javascript
function (arg, next) {
if (someImportantCondition()) {
outerCallback(null)
return next(false) // ← signal that you called an outer callback
}
},
```


## Download

The source is available for download from
Expand Down
8 changes: 7 additions & 1 deletion lib/auto.js
Expand Up @@ -102,6 +102,7 @@ export default function (tasks, concurrency, callback) {

var results = {};
var runningTasks = 0;
var canceled = false;
var hasError = false;

var listeners = Object.create(null);
Expand Down Expand Up @@ -156,6 +157,7 @@ export default function (tasks, concurrency, callback) {
}

function processQueue() {
if (canceled) return
if (readyTasks.length === 0 && runningTasks === 0) {
return callback(null, results);
}
Expand Down Expand Up @@ -189,6 +191,10 @@ export default function (tasks, concurrency, callback) {

var taskCallback = onlyOnce(function(err, result) {
runningTasks--;
if (err === false) {
canceled = true
return
}
if (arguments.length > 2) {
result = slice(arguments, 1);
}
Expand All @@ -200,7 +206,7 @@ export default function (tasks, concurrency, callback) {
safeResults[key] = result;
hasError = true;
listeners = Object.create(null);

if (canceled) return
callback(err, safeResults);
} else {
results[key] = result;
Expand Down
2 changes: 2 additions & 0 deletions lib/doDuring.js
Expand Up @@ -30,13 +30,15 @@ export default function doDuring(fn, test, callback) {

function next(err/*, ...args*/) {
if (err) return callback(err);
if (err === false) return;
var args = slice(arguments, 1);
args.push(check);
_test.apply(this, args);
};

function check(err, truth) {
if (err) return callback(err);
if (err === false) return;
if (!truth) return callback(null);
_fn(next);
}
Expand Down
1 change: 1 addition & 0 deletions lib/doWhilst.js
Expand Up @@ -31,6 +31,7 @@ export default function doWhilst(iteratee, test, callback) {
var _iteratee = wrapAsync(iteratee);
var next = function(err/*, ...args*/) {
if (err) return callback(err);
if (err === false) return;
var args = slice(arguments, 1);
if (test.apply(this, args)) return _iteratee(next);
callback.apply(null, [null].concat(args));
Expand Down
2 changes: 2 additions & 0 deletions lib/during.js
Expand Up @@ -45,11 +45,13 @@ export default function during(test, fn, callback) {

function next(err) {
if (err) return callback(err);
if (err === false) return;
_test(check);
}

function check(err, truth) {
if (err) return callback(err);
if (err === false) return;
if (!truth) return callback(null);
_fn(next);
}
Expand Down
7 changes: 6 additions & 1 deletion lib/eachOf.js
Expand Up @@ -12,12 +12,17 @@ function eachOfArrayLike(coll, iteratee, callback) {
callback = once(callback || noop);
var index = 0,
completed = 0,
length = coll.length;
length = coll.length,
canceled = false;
if (length === 0) {
callback(null);
}

function iteratorCallback(err, value) {
if (err === false) {
canceled = true
}
if (canceled === true) return
if (err) {
callback(err);
} else if ((++completed === length) || value === breakLoop) {
Expand Down
1 change: 1 addition & 0 deletions lib/forever.js
Expand Up @@ -38,6 +38,7 @@ export default function forever(fn, errback) {

function next(err) {
if (err) return done(err);
if (err === false) return;
task(next);
}
next();
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/eachOfLimit.js
Expand Up @@ -14,15 +14,21 @@ export default function _eachOfLimit(limit) {
}
var nextElem = iterator(obj);
var done = false;
var canceled = false;
var running = 0;
var looping = false;

function iterateeCallback(err, value) {
if (canceled) return
running -= 1;
if (err) {
done = true;
callback(err);
}
else if (err === false) {
done = true;
canceled = true;
}
else if (value === breakLoop || (done && running <= 0)) {
done = true;
return callback(null);
Expand Down
1 change: 1 addition & 0 deletions lib/retry.js
Expand Up @@ -133,6 +133,7 @@ export default function retry(opts, task, callback) {
var attempt = 1;
function retryAttempt() {
_task(function(err) {
if (err === false) return
if (err && attempt++ < options.times &&
(typeof options.errorFilter != 'function' ||
options.errorFilter(err))) {
Expand Down
2 changes: 1 addition & 1 deletion lib/tryEach.js
Expand Up @@ -52,7 +52,7 @@ export default function tryEach(tasks, callback) {
result = res;
}
error = err;
callback(!err);
callback(err ? null : {});
});
}, function () {
callback(error, result);
Expand Down
1 change: 1 addition & 0 deletions lib/waterfall.js
Expand Up @@ -75,6 +75,7 @@ export default function(tasks, callback) {
}

function next(err/*, ...args*/) {
if (err === false) return
if (err || taskIndex === tasks.length) {
return callback.apply(null, arguments);
}
Expand Down
1 change: 1 addition & 0 deletions lib/whilst.js
Expand Up @@ -44,6 +44,7 @@ export default function whilst(test, iteratee, callback) {
if (!test()) return callback(null);
var next = function(err/*, ...args*/) {
if (err) return callback(err);
if (err === false) return;
if (test()) return _iteratee(next);
var args = slice(arguments, 1);
callback.apply(null, [null].concat(args));
Expand Down
59 changes: 58 additions & 1 deletion test/auto.js
Expand Up @@ -168,6 +168,63 @@ describe('auto', function () {
setTimeout(done, 100);
});

it('auto canceled', function(done){
const call_order = []
async.auto({
task1: function(callback){
call_order.push(1)
callback(false);
},
task2: ['task1', function(/*results, callback*/){
call_order.push(2)
throw new Error('task2 should not be called');
}],
task3: function(callback){
call_order.push(3)
callback('testerror2');
}
},
function(){
throw new Error('should not get here')
});
setTimeout(() => {
expect(call_order).to.eql([1, 3])
done()
}, 10);
});

it('does not start other tasks when it has been canceled', function(done) {
const call_order = []
debugger
async.auto({
task1: function(callback) {
call_order.push(1);
// defer calling task2, so task3 has time to stop execution
async.setImmediate(callback);
},
task2: ['task1', function( /*results, callback*/ ) {
call_order.push(2);
throw new Error('task2 should not be called');
}],
task3: function(callback) {
call_order.push(3);
callback(false);
},
task4: ['task3', function( /*results, callback*/ ) {
call_order.push(4);
throw new Error('task4 should not be called');
}]
},
function() {
throw new Error('should not get here')
});

setTimeout(() => {
expect(call_order).to.eql([1, 3])
done()
}, 25)
});

it('auto no callback', function(done){
async.auto({
task1: function(callback){callback();},
Expand All @@ -185,7 +242,7 @@ describe('auto', function () {
it('auto error should pass partial results', function(done) {
async.auto({
task1: function(callback){
callback(false, 'result1');
callback(null, 'result1');
},
task2: ['task1', function(results, callback){
callback('testerror', 'result2');
Expand Down
48 changes: 48 additions & 0 deletions test/during.js
Expand Up @@ -34,6 +34,22 @@ describe('during', function() {
);
});

it('during canceling', (done) => {
let counter = 0;
async.during(
cb => cb(null, true),
cb => {
counter++
cb(counter === 2 ? false : null);
},
() => { throw new Error('should not get here')}
);
setTimeout(() => {
expect(counter).to.equal(2);
done();
}, 10)
})

it('doDuring', function(done) {
var call_order = [];

Expand Down Expand Up @@ -95,4 +111,36 @@ describe('during', function() {
}
);
});

it('doDuring canceling', (done) => {
let counter = 0;
async.doDuring(
cb => {
counter++
cb(counter === 2 ? false : null);
},
cb => cb(null, true),
() => { throw new Error('should not get here')}
);
setTimeout(() => {
expect(counter).to.equal(2);
done();
}, 10)
})

it('doDuring canceling in test', (done) => {
let counter = 0;
async.doDuring(
cb => {
counter++
cb(null, counter);
},
(n, cb) => cb(n === 2 ? false : null, true),
() => { throw new Error('should not get here')}
);
setTimeout(() => {
expect(counter).to.equal(2);
done();
}, 10)
})
});

0 comments on commit 53f6130

Please sign in to comment.