Skip to content

Commit

Permalink
feat: await-able Async methods (#1572)
Browse files Browse the repository at this point in the history
* make each and family awaitable

* dont pretend they're AsyncFunctions

* check errors

* ensure function name is preserved somehow

* awaitable concat

* awaitable detect

* awaitable every/filter

* awaitable groupBy

* awaitable map/mapValues

* awaitable reduce

* awaitable reject

* awaitable some

* awaitable transform

* awaitable times

* awaitable auto

* awaitable compose/seq

* awaitable whilst/until (lol)

* awaitable forever

* awaitable parallel/race

* awaitable retry

* awaitable series (lol)

* awaitable tryEach

* awaitable waterfall (lol)

* lint

* cleanup, remove noop and unused internal functions
  • Loading branch information
aearly committed Oct 1, 2018
1 parent df41256 commit 8aecf10
Show file tree
Hide file tree
Showing 77 changed files with 987 additions and 201 deletions.
13 changes: 7 additions & 6 deletions lib/auto.js
@@ -1,8 +1,7 @@
import noop from './internal/noop';

import once from './internal/once';
import onlyOnce from './internal/onlyOnce';
import wrapAsync from './internal/wrapAsync';
import { promiseCallback, PROMISE_SYMBOL } from './internal/promiseCallback'

/**
* Determines the best order for running the {@link AsyncFunction}s in `tasks`, based on
Expand Down Expand Up @@ -42,7 +41,7 @@ import wrapAsync from './internal/wrapAsync';
* pass an error to their callback. Results are always returned; however, if an
* error occurs, no further `tasks` will be performed, and the results object
* will only contain partial results. Invoked with (err, results).
* @returns undefined
* @returns {Promise} a promise, if a callback is not passed
* @example
*
* async.auto({
Expand Down Expand Up @@ -83,13 +82,13 @@ import wrapAsync from './internal/wrapAsync';
* console.log('results = ', results);
* });
*/
export default function (tasks, concurrency, callback) {
if (typeof concurrency === 'function') {
export default function auto(tasks, concurrency, callback) {
if (typeof concurrency !== 'number') {
// concurrency is optional, shift the args.
callback = concurrency;
concurrency = null;
}
callback = once(callback || noop);
callback = once(callback || promiseCallback());
var numTasks = Object.keys(tasks).length;
if (!numTasks) {
return callback(null);
Expand Down Expand Up @@ -251,4 +250,6 @@ export default function (tasks, concurrency, callback) {
});
return result;
}

return callback[PROMISE_SYMBOL]
}
3 changes: 2 additions & 1 deletion lib/autoInject.js
Expand Up @@ -54,6 +54,7 @@ function parseParams(func) {
* the tasks have been completed. It receives the `err` argument if any `tasks`
* pass an error to their callback, and a `results` object with any completed
* task results, similar to `auto`.
* @returns {Promise} a promise, if no callback is passed
* @example
*
* // The example from `auto` can be rewritten as follows:
Expand Down Expand Up @@ -142,5 +143,5 @@ export default function autoInject(tasks, callback) {
}
});

auto(newTasks, callback);
return auto(newTasks, callback);
}
5 changes: 4 additions & 1 deletion lib/compose.js
Expand Up @@ -6,6 +6,9 @@ import seq from './seq';
* follows. Composing functions `f()`, `g()`, and `h()` would produce the result
* of `f(g(h()))`, only this version uses callbacks to obtain the return values.
*
* If the last argument to the composed function is not a function, a promise
* is returned when you call it.
*
* Each function is executed with the `this` binding of the composed function.
*
* @name compose
Expand Down Expand Up @@ -35,6 +38,6 @@ import seq from './seq';
* // result now equals 15
* });
*/
export default function(...args) {
export default function compose(...args) {
return seq(...args.reverse());
}
10 changes: 7 additions & 3 deletions lib/concat.js
@@ -1,5 +1,5 @@
import doLimit from './internal/doLimit';
import concatLimit from './concatLimit';
import awaitify from './internal/awaitify'

/**
* Applies `iteratee` to each item in `coll`, concatenating the results. Returns
Expand All @@ -15,14 +15,18 @@ import concatLimit from './concatLimit';
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
* which should use an array as its result. Invoked with (item, callback).
* @param {Function} [callback(err)] - A callback which is called after all the
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished, or an error occurs. Results is an array
* containing the concatenated results of the `iteratee` function. Invoked with
* (err, results).
* @returns A Promise, if no callback is passed
* @example
*
* async.concat(['dir1','dir2','dir3'], fs.readdir, function(err, files) {
* // files is now a list of filenames that exist in the 3 directories
* });
*/
export default doLimit(concatLimit, Infinity);
function concat(coll, iteratee, callback) {
return concatLimit(coll, Infinity, iteratee, callback)
}
export default awaitify(concat, 3);
9 changes: 5 additions & 4 deletions lib/concatLimit.js
@@ -1,6 +1,6 @@
import noop from './internal/noop';
import wrapAsync from './internal/wrapAsync';
import mapLimit from './mapLimit';
import awaitify from './internal/awaitify'

/**
* The same as [`concat`]{@link module:Collections.concat} but runs a maximum of `limit` async operations at a time.
Expand All @@ -19,11 +19,11 @@ import mapLimit from './mapLimit';
* `iteratee` functions have finished, or an error occurs. Results is an array
* containing the concatenated results of the `iteratee` function. Invoked with
* (err, results).
* @returns A Promise, if no callback is passed
*/
export default function(coll, limit, iteratee, callback) {
callback = callback || noop;
function concatLimit(coll, limit, iteratee, callback) {
var _iteratee = wrapAsync(iteratee);
mapLimit(coll, limit, (val, iterCb) => {
return mapLimit(coll, limit, (val, iterCb) => {
_iteratee(val, (err, ...args) => {
if (err) return iterCb(err);
return iterCb(null, args);
Expand All @@ -39,3 +39,4 @@ export default function(coll, limit, iteratee, callback) {
return callback(err, result);
});
}
export default awaitify(concatLimit, 4)
10 changes: 7 additions & 3 deletions lib/concatSeries.js
@@ -1,5 +1,5 @@
import doLimit from './internal/doLimit';
import concatLimit from './concatLimit';
import awaitify from './internal/awaitify'

/**
* The same as [`concat`]{@link module:Collections.concat} but runs only a single async operation at a time.
Expand All @@ -14,9 +14,13 @@ import concatLimit from './concatLimit';
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`.
* The iteratee should complete with an array an array of results.
* Invoked with (item, callback).
* @param {Function} [callback(err)] - A callback which is called after all the
* @param {Function} [callback] - A callback which is called after all the
* `iteratee` functions have finished, or an error occurs. Results is an array
* containing the concatenated results of the `iteratee` function. Invoked with
* (err, results).
* @returns A Promise, if no callback is passed
*/
export default doLimit(concatLimit, 1);
function concatSeries(coll, iteratee, callback) {
return concatLimit(coll, 1, iteratee, callback)
}
export default awaitify(concatSeries, 3);
9 changes: 7 additions & 2 deletions lib/detect.js
@@ -1,5 +1,6 @@
import createTester from './internal/createTester';
import doParallel from './internal/doParallel';
import eachOf from './eachOf'
import awaitify from './internal/awaitify'

/**
* Returns the first value in `coll` that passes an async truth test. The
Expand All @@ -26,6 +27,7 @@ import doParallel from './internal/doParallel';
* Result will be the first item in the array that passes the truth test
* (iteratee) or the value `undefined` if none passed. Invoked with
* (err, result).
* @returns A Promise, if no callback is passed
* @example
*
* async.detect(['file1','file2','file3'], function(filePath, callback) {
Expand All @@ -36,4 +38,7 @@ import doParallel from './internal/doParallel';
* // result now equals the first file in the list that exists
* });
*/
export default doParallel(createTester(bool => bool, (res, item) => item));
function detect(coll, iteratee, callback) {
return createTester(bool => bool, (res, item) => item)(eachOf, coll, iteratee, callback)
}
export default awaitify(detect, 3)
9 changes: 7 additions & 2 deletions lib/detectLimit.js
@@ -1,5 +1,6 @@
import createTester from './internal/createTester';
import doParallelLimit from './internal/doParallelLimit';
import eachOfLimit from './internal/eachOfLimit'
import awaitify from './internal/awaitify'

/**
* The same as [`detect`]{@link module:Collections.detect} but runs a maximum of `limit` async operations at a
Expand All @@ -22,5 +23,9 @@ import doParallelLimit from './internal/doParallelLimit';
* Result will be the first item in the array that passes the truth test
* (iteratee) or the value `undefined` if none passed. Invoked with
* (err, result).
* @returns a Promise if no callback is passed
*/
export default doParallelLimit(createTester(bool => bool, (res, item) => item));
function detectLimit(coll, limit, iteratee, callback) {
return createTester(bool => bool, (res, item) => item)(eachOfLimit(limit), coll, iteratee, callback)
}
export default awaitify(detectLimit, 4)
12 changes: 9 additions & 3 deletions lib/detectSeries.js
@@ -1,5 +1,6 @@
import detectLimit from './detectLimit';
import doLimit from './internal/doLimit';
import createTester from './internal/createTester'
import eachOfLimit from './internal/eachOfLimit'
import awaitify from './internal/awaitify'

/**
* The same as [`detect`]{@link module:Collections.detect} but runs only a single async operation at a time.
Expand All @@ -20,5 +21,10 @@ import doLimit from './internal/doLimit';
* Result will be the first item in the array that passes the truth test
* (iteratee) or the value `undefined` if none passed. Invoked with
* (err, result).
* @returns a Promise if no callback is passed
*/
export default doLimit(detectLimit, 1);
function detectSeries(coll, iteratee, callback) {
return createTester(bool => bool, (res, item) => item)(eachOfLimit(1), coll, iteratee, callback)
}

export default awaitify(detectSeries, 3)
3 changes: 2 additions & 1 deletion lib/doUntil.js
Expand Up @@ -20,10 +20,11 @@ import wrapAsync from './internal/wrapAsync';
* function has passed and repeated execution of `iteratee` has stopped. `callback`
* will be passed an error and any arguments passed to the final `iteratee`'s
* callback. Invoked with (err, [results]);
* @returns {Promise} a promise, if no callback is passed
*/
export default function doUntil(iteratee, test, callback) {
const _test = wrapAsync(test)
doWhilst(iteratee, (...args) => {
return doWhilst(iteratee, (...args) => {
const cb = args.pop()
_test(...args, (err, truth) => cb (err, !truth))
}, callback);
Expand Down
11 changes: 6 additions & 5 deletions lib/doWhilst.js
@@ -1,7 +1,6 @@
import noop from './internal/noop';

import onlyOnce from './internal/onlyOnce';
import wrapAsync from './internal/wrapAsync';
import awaitify from './internal/awaitify'

/**
* The post-check version of [`whilst`]{@link module:ControlFlow.whilst}. To reflect the difference in
Expand All @@ -24,10 +23,10 @@ import wrapAsync from './internal/wrapAsync';
* function has failed and repeated execution of `iteratee` has stopped.
* `callback` will be passed an error and any arguments passed to the final
* `iteratee`'s callback. Invoked with (err, [results]);
* @return undefined
* @returns {Promise} a promise, if no callback is passed
*/
export default function doWhilst(iteratee, test, callback) {
callback = onlyOnce(callback || noop);
function doWhilst(iteratee, test, callback) {
callback = onlyOnce(callback);
var _fn = wrapAsync(iteratee);
var _test = wrapAsync(test);
var results
Expand All @@ -48,3 +47,5 @@ export default function doWhilst(iteratee, test, callback) {

return check(null, true);
}

export default awaitify(doWhilst, 3)
8 changes: 6 additions & 2 deletions lib/each.js
@@ -1,6 +1,7 @@
import eachOf from './eachOf';
import withoutIndex from './internal/withoutIndex';
import wrapAsync from './internal/wrapAsync'
import awaitify from './internal/awaitify'

/**
* Applies the function `iteratee` to each item in `coll`, in parallel.
Expand All @@ -25,6 +26,7 @@ import wrapAsync from './internal/wrapAsync'
* If you need the index, use `eachOf`.
* @param {Function} [callback] - A callback which is called when all
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
* @example
*
* // assuming openFiles is an array of file names and saveFile is a function
Expand Down Expand Up @@ -59,6 +61,8 @@ import wrapAsync from './internal/wrapAsync'
* }
* });
*/
export default function eachLimit(coll, iteratee, callback) {
eachOf(coll, withoutIndex(wrapAsync(iteratee)), callback);
function eachLimit(coll, iteratee, callback) {
return eachOf(coll, withoutIndex(wrapAsync(iteratee)), callback);
}

export default awaitify(eachLimit, 3)
7 changes: 5 additions & 2 deletions lib/eachLimit.js
@@ -1,6 +1,7 @@
import eachOfLimit from './internal/eachOfLimit';
import withoutIndex from './internal/withoutIndex';
import wrapAsync from './internal/wrapAsync';
import awaitify from './internal/awaitify'

/**
* The same as [`each`]{@link module:Collections.each} but runs a maximum of `limit` async operations at a time.
Expand All @@ -21,7 +22,9 @@ import wrapAsync from './internal/wrapAsync';
* Invoked with (item, callback).
* @param {Function} [callback] - A callback which is called when all
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
*/
export default function eachLimit(coll, limit, iteratee, callback) {
eachOfLimit(limit)(coll, withoutIndex(wrapAsync(iteratee)), callback);
function eachLimit(coll, limit, iteratee, callback) {
return eachOfLimit(limit)(coll, withoutIndex(wrapAsync(iteratee)), callback);
}
export default awaitify(eachLimit, 4)
16 changes: 10 additions & 6 deletions lib/eachOf.js
@@ -1,15 +1,14 @@
import isArrayLike from './internal/isArrayLike';
import breakLoop from './internal/breakLoop';
import eachOfLimit from './eachOfLimit';
import doLimit from './internal/doLimit';
import noop from './internal/noop';
import once from './internal/once';
import onlyOnce from './internal/onlyOnce';
import wrapAsync from './internal/wrapAsync';
import awaitify from './internal/awaitify'

// eachOf implementation optimized for array-likes
function eachOfArrayLike(coll, iteratee, callback) {
callback = once(callback || noop);
callback = once(callback);
var index = 0,
completed = 0,
{length} = coll,
Expand All @@ -36,7 +35,9 @@ function eachOfArrayLike(coll, iteratee, callback) {
}

// a generic version of eachOf which can handle array, object, and iterator cases.
var eachOfGeneric = doLimit(eachOfLimit, Infinity);
function eachOfGeneric (coll, iteratee, callback) {
return eachOfLimit(coll, Infinity, iteratee, callback);
}

/**
* Like [`each`]{@link module:Collections.each}, except that it passes the key (or index) as the second argument
Expand All @@ -56,6 +57,7 @@ var eachOfGeneric = doLimit(eachOfLimit, Infinity);
* Invoked with (item, key, callback).
* @param {Function} [callback] - A callback which is called when all
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
* @example
*
* var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};
Expand All @@ -77,7 +79,9 @@ var eachOfGeneric = doLimit(eachOfLimit, Infinity);
* doSomethingWith(configs);
* });
*/
export default function(coll, iteratee, callback) {
function eachOf(coll, iteratee, callback) {
var eachOfImplementation = isArrayLike(coll) ? eachOfArrayLike : eachOfGeneric;
eachOfImplementation(coll, wrapAsync(iteratee), callback);
return eachOfImplementation(coll, wrapAsync(iteratee), callback);
}

export default awaitify(eachOf, 3)
8 changes: 6 additions & 2 deletions lib/eachOfLimit.js
@@ -1,5 +1,6 @@
import _eachOfLimit from './internal/eachOfLimit';
import wrapAsync from './internal/wrapAsync';
import awaitify from './internal/awaitify'

/**
* The same as [`eachOf`]{@link module:Collections.eachOf} but runs a maximum of `limit` async operations at a
Expand All @@ -20,7 +21,10 @@ import wrapAsync from './internal/wrapAsync';
* Invoked with (item, key, callback).
* @param {Function} [callback] - A callback which is called when all
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
*/
export default function eachOfLimit(coll, limit, iteratee, callback) {
_eachOfLimit(limit)(coll, wrapAsync(iteratee), callback);
function eachOfLimit(coll, limit, iteratee, callback) {
return _eachOfLimit(limit)(coll, wrapAsync(iteratee), callback);
}

export default awaitify(eachOfLimit, 4)
8 changes: 6 additions & 2 deletions lib/eachOfSeries.js
@@ -1,5 +1,5 @@
import eachOfLimit from './eachOfLimit';
import doLimit from './internal/doLimit';
import awaitify from './internal/awaitify'

/**
* The same as [`eachOf`]{@link module:Collections.eachOf} but runs only a single async operation at a time.
Expand All @@ -17,5 +17,9 @@ import doLimit from './internal/doLimit';
* Invoked with (item, key, callback).
* @param {Function} [callback] - A callback which is called when all `iteratee`
* functions have finished, or an error occurs. Invoked with (err).
* @returns {Promise} a promise, if a callback is omitted
*/
export default doLimit(eachOfLimit, 1);
function eachOfSeries(coll, iteratee, callback) {
return eachOfLimit(coll, 1, iteratee, callback)
}
export default awaitify(eachOfSeries, 3);

0 comments on commit 8aecf10

Please sign in to comment.