diff --git a/lib/cargo.js b/lib/cargo.js index 5060223fa..1afdc0ea4 100644 --- a/lib/cargo.js +++ b/lib/cargo.js @@ -1,35 +1,5 @@ import queue from './internal/queue'; -/** - * A cargo of tasks for the worker function to complete. Cargo inherits all of - * the same methods and event callbacks as [`queue`]{@link module:ControlFlow.queue}. - * @typedef {Object} CargoObject - * @memberOf module:ControlFlow - * @property {Function} length - A function returning the number of items - * waiting to be processed. Invoke like `cargo.length()`. - * @property {number} payload - An `integer` for determining how many tasks - * should be process per round. This property can be changed after a `cargo` is - * created to alter the payload on-the-fly. - * @property {Function} push - Adds `task` to the `queue`. The callback is - * called once the `worker` has finished processing the task. Instead of a - * single task, an array of `tasks` can be submitted. The respective callback is - * used for every task in the list. Invoke like `cargo.push(task, [callback])`. - * @property {Function} saturated - A callback that is called when the - * `queue.length()` hits the concurrency and further tasks will be queued. - * @property {Function} empty - A callback that is called when the last item - * from the `queue` is given to a `worker`. - * @property {Function} drain - A callback that is called when the last item - * from the `queue` has returned from the `worker`. - * @property {Function} idle - a function returning false if there are items - * waiting or being processed, or true if not. Invoke like `cargo.idle()`. - * @property {Function} pause - a function that pauses the processing of tasks - * until `resume()` is called. Invoke like `cargo.pause()`. - * @property {Function} resume - a function that resumes the processing of - * queued tasks when the queue is paused. Invoke like `cargo.resume()`. - * @property {Function} kill - a function that removes the `drain` callback and - * empties remaining tasks from the queue forcing it to go idle. Invoke like `cargo.kill()`. - */ - /** * Creates a `cargo` object with the specified payload. Tasks added to the * cargo will be processed altogether (up to the `payload` limit). If the @@ -53,7 +23,7 @@ import queue from './internal/queue'; * @param {number} [payload=Infinity] - An optional `integer` for determining * how many tasks should be processed per round; if omitted, the default is * unlimited. - * @returns {module:ControlFlow.CargoObject} A cargo object to manage the tasks. Callbacks can + * @returns {module:ControlFlow.QueueObject} A cargo object to manage the tasks. Callbacks can * attached as certain properties to listen for specific events during the * lifecycle of the cargo and inner queue. * @example @@ -73,9 +43,8 @@ import queue from './internal/queue'; * cargo.push({name: 'bar'}, function(err) { * console.log('finished processing bar'); * }); - * cargo.push({name: 'baz'}, function(err) { - * console.log('finished processing baz'); - * }); + * await cargo.push({name: 'baz'}); + * console.log('finished processing baz'); */ export default function cargo(worker, payload) { return queue(worker, 1, payload); diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 751b56201..0db96a8f4 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -3,8 +3,6 @@ import setImmediate from './setImmediate'; import DLL from './DoublyLinkedList'; import wrapAsync from './wrapAsync'; -const noop = () => {} - export default function queue(worker, concurrency, payload) { if (concurrency == null) { concurrency = 1; @@ -16,6 +14,35 @@ export default function queue(worker, concurrency, payload) { var _worker = wrapAsync(worker); var numRunning = 0; var workersList = []; + const events = { + error: [], + drain: [], + saturated: [], + unsaturated: [], + empty: [] + } + + function on (event, handler) { + events[event].push(handler) + } + + function once (event, handler) { + const handleAndRemove = (...args) => { + off(event, handleAndRemove) + handler(...args) + } + events[event].push(handleAndRemove) + } + + function off (event, handler) { + if (!event) return Object.keys(events).forEach(ev => events[ev] = []) + if (!handler) return events[event] = [] + events[event] = events[event].filter(ev => ev !== handler) + } + + function trigger (event, ...args) { + events[event].forEach(handler => handler(...args)) + } var processingScheduled = false; function _insert(data, insertAtFront, callback) { @@ -23,25 +50,32 @@ export default function queue(worker, concurrency, payload) { throw new Error('task callback must be a function'); } q.started = true; - if (!Array.isArray(data)) { - data = [data]; - } - if (data.length === 0 && q.idle()) { - // call drain immediately if there are no tasks - return setImmediate(() => q.drain()); + if (Array.isArray(data)) { + if (data.length === 0 && q.idle()) { + // call drain immediately if there are no tasks + return setImmediate(() => trigger('drain')); + } + + return data.map(datum => _insert(datum, insertAtFront, callback)); } - for (var i = 0, l = data.length; i < l; i++) { - var item = { - data: data[i], - callback: callback || noop - }; + var res; - if (insertAtFront) { - q._tasks.unshift(item); - } else { - q._tasks.push(item); + var item = { + data, + callback: callback || function (err, ...args) { + // we don't care about the error, let the global error handler + // deal with it + if (err) return + if (args.length <= 1) return res(args[0]) + res(args) } + }; + + if (insertAtFront) { + q._tasks.unshift(item); + } else { + q._tasks.push(item); } if (!processingScheduled) { @@ -51,9 +85,15 @@ export default function queue(worker, concurrency, payload) { q.process(); }); } + + if (!callback) { + return new Promise((resolve) => { + res = resolve + }) + } } - function _next(tasks) { + function _createCB(tasks) { return function (err, ...args) { numRunning -= 1; @@ -70,21 +110,35 @@ export default function queue(worker, concurrency, payload) { task.callback(err, ...args); if (err != null) { - q.error(err, task.data); + trigger('error', err, task.data); } } if (numRunning <= (q.concurrency - q.buffer) ) { - q.unsaturated(); + trigger('unsaturated') } if (q.idle()) { - q.drain(); + trigger('drain') } q.process(); }; } + const eventMethod = (name) => (handler) => { + if (!handler) { + return new Promise((resolve, reject) => { + once(name, (err, data) => { + if (err) return reject(err) + resolve(data) + }) + }) + } + off(name) + on(name, handler) + + } + var isProcessing = false; var q = { _tasks: new DLL(), @@ -93,23 +147,18 @@ export default function queue(worker, concurrency, payload) { }, concurrency, payload, - saturated: noop, - unsaturated:noop, buffer: concurrency / 4, - empty: noop, - drain: noop, - error: noop, started: false, paused: false, push (data, callback) { - _insert(data, false, callback); + return _insert(data, false, callback); }, kill () { - q.drain = noop; + off() q._tasks.empty(); }, unshift (data, callback) { - _insert(data, true, callback); + return _insert(data, true, callback); }, remove (testFn) { q._tasks.remove(testFn); @@ -135,14 +184,14 @@ export default function queue(worker, concurrency, payload) { numRunning += 1; if (q._tasks.length === 0) { - q.empty(); + trigger('empty'); } if (numRunning === q.concurrency) { - q.saturated(); + trigger('saturated'); } - var cb = onlyOnce(_next(tasks)); + var cb = onlyOnce(_createCB(tasks)); _worker(data, cb); } isProcessing = false; @@ -168,5 +217,28 @@ export default function queue(worker, concurrency, payload) { setImmediate(q.process); } }; + // define these as fixed properties, so people get useful errors when updating + Object.defineProperties(q, { + saturated: { + writable: false, + value: eventMethod('saturated') + }, + unsaturated: { + writable: false, + value: eventMethod('unsaturated') + }, + empty: { + writable: false, + value: eventMethod('empty') + }, + drain: { + writable: false, + value: eventMethod('drain') + }, + error: { + writable: false, + value: eventMethod('error') + }, + }) return q; } diff --git a/lib/queue.js b/lib/queue.js index 4462f80c8..53cf4ad62 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -18,6 +18,9 @@ import wrapAsync from './internal/wrapAsync'; * @property {number} concurrency - an integer for determining how many `worker` * functions should be run in parallel. This property can be changed after a * `queue` is created to alter the concurrency on-the-fly. + * @property {number} payload - an integer that specifies how many items are + * passed to the worker function at a time. only applies if this is a + * [cargo]{@link module:ControlFlow.cargo} object * @property {Function} push - add a new task to the `queue`. Calls `callback` * once the `worker` has finished processing the task. Instead of a single task, * a `tasks` array can be submitted. The respective callback is used for every @@ -30,20 +33,26 @@ import wrapAsync from './internal/wrapAsync'; * [priorityQueue]{@link module:ControlFlow.priorityQueue} object. * Invoked with `queue.remove(testFn)`, where `testFn` is of the form * `function ({data, priority}) {}` and returns a Boolean. - * @property {Function} saturated - a callback that is called when the number of - * running workers hits the `concurrency` limit, and further tasks will be - * queued. - * @property {Function} unsaturated - a callback that is called when the number - * of running workers is less than the `concurrency` & `buffer` limits, and - * further tasks will not be queued. + * @property {Function} saturated - a function that sets a callback that is + * called when the number of running workers hits the `concurrency` limit, and + * further tasks will be queued. If the callback is omitted, `q.saturated()` + * returns a promise for the next occurrence. + * @property {Function} unsaturated - a function that sets a callback that is + * called when the number of running workers is less than the `concurrency` & + * `buffer` limits, and further tasks will not be queued. If the callback is + * omitted, `q.unsaturated()` returns a promise for the next occurrence. * @property {number} buffer - A minimum threshold buffer in order to say that * the `queue` is `unsaturated`. - * @property {Function} empty - a callback that is called when the last item - * from the `queue` is given to a `worker`. - * @property {Function} drain - a callback that is called when the last item - * from the `queue` has returned from the `worker`. - * @property {Function} error - a callback that is called when a task errors. - * Has the signature `function(error, task)`. + * @property {Function} empty - a function that sets a callback that is called + * when the last item from the `queue` is given to a `worker`. If the callback + * is omitted, `q.empty()` returns a promise for the next occurrence. + * @property {Function} drain - a function that sets a callback that is called + * when the last item from the `queue` has returned from the `worker`. If the + * callback is omitted, `q.drain()` returns a promise for the next occurrence. + * @property {Function} error - a function that sets a callback that is called + * when a task errors. Has the signature `function(error, task)`. If the + * callback is omitted, `error()` returns a promise that rejects on the next + * error. * @property {boolean} paused - a boolean for determining whether the queue is * in a paused state. * @property {Function} pause - a function that pauses the processing of tasks @@ -65,6 +74,12 @@ import wrapAsync from './internal/wrapAsync'; * for (let item of q) { * console.log(item) * } + * + * q.drain(() => { + * console.log('all done') + * }) + * // or + * await q.drain() */ /** @@ -96,22 +111,23 @@ import wrapAsync from './internal/wrapAsync'; * }, 2); * * // assign a callback - * q.drain = function() { + * q.drain(function() { * console.log('all items have been processed'); - * }; + * }); + * // or await the end + * await q.drain() * * // assign an error callback - * q.error = function(err, task) { + * q.error(function(err, task) { * console.error('task experienced an error'); - * }; + * }); * * // add some items to the queue * q.push({name: 'foo'}, function(err) { * console.log('finished processing foo'); * }); - * q.push({name: 'bar'}, function (err) { - * console.log('finished processing bar'); - * }); + * // callback is optional + * q.push({name: 'bar'}); * * // add some items to the queue (batch-wise) * q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) { diff --git a/test/cargo.js b/test/cargo.js index 2191eb888..59e28f57c 100644 --- a/test/cargo.js +++ b/test/cargo.js @@ -59,7 +59,7 @@ describe('cargo', () => { }, 30); - c.drain = function () { + c.drain(() => { expect(call_order).to.eql([ 'process 1 2', 'callback 1', 'callback 2', 'process 3 4', 'callback 3', 'callback 4', @@ -67,7 +67,7 @@ describe('cargo', () => { ]); expect(c.length()).to.equal(0); done(); - }; + }); }); it('without callback', (done) => { @@ -95,7 +95,7 @@ describe('cargo', () => { c.push(5); }, 80); - c.drain = function() { + c.drain(() => { expect(call_order).to.eql([ 'process 1', 'process 2', @@ -103,7 +103,7 @@ describe('cargo', () => { 'process 5' ]); done(); - } + }) }); it('bulk task', (done) => { @@ -145,9 +145,9 @@ describe('cargo', () => { }, 3); var drainCounter = 0; - c.drain = function () { + c.drain(() => { drainCounter++; - }; + }); for(var i = 0; i < 10; i++){ c.push(i); @@ -172,7 +172,7 @@ describe('cargo', () => { } var drainCounter = 0; - c.drain = function () { + c.drain(() => { drainCounter++; if (drainCounter === 1) { @@ -181,7 +181,7 @@ describe('cargo', () => { expect(drainCounter).to.equal(2); done(); } - }; + }); loadCargo(); }); @@ -195,15 +195,15 @@ describe('cargo', () => { }, 1); q.concurrency = 3; - q.saturated = function() { + q.saturated(() => { assert(q.running() == 3, 'cargo should be saturated now'); calls.push('saturated'); - }; - q.empty = function() { + }); + q.empty(() => { assert(q.length() === 0, 'cargo should be empty now'); calls.push('empty'); - }; - q.drain = function() { + }); + q.drain(() => { assert( q.length() === 0 && q.running() === 0, 'cargo should be empty now and no more workers should be running' @@ -227,7 +227,7 @@ describe('cargo', () => { 'drain' ]); done(); - }; + }); q.push('foo', () => {calls.push('foo cb');}); q.push('bar', () => {calls.push('bar cb');}); q.push('zoo', () => {calls.push('zoo cb');}); @@ -249,9 +249,7 @@ describe('cargo', () => { setTimeout(cb, 25); }, 1); - cargo.drain = function () { - done(); - }; + cargo.drain(done); expect(cargo.payload).to.equal(1); @@ -286,11 +284,11 @@ describe('cargo', () => { }); }, 2); - cargo.drain = function() { + cargo.drain(() => { expect(cargo.workersList()).to.eql([]); expect(cargo.running()).to.equal(0); done(); - }; + }); cargo.push('foo'); cargo.push('bar'); @@ -306,10 +304,10 @@ describe('cargo', () => { }); }, 2); - cargo.drain = function() { + cargo.drain(() => { expect(cargo.running()).to.equal(0); done(); - }; + }); cargo.push('foo'); cargo.push('bar'); diff --git a/test/cargoQueue.js b/test/cargoQueue.js index 517422c37..46801c792 100644 --- a/test/cargoQueue.js +++ b/test/cargoQueue.js @@ -61,7 +61,7 @@ describe('cargoQueue', () => { expect(c.length()).to.equal(2); - c.drain = function () { + c.drain(() => { expect(call_order).to.eql([ 'process 1 2', 'callback 1', 'callback 2', 'process 3', 'callback 3', @@ -69,7 +69,7 @@ describe('cargoQueue', () => { ]); expect(c.length()).to.equal(0); done(); - }; + }); }); it('without callback', (done) => { @@ -83,7 +83,7 @@ describe('cargoQueue', () => { c.push(4); setImmediate(() => { c.push(5); - c.drain = function complete () { + c.drain(() => { expect(call_order).to.eql([ 'process 1', 'process 2', @@ -91,7 +91,7 @@ describe('cargoQueue', () => { 'process 5' ]); done(); - } + }) }) }) }) @@ -135,9 +135,9 @@ describe('cargoQueue', () => { }, 3, 2); var drainCounter = 0; - c.drain = function () { + c.drain(() => { drainCounter++; - }; + }); for(var i = 0; i < 10; i++){ c.push(i); @@ -162,7 +162,7 @@ describe('cargoQueue', () => { } var drainCounter = 0; - c.drain = function () { + c.drain(() => { drainCounter++; if (drainCounter === 1) { @@ -171,7 +171,7 @@ describe('cargoQueue', () => { expect(drainCounter).to.equal(2); done(); } - }; + }); loadCargo(); }); @@ -184,15 +184,15 @@ describe('cargoQueue', () => { async.setImmediate(cb); }, 3, 1); - q.saturated = function() { + q.saturated(() => { assert(q.running() == 3, 'cargoQueue should be saturated now'); calls.push('saturated'); - }; - q.empty = function() { + }); + q.empty(() => { assert(q.length() === 0, 'cargoQueue should be empty now'); calls.push('empty'); - }; - q.drain = function() { + }); + q.drain(() => { assert( q.length() === 0 && q.running() === 0, 'cargoQueue should be empty now and no more workers should be running' @@ -216,7 +216,7 @@ describe('cargoQueue', () => { 'drain' ]); done(); - }; + }); q.push('foo', () => {calls.push('foo cb');}); q.push('bar', () => {calls.push('bar cb');}); q.push('zoo', () => {calls.push('zoo cb');}); @@ -238,9 +238,7 @@ describe('cargoQueue', () => { setTimeout(cb, 25); }, 1, 1); - cargo.drain = function () { - done(); - }; + cargo.drain(done); expect(cargo.payload).to.equal(1); @@ -264,9 +262,7 @@ describe('cargoQueue', () => { setTimeout(cb, 25); }, 1, 1); - cargo.drain = function () { - done(); - }; + cargo.drain(done); expect(cargo.concurrency).to.equal(1); @@ -301,11 +297,11 @@ describe('cargoQueue', () => { }); }, 1, 2); - cargo.drain = function() { + cargo.drain(() => { expect(cargo.workersList()).to.eql([]); expect(cargo.running()).to.equal(0); done(); - }; + }); cargo.push('foo'); cargo.push('bar'); @@ -321,10 +317,10 @@ describe('cargoQueue', () => { }); }, 1, 1); - cargo.drain = function() { + cargo.drain(() => { expect(cargo.running()).to.equal(0); done(); - }; + }); cargo.push(['foo', 'bar', 'baz', 'boo']); }) diff --git a/test/es2017/asyncFunctions.js b/test/es2017/asyncFunctions.js index 553b7f4b2..bf0d70853 100644 --- a/test/es2017/asyncFunctions.js +++ b/test/es2017/asyncFunctions.js @@ -350,10 +350,10 @@ module.exports = function () { result.push(await Promise.resolve(val)); }, 2) - q.drain = () => { + q.drain(() => { expect(result).to.eql([[1, 2], [3]]); done(); - }; + }); q.push(1); q.push(2); @@ -366,10 +366,10 @@ module.exports = function () { result.push(await Promise.resolve(val)); }, 2) - q.drain = () => { + q.drain(() => { expect(result).to.eql([1, 2, 3]); done(); - }; + }); q.push(1); q.push(2); @@ -382,10 +382,10 @@ module.exports = function () { result.push(await Promise.resolve(val)); }, 2) - q.drain = () => { + q.drain(() => { expect(result).to.eql([1, 2, 3]); done(); - }; + }); q.push(1); q.push(2); diff --git a/test/es2017/awaitableFunctions.js b/test/es2017/awaitableFunctions.js index 55bcc9e00..e8adf7b70 100644 --- a/test/es2017/awaitableFunctions.js +++ b/test/es2017/awaitableFunctions.js @@ -586,6 +586,51 @@ module.exports = function () { expect(calls).to.eql([1, 2, 3, 4]) }); + it('should work with queues', async () => { + const q = async.queue(async (data) => { + if (data === 6) throw new Error('oh noes') + await new Promise(resolve => setTimeout(resolve, 10)) + return data + }, 5) + + const calls = [] + const errorCalls = [] + const emptyCalls = [] + q.error().catch(d => errorCalls.push('error ' + d)) + q.saturated().then(() => calls.push('saturated')) + q.unsaturated().then(() => calls.push('unsaturated')) + q.empty().then(() => emptyCalls.push('empty')) + + q.push(1).then(d => calls.push('push cb ' + d)) + q.push(2).then(d => calls.push('push cb ' + d)) + q.push([3, 4, 5, 6]).map(p => p.then(d => calls.push('push cb ' + d))) + q.push(7).then(d => calls.push('push cb ' + d)) + q.push(8).then(d => calls.push('push cb ' + d)) + + const multiP = Promise.all(q.push([9, 10])) + + await q.drain() + await multiP + expect(calls).to.eql([ + 'saturated', + 'push cb 1', + 'push cb 2', + 'push cb 3', + 'push cb 4', + 'push cb 5', + 'push cb 7', + 'unsaturated', + 'push cb 8' + ]) + + expect(errorCalls).to.eql([ + 'error Error: oh noes', + ]) + expect(emptyCalls).to.eql([ + 'empty', + ]) + }) + /* * Util */ diff --git a/test/priorityQueue.js b/test/priorityQueue.js index c3ece0289..96936fdac 100644 --- a/test/priorityQueue.js +++ b/test/priorityQueue.js @@ -40,7 +40,7 @@ describe('priorityQueue', () => { expect(q.length()).to.equal(4); expect(q.concurrency).to.equal(1); - q.drain = function () { + q.drain(() => { expect(call_order).to.eql([ 'process 2', 'callback 2', 'process 1', 'callback 1', @@ -50,7 +50,7 @@ describe('priorityQueue', () => { expect(q.concurrency).to.equal(1); expect(q.length()).to.equal(0); done(); - }; + }); }); it('concurrency', (done) => { @@ -95,7 +95,7 @@ describe('priorityQueue', () => { expect(q.length()).to.equal(4); expect(q.concurrency).to.equal(2); - q.drain = function () { + q.drain(() => { expect(call_order).to.eql([ 'process 1', 'callback 1', 'process 2', 'callback 2', @@ -105,7 +105,7 @@ describe('priorityQueue', () => { expect(q.concurrency).to.equal(2); expect(q.length()).to.equal(0); done(); - }; + }); }); it('pause in worker with concurrency', (done) => { @@ -131,10 +131,10 @@ describe('priorityQueue', () => { q.push({ id: 4 }); q.push({ id: 5 }); - q.drain = function () { + q.drain(() => { expect(call_order).to.eql([1, 2, 3, 4, 5]); done(); - }; + }); }); context('q.saturated(): ', () => { @@ -144,10 +144,10 @@ describe('priorityQueue', () => { calls.push('process ' + task); async.setImmediate(cb); }, 4); - q.saturated = function() { + q.saturated(() => { calls.push('saturated'); - }; - q.empty = function() { + }); + q.empty(() => { expect(calls.indexOf('saturated')).to.be.above(-1); setTimeout(() => { expect(calls).eql([ @@ -166,7 +166,7 @@ describe('priorityQueue', () => { ]); done(); }, 50); - }; + }); q.push('foo0', 5, () => {calls.push('foo0 cb');}); q.push('foo1', 4, () => {calls.push('foo1 cb');}); q.push('foo2', 3, () => {calls.push('foo2 cb');}); @@ -206,10 +206,10 @@ describe('priorityQueue', () => { calls.push('process ' + task); setTimeout(cb, 10); }, 4); - q.unsaturated = function() { + q.unsaturated(() => { calls.push('unsaturated'); - }; - q.empty = function() { + }); + q.empty(() => { expect(calls.indexOf('unsaturated')).to.be.above(-1); setTimeout(() => { expect(calls).eql([ @@ -231,7 +231,7 @@ describe('priorityQueue', () => { ]); done(); }, 50); - }; + }); q.push('foo0', 5, () => {calls.push('foo0 cb');}); q.push('foo1', 4, () => {calls.push('foo1 cb');}); q.push('foo2', 3, () => {calls.push('foo2 cb');}); @@ -262,7 +262,7 @@ describe('priorityQueue', () => { expect(q.length()).to.equal(2); - q.drain = function () { + q.drain(() => { expect(call_order).to.eql([ 'process 1', 'callback 1', 'process 2', 'callback 2' @@ -271,7 +271,7 @@ describe('priorityQueue', () => { expect(q.length()).to.equal(0); expect(q.running()).to.equal(0); done(); - }; + }); q.push([], 1, () => {}); }); diff --git a/test/queue.js b/test/queue.js index f103aaa52..5c31116c4 100644 --- a/test/queue.js +++ b/test/queue.js @@ -50,7 +50,7 @@ describe('queue', function(){ expect(q.length()).to.equal(4); expect(q.concurrency).to.equal(2); - q.drain = function () { + q.drain(() => { expect(call_order).to.eql([ 'process 2', 'callback 2', 'process 1', 'callback 1', @@ -60,7 +60,7 @@ describe('queue', function(){ expect(q.concurrency).to.equal(2); expect(q.length()).to.equal(0); done(); - }; + }); }); it('default concurrency', (done) => { @@ -103,7 +103,7 @@ describe('queue', function(){ expect(q.length()).to.equal(4); expect(q.concurrency).to.equal(1); - q.drain = function () { + q.drain(() => { expect(call_order).to.eql([ 'process 1', 'callback 1', 'process 2', 'callback 2', @@ -113,7 +113,7 @@ describe('queue', function(){ expect(q.concurrency).to.equal(1); expect(q.length()).to.equal(0); done(); - }; + }); }); it('zero concurrency', (done) => { @@ -132,10 +132,10 @@ describe('queue', function(){ callback(task.name === 'foo' ? new Error('fooError') : null); }, 2); - q.drain = function() { + q.drain(() => { expect(results).to.eql(['bar', 'fooError']); done(); - }; + }); q.push({name: 'bar'}, (err) => { if(err) { @@ -163,17 +163,17 @@ describe('queue', function(){ callback(task.name === 'foo' ? new Error('fooError') : null); }, 2); - q.error = function(error, task) { + q.error((error, task) => { expect(error).to.exist; expect(error.message).to.equal('fooError'); expect(task.name).to.equal('foo'); results.push('fooError'); - }; + }); - q.drain = function() { + q.drain (() => { expect(results).to.eql(['fooError', 'bar']); done(); - }; + }); q.push({name: 'foo'}); @@ -201,9 +201,7 @@ describe('queue', function(){ q.push(''); } - q.drain = function(){ - done(); - }; + q.drain(done); setTimeout(() => { expect(q.concurrency).to.equal(1); @@ -245,7 +243,7 @@ describe('queue', function(){ q.push(3); q.push(4); - q.drain = function () { + q.drain(() => { expect(running).to.eql(0); expect(concurrencyList).to.eql([1, 2, 2, 2]); expect(call_order).to.eql([ @@ -255,7 +253,7 @@ describe('queue', function(){ 'process 3' ]); done(); - }; + }); }); it('push with non-function', (done) => { @@ -320,7 +318,7 @@ describe('queue', function(){ expect(q.length()).to.equal(4); expect(q.concurrency).to.equal(2); - q.drain = function () { + q.drain(() => { expect(call_order).to.eql([ 'process 2', 'callback 2', 'process 1', 'callback 1', @@ -330,7 +328,7 @@ describe('queue', function(){ expect(q.concurrency).to.equal(2); expect(q.length()).to.equal(0); done(); - }; + }); }); it('idle', (done) => { @@ -351,11 +349,11 @@ describe('queue', function(){ // Queue is busy when tasks added expect(q.idle()).to.equal(false); - q.drain = function() { + q.drain(() => { // Queue is idle after drain expect(q.idle()).to.equal(true); done(); - }; + }); }); it('pause', (done) => { @@ -397,7 +395,7 @@ describe('queue', function(){ q.resume(); q.push(5); q.push(6); - q.drain = drain; + q.drain(drain); } function drain () { expect(concurrencyList).to.eql([1, 2, 2, 1, 2, 2]); @@ -436,10 +434,10 @@ describe('queue', function(){ q.push({ id: 4 }); q.push({ id: 5 }); - q.drain = function () { + q.drain(() => { expect(call_order).to.eql([1, 2, 3, 4, 5]); done(); - }; + }); }); it('start paused', (done) => { @@ -462,9 +460,7 @@ describe('queue', function(){ q.resume(); }, 5); - q.drain = function () { - done(); - }; + q.drain(done); }); it('kill', (done) => { @@ -473,9 +469,9 @@ describe('queue', function(){ throw new Error("Function should never be called"); }, 20); }, 1); - q.drain = function() { + q.drain(() => { throw new Error("Function should never be called"); - }; + }); q.push(0); @@ -496,15 +492,15 @@ describe('queue', function(){ }, 3); q.concurrency = 3; - q.saturated = function() { + q.saturated(() => { assert(q.running() == 3, 'queue should be saturated now'); calls.push('saturated'); - }; - q.empty = function() { + }); + q.empty(() => { assert(q.length() === 0, 'queue should be empty now'); calls.push('empty'); - }; - q.drain = function() { + }); + q.drain(() => { assert( q.length() === 0 && q.running() === 0, 'queue should be empty now and no more workers should be running' @@ -528,7 +524,7 @@ describe('queue', function(){ 'drain' ]); done(); - }; + }); q.push('foo', () => {calls.push('foo cb');}); q.push('bar', () => {calls.push('bar cb');}); q.push('zoo', () => {calls.push('zoo cb');}); @@ -544,7 +540,7 @@ describe('queue', function(){ async.setImmediate(cb); }, 3); - q.drain = function() { + q.drain(() => { assert( q.length() === 0 && q.running() === 0, 'queue should be empty now and no more workers should be running' @@ -554,7 +550,7 @@ describe('queue', function(){ 'drain' ]); done(); - }; + }); q.push([]); }); @@ -568,14 +564,14 @@ describe('queue', function(){ async.setImmediate(cb); }, 1); - q.empty = function () { + q.empty(() => { calls.push('empty'); assert(q.idle() === false, 'tasks should be running when empty is called') expect(q.running()).to.equal(1); - } + }) - q.drain = function() { + q.drain(() => { calls.push('drain'); expect(calls).to.eql([ 'empty', @@ -583,7 +579,7 @@ describe('queue', function(){ 'drain' ]); done(); - }; + }); q.push(1); }); @@ -593,13 +589,13 @@ describe('queue', function(){ async.setImmediate(cb); }, 2); - q.saturated = function () { + q.saturated(() => { saturatedCalled = true; - }; - q.drain = function () { + }) + q.drain(() => { assert(saturatedCalled, "saturated not called"); done(); - }; + }) q.push(['foo', 'bar', 'baz', 'moo']); }); @@ -623,10 +619,10 @@ describe('queue', function(){ calls.push('process ' + task); async.setImmediate(cb); }, 4); - q.saturated = function() { + q.saturated(() => { calls.push('saturated'); - }; - q.empty = function() { + }); + q.empty(() => { expect(calls.indexOf('saturated')).to.be.above(-1); setTimeout(() => { expect(calls).eql([ @@ -645,7 +641,7 @@ describe('queue', function(){ ]); done(); }, 50); - }; + }); q.push('foo0', () => {calls.push('foo0 cb');}); q.push('foo1', () => {calls.push('foo1 cb');}); q.push('foo2', () => {calls.push('foo2 cb');}); @@ -683,10 +679,10 @@ describe('queue', function(){ calls.push('process ' + task); async.setImmediate(cb); }, 4); - q.unsaturated = function() { + q.unsaturated(() => { calls.push('unsaturated'); - }; - q.empty = function() { + }); + q.empty(() => { expect(calls.indexOf('unsaturated')).to.be.above(-1); setTimeout(() => { expect(calls).eql([ @@ -708,7 +704,7 @@ describe('queue', function(){ ]); done(); }, 50); - }; + }); q.push('foo0', () => {calls.push('foo0 cb');}); q.push('foo1', () => {calls.push('foo1 cb');}); q.push('foo2', () => {calls.push('foo2 cb');}); @@ -726,11 +722,11 @@ describe('queue', function(){ }); }, 2); - q.drain = function() { + q.drain(() => { expect(q.workersList().length).to.equal(0); expect(q.running()).to.equal(0); done(); - }; + }); q.push('foo'); q.push('bar'); @@ -767,11 +763,11 @@ describe('queue', function(){ }); }, 2); - q.drain = function() { + q.drain(() => { expect(q.workersList()).to.eql([]); expect(q.workersList().length).to.equal(q.running()); done(); - }; + }); q.push('foo'); q.push('bar'); @@ -792,10 +788,10 @@ describe('queue', function(){ return node.data === 3; }); - q.drain = function () { + q.drain(() => { expect(result).to.eql([1, 2, 4, 5]); done(); - } + }) }); it('should be iterable', (done) => { @@ -811,9 +807,16 @@ describe('queue', function(){ expect([...q]).to.eql([1, 2, 3, 4, 5]); - q.drain = function () { + q.drain(() => { expect([...q]).to.eql([]); done(); - } + }) + }) + + it('should error when re-assigning event methods', () => { + var q = async.queue(() => {}) + expect(() => { + q.drain = () => {} + }).to.throw() }) });