diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 01567aa5c..bb3e16b7c 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -45,27 +45,26 @@ export default function queue(worker, concurrency, payload) { } var processingScheduled = false; - function _insert(data, insertAtFront, callback) { + function _insert(data, insertAtFront, rejectOnError, callback) { if (callback != null && typeof callback !== 'function') { throw new Error('task callback must be a function'); } q.started = true; - /*if (Array.isArray(data)) { - return data.map(datum => _insert(datum, insertAtFront, callback)); - }*/ - - var res; + var res, rej; + function promiseCallback (err, ...args) { + // we don't care about the error, let the global error handler + // deal with it + if (err) return rejectOnError ? rej(err) : res() + if (args.length <= 1) return res(args[0]) + res(args) + } var item = { data, - callback: callback || function (err, ...args) { - // we don't care about the error, let the global error handler - // deal with it - if (err) return - if (args.length <= 1) return res(args[0]) - res(args) - } + callback: rejectOnError ? + promiseCallback : + (callback || promiseCallback) }; if (insertAtFront) { @@ -82,9 +81,10 @@ export default function queue(worker, concurrency, payload) { }); } - if (!callback) { - return new Promise((resolve) => { + if (rejectOnError || !callback) { + return new Promise((resolve, reject) => { res = resolve + rej = reject }) } } @@ -121,6 +121,15 @@ export default function queue(worker, concurrency, payload) { }; } + function _maybeDrain(data) { + if (data.length === 0 && q.idle()) { + // call drain immediately if there are no tasks + setImmediate(() => trigger('drain')); + return true + } + return false + } + const eventMethod = (name) => (handler) => { if (!handler) { return new Promise((resolve, reject) => { @@ -148,13 +157,17 @@ export default function queue(worker, concurrency, payload) { paused: false, push (data, callback) { if (Array.isArray(data)) { - if (data.length === 0 && q.idle()) { - // call drain immediately if there are no tasks - return setImmediate(() => trigger('drain')); - } - return data.map(datum => _insert(datum, false, callback)) + if (_maybeDrain(data)) return + return data.map(datum => _insert(datum, false, false, callback)) } - return _insert(data, false, callback); + return _insert(data, false, false, callback); + }, + pushAsync (data, callback) { + if (Array.isArray(data)) { + if (_maybeDrain(data)) return + return data.map(datum => _insert(datum, false, true, callback)) + } + return _insert(data, false, true, callback); }, kill () { off() @@ -162,13 +175,17 @@ export default function queue(worker, concurrency, payload) { }, unshift (data, callback) { if (Array.isArray(data)) { - if (data.length === 0 && q.idle()) { - // call drain immediately if there are no tasks - return setImmediate(() => trigger('drain')); - } - return data.map(datum => _insert(datum, true, callback)) + if (_maybeDrain(data)) return + return data.map(datum => _insert(datum, true, false, callback)) + } + return _insert(data, true, false, callback); + }, + unshiftAsync (data, callback) { + if (Array.isArray(data)) { + if (_maybeDrain(data)) return + return data.map(datum => _insert(datum, true, true, callback)) } - return _insert(data, true, callback); + return _insert(data, true, true, callback); }, remove (testFn) { q._tasks.remove(testFn); diff --git a/lib/queue.js b/lib/queue.js index 53cf4ad62..dc5554735 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -21,12 +21,16 @@ import wrapAsync from './internal/wrapAsync'; * @property {number} payload - an integer that specifies how many items are * passed to the worker function at a time. only applies if this is a * [cargo]{@link module:ControlFlow.cargo} object - * @property {Function} push - add a new task to the `queue`. Calls `callback` + * @property {AsyncFunction} push - add a new task to the `queue`. Calls `callback` * once the `worker` has finished processing the task. Instead of a single task, * a `tasks` array can be submitted. The respective callback is used for every * task in the list. Invoke with `queue.push(task, [callback])`, - * @property {Function} unshift - add a new task to the front of the `queue`. + * @property {AsyncFunction} unshift - add a new task to the front of the `queue`. * Invoke with `queue.unshift(task, [callback])`. + * @property {AsyncFunction} pushAsync - the same as `q.push`, except this returns + * a promise that rejects if an error occurs. + * @property {AsyncFunction} unshirtAsync - the same as `q.unshift`, except this returns + * a promise that rejects if an error occurs. * @property {Function} remove - remove items from the queue that match a test * function. The test function will be passed an object with a `data` property, * and a `priority` property, if this is a diff --git a/test/es2017/awaitableFunctions.js b/test/es2017/awaitableFunctions.js index e8adf7b70..eb2663cff 100644 --- a/test/es2017/awaitableFunctions.js +++ b/test/es2017/awaitableFunctions.js @@ -618,6 +618,7 @@ module.exports = function () { 'push cb 3', 'push cb 4', 'push cb 5', + 'push cb undefined', 'push cb 7', 'unsaturated', 'push cb 8' diff --git a/test/queue.js b/test/queue.js index e9f3a787d..f99e91af4 100644 --- a/test/queue.js +++ b/test/queue.js @@ -156,6 +156,46 @@ describe('queue', function(){ }); }); + it('pushAsync', done => { + const calls = [] + var q = async.queue((task, cb) => { + if (task === 2) return cb(new Error('fail')) + cb() + }) + + q.pushAsync(1, () => { throw new Error('should not be called') }).then(() => calls.push(1)) + q.pushAsync(2).catch(err => { + expect(err.message).to.equal('fail') + calls.push(2) + }) + q.pushAsync([3, 4]).map(p => p.then(() => calls.push('arr'))) + q.drain(() => setTimeout(() => { + console.log('drain') + expect(calls).to.eql([1, 2, 'arr', 'arr']) + done() + })) + }) + + it('unshiftAsync', done => { + const calls = [] + var q = async.queue((task, cb) => { + if (task === 2) return cb(new Error('fail')) + cb() + }) + + q.unshiftAsync(1).then(() => calls.push(1)) + q.unshiftAsync(2).catch(err => { + expect(err.message).to.equal('fail') + calls.push(2) + }) + q.unshiftAsync([3, 4]).map(p => p.then(() => calls.push('arr'))) + q.drain(() => setTimeout(() => { + console.log('drain') + expect(calls).to.eql(['arr', 'arr', 2, 1]) + done() + })) + }) + it('global error handler', (done) => { var results = [];