Skip to content

Commit

Permalink
feat(queue): add pushAsync and unshiftAsync functions that reject on …
Browse files Browse the repository at this point in the history
…error. Closes #1659
  • Loading branch information
aearly committed Jun 23, 2019
1 parent e1045be commit 6739c08
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 29 deletions.
71 changes: 44 additions & 27 deletions lib/internal/queue.js
Expand Up @@ -45,27 +45,26 @@ export default function queue(worker, concurrency, payload) {
}

var processingScheduled = false;
function _insert(data, insertAtFront, callback) {
function _insert(data, insertAtFront, rejectOnError, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
q.started = true;
/*if (Array.isArray(data)) {

return data.map(datum => _insert(datum, insertAtFront, callback));
}*/

var res;
var res, rej;
function promiseCallback (err, ...args) {
// we don't care about the error, let the global error handler
// deal with it
if (err) return rejectOnError ? rej(err) : res()
if (args.length <= 1) return res(args[0])
res(args)
}

var item = {
data,
callback: callback || function (err, ...args) {
// we don't care about the error, let the global error handler
// deal with it
if (err) return
if (args.length <= 1) return res(args[0])
res(args)
}
callback: rejectOnError ?
promiseCallback :
(callback || promiseCallback)
};

if (insertAtFront) {
Expand All @@ -82,9 +81,10 @@ export default function queue(worker, concurrency, payload) {
});
}

if (!callback) {
return new Promise((resolve) => {
if (rejectOnError || !callback) {
return new Promise((resolve, reject) => {
res = resolve
rej = reject
})
}
}
Expand Down Expand Up @@ -121,6 +121,15 @@ export default function queue(worker, concurrency, payload) {
};
}

function _maybeDrain(data) {
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
setImmediate(() => trigger('drain'));
return true
}
return false
}

const eventMethod = (name) => (handler) => {
if (!handler) {
return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -148,27 +157,35 @@ export default function queue(worker, concurrency, payload) {
paused: false,
push (data, callback) {
if (Array.isArray(data)) {
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => trigger('drain'));
}
return data.map(datum => _insert(datum, false, callback))
if (_maybeDrain(data)) return
return data.map(datum => _insert(datum, false, false, callback))
}
return _insert(data, false, callback);
return _insert(data, false, false, callback);
},
pushAsync (data, callback) {
if (Array.isArray(data)) {
if (_maybeDrain(data)) return
return data.map(datum => _insert(datum, false, true, callback))
}
return _insert(data, false, true, callback);
},
kill () {
off()
q._tasks.empty();
},
unshift (data, callback) {
if (Array.isArray(data)) {
if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => trigger('drain'));
}
return data.map(datum => _insert(datum, true, callback))
if (_maybeDrain(data)) return
return data.map(datum => _insert(datum, true, false, callback))
}
return _insert(data, true, false, callback);
},
unshiftAsync (data, callback) {
if (Array.isArray(data)) {
if (_maybeDrain(data)) return
return data.map(datum => _insert(datum, true, true, callback))
}
return _insert(data, true, callback);
return _insert(data, true, true, callback);
},
remove (testFn) {
q._tasks.remove(testFn);
Expand Down
8 changes: 6 additions & 2 deletions lib/queue.js
Expand Up @@ -21,12 +21,16 @@ import wrapAsync from './internal/wrapAsync';
* @property {number} payload - an integer that specifies how many items are
* passed to the worker function at a time. only applies if this is a
* [cargo]{@link module:ControlFlow.cargo} object
* @property {Function} push - add a new task to the `queue`. Calls `callback`
* @property {AsyncFunction} push - add a new task to the `queue`. Calls `callback`
* once the `worker` has finished processing the task. Instead of a single task,
* a `tasks` array can be submitted. The respective callback is used for every
* task in the list. Invoke with `queue.push(task, [callback])`,
* @property {Function} unshift - add a new task to the front of the `queue`.
* @property {AsyncFunction} unshift - add a new task to the front of the `queue`.
* Invoke with `queue.unshift(task, [callback])`.
* @property {AsyncFunction} pushAsync - the same as `q.push`, except this returns
* a promise that rejects if an error occurs.
* @property {AsyncFunction} unshirtAsync - the same as `q.unshift`, except this returns
* a promise that rejects if an error occurs.
* @property {Function} remove - remove items from the queue that match a test
* function. The test function will be passed an object with a `data` property,
* and a `priority` property, if this is a
Expand Down
1 change: 1 addition & 0 deletions test/es2017/awaitableFunctions.js
Expand Up @@ -618,6 +618,7 @@ module.exports = function () {
'push cb 3',
'push cb 4',
'push cb 5',
'push cb undefined',
'push cb 7',
'unsaturated',
'push cb 8'
Expand Down
40 changes: 40 additions & 0 deletions test/queue.js
Expand Up @@ -156,6 +156,46 @@ describe('queue', function(){
});
});

it('pushAsync', done => {
const calls = []
var q = async.queue((task, cb) => {
if (task === 2) return cb(new Error('fail'))
cb()
})

q.pushAsync(1, () => { throw new Error('should not be called') }).then(() => calls.push(1))
q.pushAsync(2).catch(err => {
expect(err.message).to.equal('fail')
calls.push(2)
})
q.pushAsync([3, 4]).map(p => p.then(() => calls.push('arr')))
q.drain(() => setTimeout(() => {
console.log('drain')
expect(calls).to.eql([1, 2, 'arr', 'arr'])
done()
}))
})

it('unshiftAsync', done => {
const calls = []
var q = async.queue((task, cb) => {
if (task === 2) return cb(new Error('fail'))
cb()
})

q.unshiftAsync(1).then(() => calls.push(1))
q.unshiftAsync(2).catch(err => {
expect(err.message).to.equal('fail')
calls.push(2)
})
q.unshiftAsync([3, 4]).map(p => p.then(() => calls.push('arr')))
q.drain(() => setTimeout(() => {
console.log('drain')
expect(calls).to.eql(['arr', 'arr', 2, 1])
done()
}))
})

it('global error handler', (done) => {
var results = [];

Expand Down

0 comments on commit 6739c08

Please sign in to comment.