Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial async generator support #1560

Merged
merged 8 commits into from Aug 5, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion .babelrc
@@ -1,5 +1,8 @@
{
"plugins": ["transform-es2015-modules-commonjs"],
"plugins": [
"transform-es2015-modules-commonjs",
"syntax-async-generators"
],
"env": {
"test": {
"plugins": ["istanbul"]
Expand Down
59 changes: 59 additions & 0 deletions lib/internal/asyncEachOfLimit.js
@@ -0,0 +1,59 @@
const breakLoop = require('./breakLoop')

// for async generators
export default function asyncEachOfLimit(generator, limit, iteratee, callback) {
let done = false
let canceled = false
let awaiting = false
let running = 0
let idx = 0

function replenish() {
//console.log('replenish')
if (running >= limit || awaiting) return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you end up with more than one generator running asynchronously with this code? If I understand the code correctly, replenish only gets called

  1. to spawn the generator when asyncEachOfLimit is called
  2. whenever a generator completes
  3. after iterateeCallback is called

Do we not need to loop limit - running inside of replenish to call several generators at once?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd only end up with more than one iteratee running if your iteratee takes longer to process than your generator takes to generate. For example, in the tests, I have the generator ticking every 1 ms, but have the iteratee take 5 ms.

Async generators are like streams -- you get a linear sequence of items. You can't await multiple items at the same time (and if you did, you'd get the same item multiple times, as you do when you call .then() on the same promise multiple times). This is why we can't loop in replenish -- we'd end up calling the iteratee with the same item multiple times.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see thanks for explaining, I haven't used generators before.

So for my understanding, the replenish inside of the generator's then function is to spawn the asynchronous processes up until limit are running and the replenish inside of the iterateeCallback is to trigger additional items to run if the limit was reached?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't await multiple items at the same time (and if you did, you'd get the same item multiple times...)

Are you sure of this, as is right now we have the possibility of calling next multiple times before the promise resolves if iteratee is synchronous. In such a case will things break?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is why we can't loop in replenish -- we'd end up calling the iteratee with the same item multiple times.

Sorry, I may be misunderstanding something, but I thought calls to next are queued and resolved separately. I'm looking at the Async iterators and async iterables section of the proposal.

That being said, I think we should still probably avoid looping in replenish as we still have to wait for earlier promises to resolve.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you're right, I was confusing .next() with .then(). We can call .next() as many times as we need to, perhaps up to the concurrency limit.

//console.log('replenish awaiting')
awaiting = true
generator.next().then(({value, done: iterDone}) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you have a generator that has no items?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind dumb question, I see how this works

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to what @megawac was saying, I think we need a if (done) return; line at the very top of this callback. If one of the previous iteratees cancelled or errored, we shouldn't be invoking the iteratee with successive items.

For example, if the first iteratee starts and we start waiting for the second item. If the first iteratee cancels before the then callback is called with the second item, when it is called, we would still invoke the iteratee with the second item.

//console.log('got value', value)
awaiting = false
if (iterDone) {
done = true;
if (running <= 0) {
callback(null)
}
return;
}
running++
iteratee(value, idx, iterateeCallback)
idx++
replenish()
}).catch(handleError)
}

function iterateeCallback(err, result) {
//console.log('iterateeCallback')
if (canceled) return
running -= 1;
if (err) return handleError(err)

if (err === false) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid calling replenish int his case and go straight to calling the complete callback?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I should return in this block.

done = true;
canceled = true;
}

if (result === breakLoop || (done && running <= 0)) {
done = true;
//console.log('done')
return callback(null);
}
replenish()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you have to avoid calling replenish when done === true or have replenish check if done === true.

Lets say hypothetically we have a limit of 2 and the final 2 iteratees are running. When one of these iteratees resolves, running = 1 and done = true but we'll still replenish and call the generator.next().then(). This has the possibility of entering a race condition, if the other iteratee resolves in the time between the then callback is called, you can actually end up calling callback twice

}

function handleError(err) {

This comment was marked as resolved.

awaiting = false
done = true
callback(err)
}

replenish()
}
5 changes: 5 additions & 0 deletions lib/internal/eachOfLimit.js
Expand Up @@ -3,6 +3,8 @@ import once from './once';

import iterator from './iterator';
import onlyOnce from './onlyOnce';
import {isAsyncGenerator} from './wrapAsync'
import asyncEachOfLimit from './asyncEachOfLimit'

import breakLoop from './breakLoop';

Expand All @@ -15,6 +17,9 @@ export default (limit) => {
if (!obj) {
return callback(null);
}
if (isAsyncGenerator(obj)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a minor thing, but should this be expanded to support async iterators in general? Something similar to how we currently have getIterator except with Symbol.asyncIterator.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point, didnt know about that detail of the spec.

return asyncEachOfLimit(obj, limit, iteratee, callback)
}
var nextElem = iterator(obj);
var done = false;
var canceled = false;
Expand Down
6 changes: 5 additions & 1 deletion lib/internal/wrapAsync.js
Expand Up @@ -4,10 +4,14 @@ function isAsync(fn) {
return fn[Symbol.toStringTag] === 'AsyncFunction';
}

function isAsyncGenerator(fn) {
return fn[Symbol.toStringTag] === 'AsyncGenerator';
}

function wrapAsync(asyncFn) {
return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
}

export default wrapAsync;

export { isAsync };
export { isAsync, isAsyncGenerator };
20 changes: 16 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -23,6 +23,7 @@
"babel-core": "^6.26.3",
"babel-plugin-add-module-exports": "^0.2.1",
"babel-plugin-istanbul": "^2.0.1",
"babel-plugin-syntax-async-generators": "^6.13.0",
"babel-plugin-transform-es2015-modules-commonjs": "^6.26.2",
"babel-preset-es2015": "^6.3.13",
"babel-preset-es2017": "^6.22.0",
Expand Down
17 changes: 17 additions & 0 deletions test/asyncFunctions.js
Expand Up @@ -11,6 +11,17 @@ function supportsAsync() {
return supported;
}

function supportsAsyncGenerators() {
var supported;
try {
/* eslint no-eval: 0 */
supported = eval('(async function * () { yield await 1 })');
} catch (e) {
supported = false;
}
return supported;
}

describe('async function support', function () {
this.timeout(100);

Expand All @@ -19,4 +30,10 @@ describe('async function support', function () {
} else {
it('should not test async functions in this environment');
}

if (supportsAsyncGenerators()) {
require('./es2017/asyncGenerators.js')();
} else {
it('should not test async generators in this environment');
}
});
61 changes: 61 additions & 0 deletions test/es2017/asyncGenerators.js
@@ -0,0 +1,61 @@
var async = require('../../lib');
const {expect} = require('chai');
const assert = require('assert');

const delay = ms => new Promise(resolve => setTimeout(resolve, ms))

module.exports = function () {
async function asyncIdentity(val) {
var res = await Promise.resolve(val);
return res;
}

async function * range (num) {
for(let i = 0; i < num; i++) {
await delay(1)
yield i
}
}

it('should handle async generators in each', (done) => {
const calls = []
async.each(range(5),
async (val) => {
calls.push(val)
await delay(5)
}, (err) => {
if (err) throw err
expect(calls).to.eql([0, 1, 2, 3, 4])
done()
}
)
});

it('should handle async generators in eachLimit', (done) => {
const calls = []
async.eachLimit(range(5), 2,
async (val) => {
calls.push(val)
await delay(5)
}, (err) => {
if (err) throw err
expect(calls).to.eql([0, 1, 2, 3, 4])
done()
}
)
});

it('should handle async generators in eachSeries', (done) => {
const calls = []
async.eachSeries(range(5),
async (val) => {
calls.push(val)
await delay(5)
}, (err) => {
if (err) throw err
expect(calls).to.eql([0, 1, 2, 3, 4])
done()
}
)
});
}