From 53f613039af68353371c2953446fa8084b3fc86b Mon Sep 17 00:00:00 2001 From: Alex Early Date: Sun, 1 Jul 2018 17:12:06 -0700 Subject: [PATCH] feat: Canceling flows (#1542) * cancelable foreach * cancelable waterfall * cancellable auto * fix lint * fix tests * cancelable whilst/until/during/forever * fix waterfall test. It WILL get there * docs * auto should not start other tasks once canceled * simplify waterfall, add test for arrays * simplify eachOf * cancelable retry * cancelable eachOf for arrays * revert test tweak --- intro.md | 34 ++++++++++++ lib/auto.js | 8 ++- lib/doDuring.js | 2 + lib/doWhilst.js | 1 + lib/during.js | 2 + lib/eachOf.js | 7 ++- lib/forever.js | 1 + lib/internal/eachOfLimit.js | 6 ++ lib/retry.js | 1 + lib/tryEach.js | 2 +- lib/waterfall.js | 1 + lib/whilst.js | 1 + test/auto.js | 59 +++++++++++++++++++- test/during.js | 48 ++++++++++++++++ test/eachOf.js | 107 ++++++++++++++++++++++++++++++++++++ test/forever.js | 13 +++++ test/parallel.js | 26 +++++++++ test/retry.js | 12 ++++ test/until.js | 32 +++++++++++ test/waterfall.js | 26 ++++++++- test/whilst.js | 32 +++++++++++ 21 files changed, 414 insertions(+), 7 deletions(-) diff --git a/intro.md b/intro.md index 43d3c819d..2470954d8 100644 --- a/intro.md +++ b/intro.md @@ -173,6 +173,40 @@ async.map([1, 2, 3], AsyncSquaringLibrary.square.bind(AsyncSquaringLibrary), fun }); ``` +### Subtle Memory Leaks + +There are cases where you might want to exit early from async flow, when calling an Async method inside another async function: + +```javascript +function myFunction (args, outerCallback) { + async.waterfall([ + //... + function (arg, next) { + if (someImportantCondition()) { + return outerCallback(null) + } + }, + function (arg, next) {/*...*/} + ], function done (err) { + //... + }) +} +``` + +Something happened in a waterfall where you want to skip the rest of the execution, so you call an outer callack. However, Async will still wait for that inner `next` callback to be called, leaving some closure scope allocated. + +As of version 3.0, you can call any Async callback with `false` as the `error` argument, and the rest of the execution of the Async method will be stopped or ignored. + +```javascript + function (arg, next) { + if (someImportantCondition()) { + outerCallback(null) + return next(false) // ← signal that you called an outer callback + } + }, +``` + + ## Download The source is available for download from diff --git a/lib/auto.js b/lib/auto.js index a6150aae6..500620a69 100644 --- a/lib/auto.js +++ b/lib/auto.js @@ -102,6 +102,7 @@ export default function (tasks, concurrency, callback) { var results = {}; var runningTasks = 0; + var canceled = false; var hasError = false; var listeners = Object.create(null); @@ -156,6 +157,7 @@ export default function (tasks, concurrency, callback) { } function processQueue() { + if (canceled) return if (readyTasks.length === 0 && runningTasks === 0) { return callback(null, results); } @@ -189,6 +191,10 @@ export default function (tasks, concurrency, callback) { var taskCallback = onlyOnce(function(err, result) { runningTasks--; + if (err === false) { + canceled = true + return + } if (arguments.length > 2) { result = slice(arguments, 1); } @@ -200,7 +206,7 @@ export default function (tasks, concurrency, callback) { safeResults[key] = result; hasError = true; listeners = Object.create(null); - + if (canceled) return callback(err, safeResults); } else { results[key] = result; diff --git a/lib/doDuring.js b/lib/doDuring.js index ac2a04c7e..5eb2b9279 100644 --- a/lib/doDuring.js +++ b/lib/doDuring.js @@ -30,6 +30,7 @@ export default function doDuring(fn, test, callback) { function next(err/*, ...args*/) { if (err) return callback(err); + if (err === false) return; var args = slice(arguments, 1); args.push(check); _test.apply(this, args); @@ -37,6 +38,7 @@ export default function doDuring(fn, test, callback) { function check(err, truth) { if (err) return callback(err); + if (err === false) return; if (!truth) return callback(null); _fn(next); } diff --git a/lib/doWhilst.js b/lib/doWhilst.js index 3c2b865a7..524582081 100644 --- a/lib/doWhilst.js +++ b/lib/doWhilst.js @@ -31,6 +31,7 @@ export default function doWhilst(iteratee, test, callback) { var _iteratee = wrapAsync(iteratee); var next = function(err/*, ...args*/) { if (err) return callback(err); + if (err === false) return; var args = slice(arguments, 1); if (test.apply(this, args)) return _iteratee(next); callback.apply(null, [null].concat(args)); diff --git a/lib/during.js b/lib/during.js index cda9979ea..02da045a2 100644 --- a/lib/during.js +++ b/lib/during.js @@ -45,11 +45,13 @@ export default function during(test, fn, callback) { function next(err) { if (err) return callback(err); + if (err === false) return; _test(check); } function check(err, truth) { if (err) return callback(err); + if (err === false) return; if (!truth) return callback(null); _fn(next); } diff --git a/lib/eachOf.js b/lib/eachOf.js index bb9edfac7..5dbeb81c2 100644 --- a/lib/eachOf.js +++ b/lib/eachOf.js @@ -12,12 +12,17 @@ function eachOfArrayLike(coll, iteratee, callback) { callback = once(callback || noop); var index = 0, completed = 0, - length = coll.length; + length = coll.length, + canceled = false; if (length === 0) { callback(null); } function iteratorCallback(err, value) { + if (err === false) { + canceled = true + } + if (canceled === true) return if (err) { callback(err); } else if ((++completed === length) || value === breakLoop) { diff --git a/lib/forever.js b/lib/forever.js index 37532513f..fb69adf0c 100644 --- a/lib/forever.js +++ b/lib/forever.js @@ -38,6 +38,7 @@ export default function forever(fn, errback) { function next(err) { if (err) return done(err); + if (err === false) return; task(next); } next(); diff --git a/lib/internal/eachOfLimit.js b/lib/internal/eachOfLimit.js index 6ce076252..ae153b1a2 100644 --- a/lib/internal/eachOfLimit.js +++ b/lib/internal/eachOfLimit.js @@ -14,15 +14,21 @@ export default function _eachOfLimit(limit) { } var nextElem = iterator(obj); var done = false; + var canceled = false; var running = 0; var looping = false; function iterateeCallback(err, value) { + if (canceled) return running -= 1; if (err) { done = true; callback(err); } + else if (err === false) { + done = true; + canceled = true; + } else if (value === breakLoop || (done && running <= 0)) { done = true; return callback(null); diff --git a/lib/retry.js b/lib/retry.js index 390fd55cc..ef0440667 100644 --- a/lib/retry.js +++ b/lib/retry.js @@ -133,6 +133,7 @@ export default function retry(opts, task, callback) { var attempt = 1; function retryAttempt() { _task(function(err) { + if (err === false) return if (err && attempt++ < options.times && (typeof options.errorFilter != 'function' || options.errorFilter(err))) { diff --git a/lib/tryEach.js b/lib/tryEach.js index 87fba1266..82649b4bd 100644 --- a/lib/tryEach.js +++ b/lib/tryEach.js @@ -52,7 +52,7 @@ export default function tryEach(tasks, callback) { result = res; } error = err; - callback(!err); + callback(err ? null : {}); }); }, function () { callback(error, result); diff --git a/lib/waterfall.js b/lib/waterfall.js index 3e71d2cec..ceaf59d87 100644 --- a/lib/waterfall.js +++ b/lib/waterfall.js @@ -75,6 +75,7 @@ export default function(tasks, callback) { } function next(err/*, ...args*/) { + if (err === false) return if (err || taskIndex === tasks.length) { return callback.apply(null, arguments); } diff --git a/lib/whilst.js b/lib/whilst.js index da748fada..32c3b52e5 100644 --- a/lib/whilst.js +++ b/lib/whilst.js @@ -44,6 +44,7 @@ export default function whilst(test, iteratee, callback) { if (!test()) return callback(null); var next = function(err/*, ...args*/) { if (err) return callback(err); + if (err === false) return; if (test()) return _iteratee(next); var args = slice(arguments, 1); callback.apply(null, [null].concat(args)); diff --git a/test/auto.js b/test/auto.js index 489dbea0c..3b561cbef 100644 --- a/test/auto.js +++ b/test/auto.js @@ -168,6 +168,63 @@ describe('auto', function () { setTimeout(done, 100); }); + it('auto canceled', function(done){ + const call_order = [] + async.auto({ + task1: function(callback){ + call_order.push(1) + callback(false); + }, + task2: ['task1', function(/*results, callback*/){ + call_order.push(2) + throw new Error('task2 should not be called'); + }], + task3: function(callback){ + call_order.push(3) + callback('testerror2'); + } + }, + function(){ + throw new Error('should not get here') + }); + setTimeout(() => { + expect(call_order).to.eql([1, 3]) + done() + }, 10); + }); + + it('does not start other tasks when it has been canceled', function(done) { + const call_order = [] + debugger + async.auto({ + task1: function(callback) { + call_order.push(1); + // defer calling task2, so task3 has time to stop execution + async.setImmediate(callback); + }, + task2: ['task1', function( /*results, callback*/ ) { + call_order.push(2); + throw new Error('task2 should not be called'); + }], + task3: function(callback) { + call_order.push(3); + callback(false); + }, + task4: ['task3', function( /*results, callback*/ ) { + call_order.push(4); + throw new Error('task4 should not be called'); + }] + }, + function() { + throw new Error('should not get here') + }); + + setTimeout(() => { + expect(call_order).to.eql([1, 3]) + done() + }, 25) + }); + it('auto no callback', function(done){ async.auto({ task1: function(callback){callback();}, @@ -185,7 +242,7 @@ describe('auto', function () { it('auto error should pass partial results', function(done) { async.auto({ task1: function(callback){ - callback(false, 'result1'); + callback(null, 'result1'); }, task2: ['task1', function(results, callback){ callback('testerror', 'result2'); diff --git a/test/during.js b/test/during.js index 13db8b327..e0636e501 100644 --- a/test/during.js +++ b/test/during.js @@ -34,6 +34,22 @@ describe('during', function() { ); }); + it('during canceling', (done) => { + let counter = 0; + async.during( + cb => cb(null, true), + cb => { + counter++ + cb(counter === 2 ? false : null); + }, + () => { throw new Error('should not get here')} + ); + setTimeout(() => { + expect(counter).to.equal(2); + done(); + }, 10) + }) + it('doDuring', function(done) { var call_order = []; @@ -95,4 +111,36 @@ describe('during', function() { } ); }); + + it('doDuring canceling', (done) => { + let counter = 0; + async.doDuring( + cb => { + counter++ + cb(counter === 2 ? false : null); + }, + cb => cb(null, true), + () => { throw new Error('should not get here')} + ); + setTimeout(() => { + expect(counter).to.equal(2); + done(); + }, 10) + }) + + it('doDuring canceling in test', (done) => { + let counter = 0; + async.doDuring( + cb => { + counter++ + cb(null, counter); + }, + (n, cb) => cb(n === 2 ? false : null, true), + () => { throw new Error('should not get here')} + ); + setTimeout(() => { + expect(counter).to.equal(2); + done(); + }, 10) + }) }); diff --git a/test/eachOf.js b/test/eachOf.js index dd73c22fd..7d80990c4 100644 --- a/test/eachOf.js +++ b/test/eachOf.js @@ -403,4 +403,111 @@ describe("eachOf", function() { done(); }); }); + + it('forEachOfLimit canceled', function(done) { + var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 }; + var call_order = []; + + async.forEachOfLimit(obj, 3, function(value, key, callback){ + call_order.push(value, key); + if (value === 2) { + return callback(false); + } + callback() + }, function(){ + throw new Error('should not get here') + }); + setTimeout(() => { + expect(call_order).to.eql([ 1, "a", 2, "b" ]); + done() + }, 10); + }); + + it('forEachOfLimit canceled (async)', function(done) { + var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 }; + var call_order = []; + + async.forEachOfLimit(obj, 3, function(value, key, callback){ + call_order.push(value, key); + setTimeout(() => { + if (value === 2) { + return callback(false); + } + callback() + }) + }, function(){ + throw new Error('should not get here') + }); + setTimeout(() => { + expect(call_order).to.eql([ 1, "a", 2, "b", 3, "c", 4, "d" ]); + done() + }, 20); + }); + + it('eachOfLimit canceled (async, array)', function(done) { + var obj = ['a', 'b', 'c', 'd', 'e']; + var call_order = []; + + async.eachOfLimit(obj, 3, function(value, key, callback){ + call_order.push(key, value); + setTimeout(() => { + if (value === 'b') { + return callback(false); + } + callback() + }) + }, function(){ + throw new Error('should not get here') + }); + setTimeout(() => { + expect(call_order).to.eql([ 0, "a", 1, "b", 2, "c", 3, "d" ]); + done() + }, 20); + }); + + it('eachOf canceled (async, array)', function(done) { + var arr = ['a', 'b', 'c', 'd', 'e']; + var call_order = []; + + async.eachOf(arr, function(value, key, callback){ + call_order.push(key, value); + setTimeout(() => { + if (value === 'b') { + return callback(false); + } + callback() + }) + }, function(){ + throw new Error('should not get here') + }); + setTimeout(() => { + expect(call_order).to.eql([ 0, "a", 1, "b", 2, "c", 3, "d", 4, "e" ]); + done() + }, 20); + }); + + it('forEachOfLimit canceled (async, w/ error)', function(done) { + var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 }; + var call_order = []; + + async.forEachOfLimit(obj, 3, function(value, key, callback){ + call_order.push(value, key); + setTimeout(() => { + if (value === 2) { + return callback(false); + } + if (value === 3) { + return callback('fail'); + } + callback() + }) + }, function(){ + throw new Error('should not get here') + }); + setTimeout(() => { + expect(call_order).to.eql([ 1, "a", 2, "b", 3, "c", 4, "d" ]); + done() + }, 20); + }); + }); diff --git a/test/forever.js b/test/forever.js index e00a22d08..b69839959 100644 --- a/test/forever.js +++ b/test/forever.js @@ -38,5 +38,18 @@ describe('forever', function(){ done(); }); }); + + it('should cancel', (done) => { + var counter = 0; + async.forever(cb => { + counter++ + cb(counter === 2 ? false : null) + }, () => { throw new Error('should not get here') }) + + setTimeout(() => { + expect(counter).to.eql(2) + done() + }, 10) + }) }); }); diff --git a/test/parallel.js b/test/parallel.js index 2b07822c6..c7de8881b 100644 --- a/test/parallel.js +++ b/test/parallel.js @@ -191,6 +191,32 @@ describe('parallel', function() { }); }); + it('parallel limit canceled', function(done) { + const call_order = [] + async.parallelLimit([ + function(callback){ + call_order.push(1) + callback(); + }, + function(callback){ + call_order.push(2) + callback(false); + }, + function(callback){ + call_order.push(3) + callback('error', 2); + } + ], + 1, + function(){ + throw new Error('should not get here') + }); + setTimeout(() => { + expect(call_order).to.eql([1, 2]); + done() + }, 25); + }); + it('parallel call in another context @nycinvalid @nodeonly', function(done) { var vm = require('vm'); var sandbox = { diff --git a/test/retry.js b/test/retry.js index 8a9f4dacf..d5e0d4052 100644 --- a/test/retry.js +++ b/test/retry.js @@ -122,6 +122,18 @@ describe("retry", function () { }, 50); }); + it("should be cancelable", function (done) { + var calls = 0; + async.retry(2, function(cb) { + calls++; + cb(calls > 1 ? false : 'fail'); + }, () => { throw new Error('should not get here') }); + setTimeout(function () { + expect(calls).to.equal(2); + done(); + }, 10); + }); + it('retry does not precompute the intervals (#1226)', function(done) { var callTimes = []; function intervalFunc() { diff --git a/test/until.js b/test/until.js index e98b18403..2b82fe165 100644 --- a/test/until.js +++ b/test/until.js @@ -34,6 +34,22 @@ describe('until', function(){ ); }); + it('until canceling', (done) => { + let counter = 0; + async.until( + () => false, + cb => { + counter++ + cb(counter === 2 ? false: null); + }, + () => { throw new Error('should not get here')} + ); + setTimeout(() => { + expect(counter).to.equal(2); + done(); + }, 10) + }) + it('doUntil', function(done) { var call_order = []; var count = 0; @@ -92,4 +108,20 @@ describe('until', function(){ } ); }); + + it('doUntil canceling', (done) => { + let counter = 0; + async.doUntil( + cb => { + counter++ + cb(counter === 2 ? false: null); + }, + () => false, + () => { throw new Error('should not get here')} + ); + setTimeout(() => { + expect(counter).to.equal(2); + done(); + }, 10) + }) }); diff --git a/test/waterfall.js b/test/waterfall.js index 78b11bbb6..1501f0332 100644 --- a/test/waterfall.js +++ b/test/waterfall.js @@ -90,6 +90,28 @@ describe("waterfall", function () { }); }); + + it('canceled', function(done){ + const call_order = [] + async.waterfall([ + function(callback){ + call_order.push(1) + callback(false); + }, + function(callback){ + call_order.push(2) + assert(false, 'next function should not be called'); + callback(); + } + ], function(){ + throw new Error('should not get here') + }); + setTimeout(() => { + expect(call_order).to.eql([1]) + done() + }, 10) + }); + it('multiple callback calls', function(){ var arr = [ function(callback){ @@ -131,9 +153,7 @@ describe("waterfall", function () { function(arg1, arg2, callback){ setTimeout(callback, 15, null, arg1, arg2, 'three'); } - ], function () { - throw new Error('should not get here') - }); + ]); }); it('call in another context @nycinvalid @nodeonly', function(done) { diff --git a/test/whilst.js b/test/whilst.js index e04c2b72f..2ce2f23ff 100644 --- a/test/whilst.js +++ b/test/whilst.js @@ -48,6 +48,22 @@ describe('whilst', function(){ done(); }); + it('whilst canceling', function(done) { + var counter = 0; + async.whilst( + function () { return counter < 3; }, + function (cb) { + counter++; + cb(counter === 2 ? false : null); + }, + () => { throw new Error('should not get here')} + ); + setTimeout(() => { + expect(counter).to.equal(2); + done(); + }, 10) + }); + it('doWhilst', function(done) { var call_order = []; @@ -122,4 +138,20 @@ describe('whilst', function(){ } ); }); + + it('doWhilst canceling', (done) => { + let counter = 0; + async.doWhilst( + cb => { + counter++ + cb(counter === 2 ? false : null); + }, + () => true, + () => { throw new Error('should not get here')} + ); + setTimeout(() => { + expect(counter).to.equal(2); + done(); + }, 10) + }) });