Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial async generator support #1560

Merged
merged 8 commits into from Aug 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion .babelrc
@@ -1,5 +1,8 @@
{
"plugins": ["transform-es2015-modules-commonjs"],
"plugins": [
"transform-es2015-modules-commonjs",
"syntax-async-generators"
],
"env": {
"test": {
"plugins": ["istanbul"]
Expand Down
1 change: 1 addition & 0 deletions .eslintrc
Expand Up @@ -5,6 +5,7 @@
"mocha": true,
"es6": true
},
"parser": "babel-eslint",
"parserOptions": {
"ecmaVersion": 8,
"sourceType": "module"
Expand Down
14 changes: 5 additions & 9 deletions Makefile
Expand Up @@ -7,7 +7,6 @@ export PATH := ./node_modules/.bin/:$(PATH):./bin/

PACKAGE = asyncjs
REQUIRE_NAME = async
UGLIFY = uglifyjs
XYZ = support/xyz.sh --repo git@github.com:caolan/async.git

BUILDDIR = build
Expand All @@ -21,7 +20,7 @@ LINT_FILES := lib/ test/ $(shell find perf/ -maxdepth 2 -type f) $(shell find su

UMD_BUNDLE := $(BUILDDIR)/dist/async.js
UMD_BUNDLE_MIN := $(BUILDDIR)/dist/async.min.js
UMD_BUNDLE_MAP := $(BUILDDIR)/dist/async.min.map
# UMD_BUNDLE_MAP := $(BUILDDIR)/dist/async.min.map
ALIAS_ES := $(addprefix build-es/, $(addsuffix .js, $(shell cat $(SCRIPTS)/aliases.txt | cut -d ' ' -f1)))
ALIAS_CJS := $(patsubst build-es/%, build/%, $(ALIAS_ES))
ES_MODULES := $(patsubst lib/%.js, build-es/%.js, $(JS_SRC)) $(ALIAS_ES)
Expand Down Expand Up @@ -81,26 +80,23 @@ $(UMD_BUNDLE): $(ES_MODULES) package.json
node $(SCRIPTS)/build/aggregate-bundle.js

# Create the minified UMD versions and copy them to dist/ for bower
build-dist: $(DIST) $(DIST)/async.js $(DIST)/async.min.js $(DIST)/async.min.map
build-dist: $(DIST) $(DIST)/async.js $(DIST)/async.min.js # $(DIST)/async.min.map

$(DIST):
mkdir -p $@

$(UMD_BUNDLE_MIN): $(UMD_BUNDLE)
mkdir -p "$(@D)"
$(UGLIFY) $< --mangle --compress \
--source-map $(UMD_BUNDLE_MAP) \
--source-map-url async.min.map \
-o $@
babel-minify $< --mangle -o $@

$(DIST)/async.js: $(UMD_BUNDLE)
cp $< $@

$(DIST)/async.min.js: $(UMD_BUNDLE_MIN)
cp $< $@

$(DIST)/async.min.map: $(UMD_BUNDLE_MIN)
cp $(UMD_BUNDLE_MAP) $@
# $(DIST)/async.min.map: $(UMD_BUNDLE_MIN)
# cp $(UMD_BUNDLE_MAP) $@

build-es: $(ES_MODULES)

Expand Down
2 changes: 1 addition & 1 deletion lib/applyEach.js
Expand Up @@ -15,7 +15,7 @@ import map from './map';
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array|Iterable|Object} fns - A collection of {@link AsyncFunction}s
* @param {Array|Iterable|AsyncIterable|Object} fns - A collection of {@link AsyncFunction}s
* to all call with the same arguments
* @param {...*} [args] - any number of separate arguments to pass to the
* function.
Expand Down
2 changes: 1 addition & 1 deletion lib/applyEachSeries.js
Expand Up @@ -10,7 +10,7 @@ import mapSeries from './mapSeries';
* @method
* @see [async.applyEach]{@link module:ControlFlow.applyEach}
* @category Control Flow
* @param {Array|Iterable|Object} fns - A collection of {@link AsyncFunction}s to all
* @param {Array|Iterable|AsyncIterable|Object} fns - A collection of {@link AsyncFunction}s to all
* call with the same arguments
* @param {...*} [args] - any number of separate arguments to pass to the
* function.
Expand Down
2 changes: 1 addition & 1 deletion lib/concat.js
Expand Up @@ -13,7 +13,7 @@ import concatLimit from './concatLimit';
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
* which should use an array as its result. Invoked with (item, callback).
* @param {Function} [callback(err)] - A callback which is called after all the
Expand Down
2 changes: 1 addition & 1 deletion lib/concatLimit.js
Expand Up @@ -11,7 +11,7 @@ import mapLimit from './mapLimit';
* @method
* @see [async.concat]{@link module:Collections.concat}
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
* which should use an array as its result. Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/concatSeries.js
Expand Up @@ -10,7 +10,7 @@ import concatLimit from './concatLimit';
* @method
* @see [async.concat]{@link module:Collections.concat}
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`.
* The iteratee should complete with an array an array of results.
* Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/detect.js
Expand Up @@ -17,7 +17,7 @@ import doParallel from './internal/doParallel';
* @method
* @alias find
* @category Collections
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A truth test to apply to each item in `coll`.
* The iteratee must complete with a boolean value as its result.
* Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/detectLimit.js
Expand Up @@ -12,7 +12,7 @@ import doParallelLimit from './internal/doParallelLimit';
* @see [async.detect]{@link module:Collections.detect}
* @alias findLimit
* @category Collections
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - A truth test to apply to each item in `coll`.
* The iteratee must complete with a boolean value as its result.
Expand Down
2 changes: 1 addition & 1 deletion lib/detectSeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.detect]{@link module:Collections.detect}
* @alias findSeries
* @category Collections
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A truth test to apply to each item in `coll`.
* The iteratee must complete with a boolean value as its result.
* Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/each.js
Expand Up @@ -18,7 +18,7 @@ import wrapAsync from './internal/wrapAsync'
* @method
* @alias forEach
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to
* each item in `coll`. Invoked with (item, callback).
* The array index is not passed to the iteratee.
Expand Down
2 changes: 1 addition & 1 deletion lib/eachLimit.js
Expand Up @@ -12,7 +12,7 @@ import wrapAsync from './internal/wrapAsync';
* @see [async.each]{@link module:Collections.each}
* @alias forEachLimit
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
Expand Down
2 changes: 1 addition & 1 deletion lib/eachOf.js
Expand Up @@ -49,7 +49,7 @@ var eachOfGeneric = doLimit(eachOfLimit, Infinity);
* @alias forEachOf
* @category Collection
* @see [async.each]{@link module:Collections.each}
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each
* item in `coll`.
* The `key` is the item's key, or index in the case of an array.
Expand Down
2 changes: 1 addition & 1 deletion lib/eachOfLimit.js
Expand Up @@ -12,7 +12,7 @@ import wrapAsync from './internal/wrapAsync';
* @see [async.eachOf]{@link module:Collections.eachOf}
* @alias forEachOfLimit
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each
* item in `coll`. The `key` is the item's key, or index in the case of an
Expand Down
2 changes: 1 addition & 1 deletion lib/eachOfSeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.eachOf]{@link module:Collections.eachOf}
* @alias forEachOfSeries
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* Invoked with (item, key, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/eachSeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.each]{@link module:Collections.each}
* @alias forEachSeries
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each
* item in `coll`.
* The array index is not passed to the iteratee.
Expand Down
2 changes: 1 addition & 1 deletion lib/every.js
Expand Up @@ -11,7 +11,7 @@ import doParallel from './internal/doParallel';
* @method
* @alias all
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collection in parallel.
* The iteratee must complete with a boolean result value.
Expand Down
2 changes: 1 addition & 1 deletion lib/everyLimit.js
Expand Up @@ -11,7 +11,7 @@ import doParallelLimit from './internal/doParallelLimit';
* @see [async.every]{@link module:Collections.every}
* @alias allLimit
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collection in parallel.
Expand Down
2 changes: 1 addition & 1 deletion lib/everySeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.every]{@link module:Collections.every}
* @alias allSeries
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async truth test to apply to each item
* in the collection in series.
* The iteratee must complete with a boolean result value.
Expand Down
2 changes: 1 addition & 1 deletion lib/filter.js
Expand Up @@ -12,7 +12,7 @@ import doParallel from './internal/doParallel';
* @method
* @alias select
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {Function} iteratee - A truth test to apply to each item in `coll`.
* The `iteratee` is passed a `callback(err, truthValue)`, which must be called
* with a boolean argument once it has completed. Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/filterLimit.js
Expand Up @@ -12,7 +12,7 @@ import doParallelLimit from './internal/doParallelLimit';
* @see [async.filter]{@link module:Collections.filter}
* @alias selectLimit
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {Function} iteratee - A truth test to apply to each item in `coll`.
* The `iteratee` is passed a `callback(err, truthValue)`, which must be called
Expand Down
2 changes: 1 addition & 1 deletion lib/filterSeries.js
Expand Up @@ -11,7 +11,7 @@ import doLimit from './internal/doLimit';
* @see [async.filter]{@link module:Collections.filter}
* @alias selectSeries
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {Function} iteratee - A truth test to apply to each item in `coll`.
* The `iteratee` is passed a `callback(err, truthValue)`, which must be called
* with a boolean argument once it has completed. Invoked with (item, callback).
Expand Down
2 changes: 1 addition & 1 deletion lib/groupBy.js
Expand Up @@ -17,7 +17,7 @@ import groupByLimit from './groupByLimit';
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with a `key` to group the value under.
Expand Down
2 changes: 1 addition & 1 deletion lib/groupByLimit.js
Expand Up @@ -10,7 +10,7 @@ import wrapAsync from './internal/wrapAsync';
* @method
* @see [async.groupBy]{@link module:Collections.groupBy}
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
Expand Down
2 changes: 1 addition & 1 deletion lib/groupBySeries.js
Expand Up @@ -10,7 +10,7 @@ import groupByLimit from './groupByLimit';
* @method
* @see [async.groupBy]{@link module:Collections.groupBy}
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
Expand Down
63 changes: 63 additions & 0 deletions lib/internal/asyncEachOfLimit.js
@@ -0,0 +1,63 @@
import breakLoop from './breakLoop';

// for async generators
export default function asyncEachOfLimit(generator, limit, iteratee, callback) {
let done = false
let canceled = false
let awaiting = false
let running = 0
let idx = 0

function replenish() {
//console.log('replenish')
if (running >= limit || awaiting || done) return
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I experimented with calling replenish() multiple times, awaiting mutiple .next()s at the same time, but there's some grey area in how it could work. Since you don't know the length of the iterator up front, you have to set some arbitrary limits in the pure parallel case, e.g. 10. Whenever you call .next() N times synchronously, you end up calling next() and receiving {done: true} N times spuriously at the end. We don't need to await multiple .next()s at the same time, because as far as I know, there will be a strict linear sequence of yields in the generator, meaning it's pointless to await more than one. It leaves the job of backpressure to the generator implementation too.

//console.log('replenish awaiting')
awaiting = true
generator.next().then(({value, done: iterDone}) => {
//console.log('got value', value)
if (canceled || done) return
awaiting = false
if (iterDone) {
done = true;
if (running <= 0) {
//console.log('done nextCb')
callback(null)
}
return;
}
running++
iteratee(value, idx, iterateeCallback)
idx++
replenish()
}).catch(handleError)
}

function iterateeCallback(err, result) {
//console.log('iterateeCallback')
running -= 1;
if (canceled) return
if (err) return handleError(err)

if (err === false) {
done = true;
canceled = true;
return
}

if (result === breakLoop || (done && running <= 0)) {
done = true;
//console.log('done iterCb')
return callback(null);
}
replenish()
}

function handleError(err) {

This comment was marked as resolved.

if (canceled) return
awaiting = false
done = true
callback(err)
}

replenish()
}
3 changes: 2 additions & 1 deletion lib/internal/breakLoop.js
@@ -1,3 +1,4 @@
// A temporary value used to identify if the loop should be broken.
// See #1064, #1293
export default {};
const breakLoop = {};
export default breakLoop;
8 changes: 8 additions & 0 deletions lib/internal/eachOfLimit.js
Expand Up @@ -3,6 +3,8 @@ import once from './once';

import iterator from './iterator';
import onlyOnce from './onlyOnce';
import {isAsyncGenerator, isAsyncIterable} from './wrapAsync'
import asyncEachOfLimit from './asyncEachOfLimit'

import breakLoop from './breakLoop';

Expand All @@ -15,6 +17,12 @@ export default (limit) => {
if (!obj) {
return callback(null);
}
if (isAsyncGenerator(obj)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a minor thing, but should this be expanded to support async iterators in general? Something similar to how we currently have getIterator except with Symbol.asyncIterator.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point, didnt know about that detail of the spec.

return asyncEachOfLimit(obj, limit, iteratee, callback)
}
if (isAsyncIterable(obj)) {
return asyncEachOfLimit(obj[Symbol.asyncIterator](), limit, iteratee, callback)
}
var nextElem = iterator(obj);
var done = false;
var canceled = false;
Expand Down
10 changes: 9 additions & 1 deletion lib/internal/wrapAsync.js
Expand Up @@ -4,10 +4,18 @@ function isAsync(fn) {
return fn[Symbol.toStringTag] === 'AsyncFunction';
}

function isAsyncGenerator(fn) {
return fn[Symbol.toStringTag] === 'AsyncGenerator';
}

function isAsyncIterable(obj) {
return typeof obj[Symbol.asyncIterator] === 'function';
}

function wrapAsync(asyncFn) {
return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
}

export default wrapAsync;

export { isAsync };
export { isAsync, isAsyncGenerator, isAsyncIterable };
2 changes: 1 addition & 1 deletion lib/map.js
Expand Up @@ -23,7 +23,7 @@ import map from './internal/map';
* @memberOf module:Collections
* @method
* @category Collection
* @param {Array|Iterable|Object} coll - A collection to iterate over.
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - An async function to apply to each item in
* `coll`.
* The iteratee should complete with the transformed item.
Expand Down