New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: initial async generator support #1560
Changes from all commits
8567a7f
aa1ec94
82cef4f
59fea53
2cdaa45
e491bd1
0b5c27e
c99905a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import breakLoop from './breakLoop'; | ||
|
||
// for async generators | ||
export default function asyncEachOfLimit(generator, limit, iteratee, callback) { | ||
let done = false | ||
let canceled = false | ||
let awaiting = false | ||
let running = 0 | ||
let idx = 0 | ||
|
||
function replenish() { | ||
//console.log('replenish') | ||
if (running >= limit || awaiting || done) return | ||
//console.log('replenish awaiting') | ||
awaiting = true | ||
generator.next().then(({value, done: iterDone}) => { | ||
//console.log('got value', value) | ||
if (canceled || done) return | ||
awaiting = false | ||
if (iterDone) { | ||
done = true; | ||
if (running <= 0) { | ||
//console.log('done nextCb') | ||
callback(null) | ||
} | ||
return; | ||
} | ||
running++ | ||
iteratee(value, idx, iterateeCallback) | ||
idx++ | ||
replenish() | ||
}).catch(handleError) | ||
} | ||
|
||
function iterateeCallback(err, result) { | ||
//console.log('iterateeCallback') | ||
running -= 1; | ||
if (canceled) return | ||
if (err) return handleError(err) | ||
|
||
if (err === false) { | ||
done = true; | ||
canceled = true; | ||
return | ||
} | ||
|
||
if (result === breakLoop || (done && running <= 0)) { | ||
done = true; | ||
//console.log('done iterCb') | ||
return callback(null); | ||
} | ||
replenish() | ||
} | ||
|
||
function handleError(err) { | ||
This comment was marked as resolved.
Sorry, something went wrong. |
||
if (canceled) return | ||
awaiting = false | ||
done = true | ||
callback(err) | ||
} | ||
|
||
replenish() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
// A temporary value used to identify if the loop should be broken. | ||
// See #1064, #1293 | ||
export default {}; | ||
const breakLoop = {}; | ||
export default breakLoop; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,8 @@ import once from './once'; | |
|
||
import iterator from './iterator'; | ||
import onlyOnce from './onlyOnce'; | ||
import {isAsyncGenerator, isAsyncIterable} from './wrapAsync' | ||
import asyncEachOfLimit from './asyncEachOfLimit' | ||
|
||
import breakLoop from './breakLoop'; | ||
|
||
|
@@ -15,6 +17,12 @@ export default (limit) => { | |
if (!obj) { | ||
return callback(null); | ||
} | ||
if (isAsyncGenerator(obj)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a minor thing, but should this be expanded to support async iterators in general? Something similar to how we currently have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, good point, didnt know about that detail of the spec. |
||
return asyncEachOfLimit(obj, limit, iteratee, callback) | ||
} | ||
if (isAsyncIterable(obj)) { | ||
return asyncEachOfLimit(obj[Symbol.asyncIterator](), limit, iteratee, callback) | ||
} | ||
var nextElem = iterator(obj); | ||
var done = false; | ||
var canceled = false; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I experimented with calling
replenish()
multiple times, awaiting mutiple.next()
s at the same time, but there's some grey area in how it could work. Since you don't know the length of the iterator up front, you have to set some arbitrary limits in the pure parallel case, e.g. 10. Whenever you call.next()
N times synchronously, you end up calling next() and receiving{done: true}
N times spuriously at the end. We don't need to await multiple.next()
s at the same time, because as far as I know, there will be a strict linear sequence ofyield
s in the generator, meaning it's pointless to await more than one. It leaves the job of backpressure to the generator implementation too.