New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: initial async generator support #1560
Changes from 1 commit
8567a7f
aa1ec94
82cef4f
59fea53
2cdaa45
e491bd1
0b5c27e
c99905a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
const breakLoop = require('./breakLoop') | ||
|
||
// for async generators | ||
export default function asyncEachOfLimit(generator, limit, iteratee, callback) { | ||
let done = false | ||
let canceled = false | ||
let awaiting = false | ||
let running = 0 | ||
let idx = 0 | ||
|
||
function replenish() { | ||
//console.log('replenish') | ||
if (running >= limit || awaiting) return | ||
//console.log('replenish awaiting') | ||
awaiting = true | ||
generator.next().then(({value, done: iterDone}) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you have a generator that has no items? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nevermind dumb question, I see how this works There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to what @megawac was saying, I think we need a For example, if the first |
||
//console.log('got value', value) | ||
awaiting = false | ||
if (iterDone) { | ||
done = true; | ||
if (running <= 0) { | ||
callback(null) | ||
} | ||
return; | ||
} | ||
running++ | ||
iteratee(value, idx, iterateeCallback) | ||
idx++ | ||
replenish() | ||
}).catch(handleError) | ||
} | ||
|
||
function iterateeCallback(err, result) { | ||
//console.log('iterateeCallback') | ||
if (canceled) return | ||
running -= 1; | ||
if (err) return handleError(err) | ||
|
||
if (err === false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we avoid calling replenish int his case and go straight to calling the complete callback? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, I should |
||
done = true; | ||
canceled = true; | ||
} | ||
|
||
if (result === breakLoop || (done && running <= 0)) { | ||
done = true; | ||
//console.log('done') | ||
return callback(null); | ||
} | ||
replenish() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you have to avoid calling Lets say hypothetically we have a limit of 2 and the final 2 |
||
} | ||
|
||
function handleError(err) { | ||
This comment was marked as resolved.
Sorry, something went wrong. |
||
awaiting = false | ||
done = true | ||
callback(err) | ||
} | ||
|
||
replenish() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,8 @@ import once from './once'; | |
|
||
import iterator from './iterator'; | ||
import onlyOnce from './onlyOnce'; | ||
import {isAsyncGenerator} from './wrapAsync' | ||
import asyncEachOfLimit from './asyncEachOfLimit' | ||
|
||
import breakLoop from './breakLoop'; | ||
|
||
|
@@ -15,6 +17,9 @@ export default (limit) => { | |
if (!obj) { | ||
return callback(null); | ||
} | ||
if (isAsyncGenerator(obj)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a minor thing, but should this be expanded to support async iterators in general? Something similar to how we currently have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, good point, didnt know about that detail of the spec. |
||
return asyncEachOfLimit(obj, limit, iteratee, callback) | ||
} | ||
var nextElem = iterator(obj); | ||
var done = false; | ||
var canceled = false; | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
var async = require('../../lib'); | ||
const {expect} = require('chai'); | ||
const assert = require('assert'); | ||
|
||
const delay = ms => new Promise(resolve => setTimeout(resolve, ms)) | ||
|
||
module.exports = function () { | ||
async function asyncIdentity(val) { | ||
var res = await Promise.resolve(val); | ||
return res; | ||
} | ||
|
||
async function * range (num) { | ||
for(let i = 0; i < num; i++) { | ||
await delay(1) | ||
yield i | ||
} | ||
} | ||
|
||
it('should handle async generators in each', (done) => { | ||
const calls = [] | ||
async.each(range(5), | ||
async (val) => { | ||
calls.push(val) | ||
await delay(5) | ||
}, (err) => { | ||
if (err) throw err | ||
expect(calls).to.eql([0, 1, 2, 3, 4]) | ||
done() | ||
} | ||
) | ||
}); | ||
|
||
it('should handle async generators in eachLimit', (done) => { | ||
const calls = [] | ||
async.eachLimit(range(5), 2, | ||
async (val) => { | ||
calls.push(val) | ||
await delay(5) | ||
}, (err) => { | ||
if (err) throw err | ||
expect(calls).to.eql([0, 1, 2, 3, 4]) | ||
done() | ||
} | ||
) | ||
}); | ||
|
||
it('should handle async generators in eachSeries', (done) => { | ||
const calls = [] | ||
async.eachSeries(range(5), | ||
async (val) => { | ||
calls.push(val) | ||
await delay(5) | ||
}, (err) => { | ||
if (err) throw err | ||
expect(calls).to.eql([0, 1, 2, 3, 4]) | ||
done() | ||
} | ||
) | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you end up with more than one generator running asynchronously with this code? If I understand the code correctly,
replenish
only gets calledasyncEachOfLimit
is callediterateeCallback
is calledDo we not need to loop
limit - running
inside of replenish to call several generators at once?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'd only end up with more than one iteratee running if your iteratee takes longer to process than your generator takes to generate. For example, in the tests, I have the generator ticking every 1 ms, but have the iteratee take 5 ms.
Async generators are like streams -- you get a linear sequence of items. You can't await multiple items at the same time (and if you did, you'd get the same item multiple times, as you do when you call
.then()
on the same promise multiple times). This is why we can't loop inreplenish
-- we'd end up calling the iteratee with the same item multiple times.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see thanks for explaining, I haven't used generators before.
So for my understanding, the replenish inside of the generator's
then
function is to spawn the asynchronous processes up untillimit
are running and thereplenish
inside of theiterateeCallback
is to trigger additional items to run if the limit was reached?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure of this, as is right now we have the possibility of calling
next
multiple times before the promise resolves ifiteratee
is synchronous. In such a case will things break?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I may be misunderstanding something, but I thought calls to
next
are queued and resolved separately. I'm looking at theAsync iterators and async iterables
section of the proposal.That being said, I think we should still probably avoid looping in replenish as we still have to wait for earlier promises to resolve.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you're right, I was confusing
.next()
with.then()
. We can call.next()
as many times as we need to, perhaps up to the concurrency limit.