Skip to content

Commit

Permalink
[wip] initial async generator support (#1560)
Browse files Browse the repository at this point in the history
feat: initial async generator support
  • Loading branch information
aearly committed Aug 5, 2018
1 parent 5c174fa commit 61268c5
Show file tree
Hide file tree
Showing 52 changed files with 1,243 additions and 198 deletions.
5 changes: 4 additions & 1 deletion .babelrc
@@ -1,5 +1,8 @@
{
"plugins": ["transform-es2015-modules-commonjs"],
"plugins": [
"transform-es2015-modules-commonjs",
"syntax-async-generators"
],
"env": {
"test": {
"plugins": ["istanbul"]
Expand Down
1 change: 1 addition & 0 deletions .eslintrc
Expand Up @@ -5,6 +5,7 @@
"mocha": true,
"es6": true
},
"parser": "babel-eslint",
"parserOptions": {
"ecmaVersion": 8,
"sourceType": "module"
Expand Down
14 changes: 5 additions & 9 deletions Makefile
Expand Up @@ -7,7 +7,6 @@ export PATH := ./node_modules/.bin/:$(PATH):./bin/

PACKAGE = asyncjs
REQUIRE_NAME = async
UGLIFY = uglifyjs
XYZ = support/xyz.sh --repo git@github.com:caolan/async.git

BUILDDIR = build
Expand All @@ -21,7 +20,7 @@ LINT_FILES := lib/ test/ $(shell find perf/ -maxdepth 2 -type f) $(shell find su

UMD_BUNDLE := $(BUILDDIR)/dist/async.js
UMD_BUNDLE_MIN := $(BUILDDIR)/dist/async.min.js
UMD_BUNDLE_MAP := $(BUILDDIR)/dist/async.min.map
# UMD_BUNDLE_MAP := $(BUILDDIR)/dist/async.min.map
ALIAS_ES := $(addprefix build-es/, $(addsuffix .js, $(shell cat $(SCRIPTS)/aliases.txt | cut -d ' ' -f1)))
ALIAS_CJS := $(patsubst build-es/%, build/%, $(ALIAS_ES))
ES_MODULES := $(patsubst lib/%.js, build-es/%.js, $(JS_SRC)) $(ALIAS_ES)
Expand Down Expand Up @@ -81,26 +80,23 @@ $(UMD_BUNDLE): $(ES_MODULES) package.json
node $(SCRIPTS)/build/aggregate-bundle.js

# Create the minified UMD versions and copy them to dist/ for bower
build-dist: $(DIST) $(DIST)/async.js $(DIST)/async.min.js $(DIST)/async.min.map
build-dist: $(DIST) $(DIST)/async.js $(DIST)/async.min.js # $(DIST)/async.min.map

$(DIST):
mkdir -p $@

$(UMD_BUNDLE_MIN): $(UMD_BUNDLE)
mkdir -p "$(@D)"
$(UGLIFY) $< --mangle --compress \
--source-map $(UMD_BUNDLE_MAP) \
--source-map-url async.min.map \
-o $@
babel-minify $< --mangle -o $@

$(DIST)/async.js: $(UMD_BUNDLE)
cp $< $@

$(DIST)/async.min.js: $(UMD_BUNDLE_MIN)
cp $< $@

$(DIST)/async.min.map: $(UMD_BUNDLE_MIN)
cp $(UMD_BUNDLE_MAP) $@
# $(DIST)/async.min.map: $(UMD_BUNDLE_MIN)
# cp $(UMD_BUNDLE_MAP) $@

build-es: $(ES_MODULES)

Expand Down
2 changes: 1 addition & 1 deletion lib/applyEach.js
Expand Up @@ -15,7 +15,7 @@ import map from './map';
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array|Iterable|Object} fns - A collection of {@link AsyncFunction}s
* @param {Array|Iterable|AsyncIterable|Object} fns - A collection of {@link AsyncFunction}s
* to all call with the same arguments
* @param {...*} [args] - any number of separate arguments to pass to the
* function.
Expand Down
2 changes: 1 addition & 1 deletion lib/applyEachSeries.js
Expand Up @@ -10,7 +10,7 @@ import mapSeries from './mapSeries';
* @method
* @see [async.applyEach]{@link module:ControlFlow.applyEach}
* @category Control Flow
* @param {Array|Iterable|Object} fns - A collection of {@link AsyncFunction}s to all
* @param {Array|Iterable|AsyncIterable|Object} fns - A collection of {@link AsyncFunction}s to all
* call with the same arguments
* @param {...*} [args] - any number of separate arguments to pass to the
* function.
Expand Down
2 changes: 1 addition & 1 deletion lib/concat.js
Expand Up @@ -12,7 +12,7 @@ import concatLimit from './concatLimit';
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
* which should use an array as its result. Invoked with (item, callback).
* @param {Function} [callback(err)] - A callback which is called after all the
Expand Down
2 changes: 1 addition & 1 deletion lib/concatLimit.js
Expand Up @@ -11,7 +11,7 @@ import mapLimit from './mapLimit';
* @method
* @see [async.concat]{@link module:Collections.concat}
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
* which should use an array as its result. Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/concatSeries.js
Expand Up @@ -10,7 +10,7 @@ import concatLimit from './concatLimit';
* @method
* @see [async.concat]{@link module:Collections.concat}
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`.
* The iteratee should complete with an array an array of results.
* Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/detect.js
Expand Up @@ -17,7 +17,7 @@ import doParallel from './internal/doParallel';
* @method
* @alias find
* @category Collections
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A truth test to apply to each item in `coll`.
* The iteratee must complete with a boolean value as its result.
* Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/detectLimit.js
Expand Up @@ -12,7 +12,7 @@ import doParallelLimit from './internal/doParallelLimit';
* @see [async.detect]{@link module:Collections.detect}
* @alias findLimit
* @category Collections
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - A truth test to apply to each item in `coll`.
* The iteratee must complete with a boolean value as its result.
Expand Down
2 changes: 1 addition & 1 deletion lib/detectSeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.detect]{@link module:Collections.detect}
* @alias findSeries
* @category Collections
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A truth test to apply to each item in `coll`.
* The iteratee must complete with a boolean value as its result.
* Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/each.js
Expand Up @@ -18,7 +18,7 @@ import wrapAsync from './internal/wrapAsync'
* @method
* @alias forEach
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to
* each item in `coll`. Invoked with (item, callback).
* The array index is not passed to the iteratee.
Expand Down
2 changes: 1 addition & 1 deletion lib/eachLimit.js
Expand Up @@ -12,7 +12,7 @@ import wrapAsync from './internal/wrapAsync';
* @see [async.each]{@link module:Collections.each}
* @alias forEachLimit
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
Expand Down
2 changes: 1 addition & 1 deletion lib/eachOf.js
Expand Up @@ -49,7 +49,7 @@ var eachOfGeneric = doLimit(eachOfLimit, Infinity);
* @alias forEachOf
* @category Collection
* @see [async.each]{@link module:Collections.each}
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each
* item in `coll`.
* The `key` is the item's key, or index in the case of an array.
Expand Down
2 changes: 1 addition & 1 deletion lib/eachOfLimit.js
Expand Up @@ -12,7 +12,7 @@ import wrapAsync from './internal/wrapAsync';
* @see [async.eachOf]{@link module:Collections.eachOf}
* @alias forEachOfLimit
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each
* item in `coll`. The `key` is the item's key, or index in the case of an
Expand Down
2 changes: 1 addition & 1 deletion lib/eachOfSeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.eachOf]{@link module:Collections.eachOf}
* @alias forEachOfSeries
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* Invoked with (item, key, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/eachSeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.each]{@link module:Collections.each}
* @alias forEachSeries
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each
* item in `coll`.
* The array index is not passed to the iteratee.
Expand Down
2 changes: 1 addition & 1 deletion lib/every.js
Expand Up @@ -11,7 +11,7 @@ import doParallel from './internal/doParallel';
* @method
* @alias all
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collection in parallel.
* The iteratee must complete with a boolean result value.
Expand Down
2 changes: 1 addition & 1 deletion lib/everyLimit.js
Expand Up @@ -11,7 +11,7 @@ import doParallelLimit from './internal/doParallelLimit';
* @see [async.every]{@link module:Collections.every}
* @alias allLimit
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collection in parallel.
Expand Down
2 changes: 1 addition & 1 deletion lib/everySeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.every]{@link module:Collections.every}
* @alias allSeries
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collection in series.
* The iteratee must complete with a boolean result value.
Expand Down
2 changes: 1 addition & 1 deletion lib/filter.js
Expand Up @@ -12,7 +12,7 @@ import doParallel from './internal/doParallel';
* @method
* @alias select
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {Function} iteratee - A truth test to apply to each item in `coll`.
* The `iteratee` is passed a `callback(err, truthValue)`, which must be called
* with a boolean argument once it has completed. Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/filterLimit.js
Expand Up @@ -12,7 +12,7 @@ import doParallelLimit from './internal/doParallelLimit';
* @see [async.filter]{@link module:Collections.filter}
* @alias selectLimit
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {Function} iteratee - A truth test to apply to each item in `coll`.
* The `iteratee` is passed a `callback(err, truthValue)`, which must be called
Expand Down
2 changes: 1 addition & 1 deletion lib/filterSeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.filter]{@link module:Collections.filter}
* @alias selectSeries
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {Function} iteratee - A truth test to apply to each item in `coll`.
* The `iteratee` is passed a `callback(err, truthValue)`, which must be called
* with a boolean argument once it has completed. Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/groupBy.js
Expand Up @@ -17,7 +17,7 @@ import groupByLimit from './groupByLimit';
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with a `key` to group the value under.
Expand Down
2 changes: 1 addition & 1 deletion lib/groupByLimit.js
Expand Up @@ -10,7 +10,7 @@ import wrapAsync from './internal/wrapAsync';
* @method
* @see [async.groupBy]{@link module:Collections.groupBy}
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
Expand Down
2 changes: 1 addition & 1 deletion lib/groupBySeries.js
Expand Up @@ -10,7 +10,7 @@ import groupByLimit from './groupByLimit';
* @method
* @see [async.groupBy]{@link module:Collections.groupBy}
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
Expand Down
63 changes: 63 additions & 0 deletions lib/internal/asyncEachOfLimit.js
@@ -0,0 +1,63 @@
import breakLoop from './breakLoop';

// for async generators
export default function asyncEachOfLimit(generator, limit, iteratee, callback) {
let done = false
let canceled = false
let awaiting = false
let running = 0
let idx = 0

function replenish() {
//console.log('replenish')
if (running >= limit || awaiting || done) return
//console.log('replenish awaiting')
awaiting = true
generator.next().then(({value, done: iterDone}) => {
//console.log('got value', value)
if (canceled || done) return
awaiting = false
if (iterDone) {
done = true;
if (running <= 0) {
//console.log('done nextCb')
callback(null)
}
return;
}
running++
iteratee(value, idx, iterateeCallback)
idx++
replenish()
}).catch(handleError)
}

function iterateeCallback(err, result) {
//console.log('iterateeCallback')
running -= 1;
if (canceled) return
if (err) return handleError(err)

if (err === false) {
done = true;
canceled = true;
return
}

if (result === breakLoop || (done && running <= 0)) {
done = true;
//console.log('done iterCb')
return callback(null);
}
replenish()
}

function handleError(err) {
if (canceled) return
awaiting = false
done = true
callback(err)
}

replenish()
}
3 changes: 2 additions & 1 deletion lib/internal/breakLoop.js
@@ -1,3 +1,4 @@
// A temporary value used to identify if the loop should be broken.
// See #1064, #1293
export default {};
const breakLoop = {};
export default breakLoop;
8 changes: 8 additions & 0 deletions lib/internal/eachOfLimit.js
Expand Up @@ -3,6 +3,8 @@ import once from './once';

import iterator from './iterator';
import onlyOnce from './onlyOnce';
import {isAsyncGenerator, isAsyncIterable} from './wrapAsync'
import asyncEachOfLimit from './asyncEachOfLimit'

import breakLoop from './breakLoop';

Expand All @@ -15,6 +17,12 @@ export default (limit) => {
if (!obj) {
return callback(null);
}
if (isAsyncGenerator(obj)) {
return asyncEachOfLimit(obj, limit, iteratee, callback)
}
if (isAsyncIterable(obj)) {
return asyncEachOfLimit(obj[Symbol.asyncIterator](), limit, iteratee, callback)
}
var nextElem = iterator(obj);
var done = false;
var canceled = false;
Expand Down
10 changes: 9 additions & 1 deletion lib/internal/wrapAsync.js
Expand Up @@ -4,10 +4,18 @@ function isAsync(fn) {
return fn[Symbol.toStringTag] === 'AsyncFunction';
}

function isAsyncGenerator(fn) {
return fn[Symbol.toStringTag] === 'AsyncGenerator';
}

function isAsyncIterable(obj) {
return typeof obj[Symbol.asyncIterator] === 'function';
}

function wrapAsync(asyncFn) {
return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
}

export default wrapAsync;

export { isAsync };
export { isAsync, isAsyncGenerator, isAsyncIterable };
2 changes: 1 addition & 1 deletion lib/map.js
Expand Up @@ -23,7 +23,7 @@ import map from './internal/map';
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with the transformed item.
Expand Down

0 comments on commit 61268c5

Please sign in to comment.