New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: initial async generator support #1560
Changes from 2 commits
8567a7f
aa1ec94
82cef4f
59fea53
2cdaa45
e491bd1
0b5c27e
c99905a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
const breakLoop = require('./breakLoop') | ||
|
||
// for async generators | ||
export default function asyncEachOfLimit(generator, limit, iteratee, callback) { | ||
let done = false | ||
let canceled = false | ||
let awaiting = false | ||
let running = 0 | ||
let idx = 0 | ||
|
||
function replenish() { | ||
//console.log('replenish') | ||
if (running >= limit || awaiting || done) return | ||
//console.log('replenish awaiting') | ||
awaiting = true | ||
generator.next().then(({value, done: iterDone}) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you have a generator that has no items? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nevermind dumb question, I see how this works There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to what @megawac was saying, I think we need a For example, if the first |
||
//console.log('got value', value) | ||
if (canceled || done) return | ||
awaiting = false | ||
if (iterDone) { | ||
done = true; | ||
if (running <= 0) { | ||
//console.log('done nextCb') | ||
callback(null) | ||
} | ||
return; | ||
} | ||
running++ | ||
iteratee(value, idx, iterateeCallback) | ||
idx++ | ||
replenish() | ||
}).catch(handleError) | ||
} | ||
|
||
function iterateeCallback(err, result) { | ||
//console.log('iterateeCallback') | ||
running -= 1; | ||
if (canceled) return | ||
if (err) return handleError(err) | ||
|
||
if (err === false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we avoid calling replenish int his case and go straight to calling the complete callback? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, I should |
||
done = true; | ||
canceled = true; | ||
return | ||
} | ||
|
||
if (result === breakLoop || (done && running <= 0)) { | ||
done = true; | ||
//console.log('done iterCb') | ||
return callback(null); | ||
} | ||
replenish() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you have to avoid calling Lets say hypothetically we have a limit of 2 and the final 2 |
||
} | ||
|
||
function handleError(err) { | ||
This comment was marked as resolved.
Sorry, something went wrong. |
||
if (canceled) return | ||
awaiting = false | ||
done = true | ||
callback(err) | ||
} | ||
|
||
replenish() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,8 @@ import once from './once'; | |
|
||
import iterator from './iterator'; | ||
import onlyOnce from './onlyOnce'; | ||
import {isAsyncGenerator} from './wrapAsync' | ||
import asyncEachOfLimit from './asyncEachOfLimit' | ||
|
||
import breakLoop from './breakLoop'; | ||
|
||
|
@@ -15,6 +17,9 @@ export default (limit) => { | |
if (!obj) { | ||
return callback(null); | ||
} | ||
if (isAsyncGenerator(obj)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a minor thing, but should this be expanded to support async iterators in general? Something similar to how we currently have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, good point, didnt know about that detail of the spec. |
||
return asyncEachOfLimit(obj, limit, iteratee, callback) | ||
} | ||
var nextElem = iterator(obj); | ||
var done = false; | ||
var canceled = false; | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
var async = require('../../lib'); | ||
const {expect} = require('chai'); | ||
const assert = require('assert'); | ||
|
||
const delay = ms => new Promise(resolve => setTimeout(resolve, ms)) | ||
|
||
module.exports = function () { | ||
async function asyncIdentity(val) { | ||
var res = await Promise.resolve(val); | ||
return res; | ||
} | ||
|
||
async function * range (num) { | ||
for(let i = 0; i < num; i++) { | ||
await delay(1) | ||
yield i | ||
} | ||
} | ||
|
||
it('should handle async generators in each', (done) => { | ||
const calls = [] | ||
async.each(range(5), | ||
async (val) => { | ||
calls.push(val) | ||
await delay(5) | ||
}, (err) => { | ||
if (err) throw err | ||
expect(calls).to.eql([0, 1, 2, 3, 4]) | ||
done() | ||
} | ||
) | ||
}); | ||
|
||
it('should handle async generators in eachLimit', (done) => { | ||
const calls = [] | ||
async.eachLimit(range(5), 2, | ||
async (val) => { | ||
calls.push(val) | ||
await delay(5) | ||
}, (err) => { | ||
if (err) throw err | ||
expect(calls).to.eql([0, 1, 2, 3, 4]) | ||
done() | ||
} | ||
) | ||
}); | ||
|
||
it('should handle async generators in eachSeries', (done) => { | ||
const calls = [] | ||
async.eachSeries(range(5), | ||
async (val) => { | ||
calls.push(val) | ||
await delay(5) | ||
}, (err) => { | ||
if (err) throw err | ||
expect(calls).to.eql([0, 1, 2, 3, 4]) | ||
done() | ||
} | ||
) | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I experimented with calling
replenish()
multiple times, awaiting mutiple.next()
s at the same time, but there's some grey area in how it could work. Since you don't know the length of the iterator up front, you have to set some arbitrary limits in the pure parallel case, e.g. 10. Whenever you call.next()
N times synchronously, you end up calling next() and receiving{done: true}
N times spuriously at the end. We don't need to await multiple.next()
s at the same time, because as far as I know, there will be a strict linear sequence ofyield
s in the generator, meaning it's pointless to await more than one. It leaves the job of backpressure to the generator implementation too.