Parallelization is impossible #11
Well, the current plan is to buffer results mid-pipeline. But since it buffers only up to a specified bound, rather than indefinitely, this does not suffer from the backpressure problem: the program will still only request results from the producer as fast as the consumer is capable of handling them. This is, incidentally, quite similar to one of the approaches Rust takes (a Stream of Futures in Rust is conceptually similar to an async iterator in JS; in particular, in Rust, a Future does not start doing work until you actually poll it). So it seems possible to me. Indeed we have a concrete design with a solid consistency property. Can you say more about what specifically seems impossible? Or is the |
@bakkot As you mention, creating backpressure works fine, but since it cannot allow values to be created faster than they can be consumed, backpressure is not parallelization. People coming here want parallelization, and they won't get it. |
I don't really know what that means. Like:

let pages = await urls
.toAsync()
.map(x => fetch(x, opts))
.filter(asyncPredicate)
.bufferAhead(4)
.toArray(); This will do up to 4 fetches in parallel, and can start running the predicate while the later fetches are still running. That is what I, at least, want. How is that not parallelization? (Or at least concurrency, if we're being precise.) |
@bakkot That's exactly where I tripped up when I first tried to build that. I thought that writing what you just wrote would provide parallelization, but if you look closely you will find that it does not. Your problem is there's no such thing as an async iterable of promises, so |
You can have either parallelization or structured concurrency, but not both. To make your example work for parallelization you need to hide the fetch promises from await, which breaks structured concurrency. Here's what you have to do to get your example to work the way you are thinking it would:

let pages = await urls
.toAsync()
// as soon as you break the promise chain here, structured concurrency is gone
.map(async x => [fetch(x, opts)])
.filter(async wrapper => predicate(await wrapper[0]))
.bufferAhead(4)
.map(wrapper => wrapper[0])
.toArray(); |
What I wrote will provide parallelism with an appropriate implementation of `bufferAhead`. The key here is that async iterators are actually more powerful than async generators. As you point out, an async generator is blocking; calling `next` again does not start more work until the previous call settles. To be concrete, you can imagine an implementation of `map` along these lines:

AsyncIteratorProto.map = function(mapper) {
return {
next: () => {
return Promise.resolve(this.next())
.then(v =>
v.done
? { done: true }
: Promise.resolve(mapper(v.value)).then(value => ({ done: false, value }))
);
},
};
};

You can run that code today and see concurrency. The following snippet finishes in ~500ms.

let AsyncIteratorProto = (async function*(){})().__proto__.__proto__.__proto__;
AsyncIteratorProto.map = function(mapper) {
return {
next: () => {
return Promise.resolve(this.next())
.then(v =>
v.done
? { done: true }
: Promise.resolve(mapper(v.value)).then(value => ({ done: false, value }))
);
},
};
};
let producer = (async function*() {
yield* [500, 500];
})();
let delay = ms => new Promise(res => setTimeout(res, ms));
let sleepy = producer.map(x => delay(x));
(async () => {
let start = Date.now();
await Promise.all([
sleepy.next(),
sleepy.next(),
]);
console.log(`Took ${Date.now() - start} ms`);
})(); |
It is not. Again, the problem isn't the iterator protocol at all; it's structured concurrency. If you can create values without any caller providing an exception-handling context, you are pushing values, not pulling them. |
... Yes it is? I have written code which does that in the comment above. You can run it.
Whether or not you want to call this "structured concurrency", it is definitely possible, and it is definitely concurrency. |
See, in a trivial and meaningless sense you are "pulling" values because you are the caller of `next`. The counterexample you provided is not really a counterexample, because you showed what's producing those promises, not what's consuming them. |
The example that I showed has

await Promise.all([
sleepy.next(),
sleepy.next(),
]);

That is consuming those promises. With an |
Anyway, it is arguably correct to say this isn't structured concurrency, because if you stop iteration early (for whatever reason) you may have async tasks whose results will never be consumed; control may proceed before execution of those tasks has finished. And I do agree it would be better if it were possible to design this such that you were forced to consume all of the results. I just don't think it's necessary: this design does allow concurrency, and fits well with the existing language. My initial snippet

let pages = await urls
.toAsync()
.map(x => fetch(x, opts))
.filter(asyncPredicate)
.bufferAhead(4)
.toArray();

will do a bounded number of fetches/predicates concurrently, will throw if any step throws, and control will not proceed until either it throws an error or all the work is complete, in which case |
What does the implementation of `bufferAhead` look like? |
When It's somewhat more complicated than that, because it needs to keep track of how many of the promises it has vended are still unsettled, so that it only actually calls |
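For illustration, here is a minimal sketch of what such a buffering helper might look like. This is a hypothetical illustration, not the proposal's actual implementation, and it omits the refinement of tracking unsettled promises: it simply keeps a fixed number of next() requests queued ahead of the consumer.

```javascript
// Hypothetical sketch -- `bufferAhead` is not an existing API.
// It keeps up to `n` extra next() requests in flight ahead of the
// consumer, and vends the buffered promises in pull order.
function bufferAhead(source, n) {
  const buffer = []; // promises of IteratorResults, in pull order
  return {
    next() {
      // top the buffer up so `n` requests stay in flight after we
      // hand one result back to the consumer
      while (buffer.length < n + 1) {
        buffer.push(Promise.resolve(source.next()));
      }
      return buffer.shift();
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
```

A production version would also stop pulling once the source reports done, and bound concurrency by the number of unsettled promises rather than by raw calls, but even this sketch preserves result order because it vends promises in the order they were requested.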
Eh nvm, I guess it would look pretty much like the one I wrote. Anyway I'm going to close this for now. I have a lot to think about in terms of how I would adapt my (currently generator-based) functional APIs for parity with native helpers implemented in this way. It seems that the generators themselves are the hindrance to backpressure, whereas I was thinking it was unavoidable. |
Yup, a lot like that one. The most important difference is that it probably won't start work until the first call to |
I've distilled the simplest example I can find of a code pattern that's wrong. I'm not sure if it's of any concern.

async function *map(iterable, mapper) {
for await (const item of iterable) {
yield mapper(item);
}
};
function delay(ms) {
return new Promise(res => setTimeout(res, ms))
}
function range() {
return {[Symbol.asyncIterator]() {
let i = 0;
return {
async next() {
// here we remember to protect order: capture i synchronously,
// before any await
let _i = i++;
await delay(Math.random() * 500);
// conform to the async iterator protocol
return { value: _i, done: false };
}
}
}};
}
const iter = range()[Symbol.asyncIterator]();
const results = [];
// here we fail to protect order: results are pushed in settlement
// order, not in the order the next() calls were made
const promises = new Array(10).fill(0).map(() => iter.next().then(v => results.push(v.value)));
await Promise.all(promises);
console.log(results); |
Basically, the responsibility for ordering integrity is always on the consumer of an async iterator API, which I don't think had fully sunk in for me yet. |
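One way for the consumer to take on that responsibility is to assign each result a slot at pull time, rather than pushing results in settlement order. A minimal sketch (the helper name collectInOrder is made up for illustration, and it assumes a protocol-conformant iterator whose results have a .value):

```javascript
// Sketch: the consumer preserves order by writing each result into a
// fixed slot chosen at pull time, instead of pushing on settlement.
async function collectInOrder(iter, count) {
  const results = new Array(count);
  await Promise.all(
    Array.from({ length: count }, (_, i) =>
      // the i-th next() call owns slot i, regardless of when it settles
      Promise.resolve(iter.next()).then(r => { results[i] = r.value; })
    )
  );
  return results;
}
```

The next() calls are still issued concurrently; only the placement of results is tied to pull order.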
I had actually thought that having a request queue inside |
I guess I can now say that my objection is wholly withdrawn |
First let me say that perhaps I will be proved wrong. I come to this proposal as the maintainer of iter-tools, and the library I inherited supported sync iterators, async iterators, and some level of parallelization. I've experimented with almost every way I could think of to implement async iterator parallelization, and failed every single time. It is my contention that aside from creating consumer pressure by buffering results mid-pipeline, no other performance tactic is safely possible within the pull-based paradigm.

The basic problem is the fundamental design of iterators, in which control flow goes consumer -> generator -> consumer -> generator -> consumer. Most importantly, the consumer is responsible for handling errors the generator throws, because using async iterators does not give up the properties of structured concurrency, which guarantee that runtime semantics are well-defined (you can't ever accidentally proceed past an error). This is both why iterators are an incredibly useful access pattern, and their fundamental limitation, which explains why parallelization of async iterators is, essentially, pyrite.
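To make the structured-concurrency point concrete, here is a small sketch showing how an error thrown inside an async generator surfaces at the consumer's await, inside the consumer's own try/catch, so control cannot accidentally proceed past the failure:

```javascript
// The consumer's try/catch is the exception-handling context for the
// generator: an error thrown mid-iteration surfaces at `for await`.
async function* source() {
  yield 1;
  yield 2;
  throw new Error('producer failed');
}

async function consume() {
  const seen = [];
  try {
    for await (const v of source()) seen.push(v);
  } catch (e) {
    // iteration stops here; the error cannot be silently dropped
    return { seen, error: e.message };
  }
  return { seen, error: null };
}
```

This is exactly the control flow (consumer -> generator -> consumer) described above: every value, and every error, passes back through the caller.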