Parallelization is impossible #11

Closed
conartist6 opened this issue Aug 12, 2023 · 19 comments

conartist6 commented Aug 12, 2023

First let me say that perhaps I will be proved wrong. I come to this proposal as the maintainer of iter-tools; the library I inherited supported sync iterators, async iterators, and some level of parallelization. I've experimented with almost every way I could think of to implement async iterator parallelization, and failed every single time. It is my contention that aside from creating consumer pressure by buffering results mid-pipeline, no other performance tactic is safely possible within the pull-based paradigm.

The basic problem is the fundamental design of iterators, in which control flow goes consumer -> generator -> consumer -> generator -> consumer. Most importantly, the consumer is responsible for handling errors the generator throws: using async iterators does not give up the properties of structured concurrency, which guarantee that runtime semantics are well-defined (you can never accidentally proceed past an error). This is both why iterators are an incredibly useful access pattern and their fundamental limitation, and it explains why parallelization of async iterators is, essentially, pyrite: fool's gold.
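A small illustration of the error-flow property described here (not from the thread; source is a hypothetical generator): because the consumer sits on the call path for every step, a throw inside the generator surfaces at the consumer's await, and control cannot accidentally proceed past it.

async function* source() {
  yield 1;
  throw new Error('producer failed');
}

try {
  for await (const x of source()) {
    console.log(x); // logs 1, then the generator throws
  }
} catch (err) {
  // The consumer is the exception-handling context for the producer.
  console.error('consumer handled:', err.message);
}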

bakkot (Collaborator) commented Aug 12, 2023

Well, the current plan is to buffer results mid-pipeline. But since it buffers only up to a specified bound, rather than indefinitely, it does not suffer from the backpressure problem: the program will still only request results from the producer as fast as the consumer is capable of handling them.

This is, incidentally, quite similar to one of the approaches Rust takes (a Stream of Futures in Rust is conceptually similar to an async iterator in JS - in particular, in Rust, a Future does not start doing work until you actually await it).

So it seems possible to me. Indeed we have a concrete design with a solid consistency property. Can you say more about what specifically seems impossible? Or is the bufferAhead helper what you're suggesting?

conartist6 (Author) commented:

@bakkot As you mention, creating backpressure works fine, but since it cannot allow values to be created faster than they can be consumed, backpressure is not parallelization. People coming here want parallelization, and they won't get it.

bakkot (Collaborator) commented Aug 12, 2023

> since it cannot allow values to be created faster than they can be consumed, backpressure is not parallelization

I don't really know what that means.

Like:

let pages = await urls
  .toAsync()
  .map(x => fetch(x, opts))
  .filter(asyncPredicate)
  .bufferAhead(4)
  .toArray();

This will do up to 4 fetches in parallel, and can start running the predicate while the later fetches are still running. That is what I, at least, want. How is that not parallelization? (Or at least concurrency, if we're being precise.)

conartist6 (Author) commented Aug 12, 2023

@bakkot That's exactly where I tripped up when I first tried to build that. I thought that writing what you just wrote would provide parallelization, but if you look closely you will find that it does not. The problem is that there's no such thing as an async iterable of promises, so map, being both a consumer and a producer of async iterables, will always wait both for the URL and for the result of fetching before it continues processing.
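To see the serialization described here in miniature, a hedged sketch (not from the thread; genMap and delay are hypothetical stand-ins): an async generator awaits each yielded promise before the next item can even be requested, so two 500ms tasks take ~1000ms even when next() is called twice up front.

const delay = ms => new Promise(res => setTimeout(res, ms));

// A generator-based map: the generator machinery awaits the yielded
// promise, so the pipeline never hands out unsettled promises.
async function* genMap(iterable, mapper) {
  for await (const item of iterable) {
    yield mapper(item);
  }
}

const mapped = genMap([1, 2], () => delay(500));

const start = Date.now();
await Promise.all([mapped.next(), mapped.next()]);
console.log(`Took ${Date.now() - start} ms`); // ~1000, not ~500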

conartist6 (Author) commented Aug 12, 2023

You can have either parallelization or structured concurrency, but not both. To make your example work for parallelization you need to hide the fetch promises from await, which breaks structured concurrency. Here's what you have to do to get your example to work the way you were thinking it would:

let pages = await urls
  .toAsync()
  // as soon as you break the promise chain here, structured concurrency is gone
  .map(async x => [fetch(x, opts)])
  .filter(async wrapper => predicate(await wrapper[0]))
  .bufferAhead(4)
  .map(wrapper => wrapper[0])
  .toArray();

bakkot (Collaborator) commented Aug 12, 2023

What I wrote will provide parallelism with an appropriate implementation of map.

The key here is that async iterators are actually more powerful than async generators. As you point out, an async generator is blocking: calling .next multiple times without waiting for earlier calls to finish will simply stick the requests in a queue. But iterators don't need to have that property: it is perfectly possible for an arbitrary iterator to respond to the second call to .next by beginning to do work even if a prior call to .next has not yet settled. Towards that end, the .bufferAhead(N) helper will call .next N times without awaiting the intermediate results. If it's invoked on an async generator, that won't do anything concurrently (though it will not break anything), but iterator helpers can be designed to do something smarter in this case.

To be concrete, you can imagine an implementation of map that looks like this, though obviously it will need to be more complicated to maintain the consistency property; this is just for illustration:

AsyncIteratorProto.map = function(mapper) {
  return {
    next: () => {
      return Promise.resolve(this.next())
        .then(v =>
          v.done
            ? { done: true }
            : Promise.resolve(mapper(v.value)).then(value => ({ done: false, value }))
        );
    },
  };
};

You can run that code today and see concurrency. The following snippet finishes in ~500ms.

let AsyncIteratorProto = (async function*(){})().__proto__.__proto__.__proto__;

AsyncIteratorProto.map = function(mapper) {
  return {
    next: () => {
      return Promise.resolve(this.next())
        .then(v =>
          v.done
            ? { done: true }
            : Promise.resolve(mapper(v.value)).then(value => ({ done: false, value }))
        );
    },
  };
};

let producer = (async function*() {
  yield* [500, 500];
})();

let delay = ms => new Promise(res => setTimeout(res, ms));

let sleepy = producer.map(x => delay(x));

(async () => {
  let start = Date.now();
  await Promise.all([
    sleepy.next(),
    sleepy.next(),
  ]);
  console.log(`Took ${Date.now() - start} ms`);
})();

conartist6 (Author) commented:

> It is perfectly possible for an arbitrary iterator to respond to the second call to .next by beginning to do work even if a prior call to .next has not yet settled.

It is not.

Again, the problem isn't the iterator protocol at all; it's structured concurrency. If you can create values without any caller providing an exception-handling context, you are pushing values, not pulling them.
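A hedged illustration of the hazard being pointed at (not from the thread; iter is a hypothetical stand-in): a promise vended by an eager next() call that the consumer never awaits has no exception-handling context, so its rejection goes unhandled.

// Two results are requested eagerly, but only the first is awaited.
const iter = {
  async next() {
    throw new Error('boom');
  },
};

const p1 = iter.next();
const p2 = iter.next(); // never consumed below

try {
  await p1; // this rejection has a handling context: the consumer
} catch (err) {
  console.error('handled:', err.message);
}
// p2 rejects with no handler attached: an unhandled rejection.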

bakkot (Collaborator) commented Aug 12, 2023

> It is not.

... Yes it is? I have written code which does that in the comment above. You can run it.

Promise.all(['a', 'b'].map(async () => {/*...*/})) creates values without having the caller provide an exception handling context in exactly the same sense. (That's all my snippet above is doing, in the end, just in a more roundabout fashion.)

Whether or not you want to call this "structured concurrency", it is definitely possible, and it is definitely concurrency.

conartist6 (Author) commented:

See, in a trivial and meaningless sense you are "pulling" values, because you are the caller of next(). But in every sense that matters you aren't part of the call stack, because there's no await: you aren't handling exceptions.

The counterexample you provided is not really a counterexample, because you showed what's producing those promises, not what's consuming them.

bakkot (Collaborator) commented Aug 12, 2023

The example that I showed has

await Promise.all([
  sleepy.next(),
  sleepy.next(),
]);

That is consuming those promises. With an await, even.

bakkot (Collaborator) commented Aug 12, 2023

Anyway, it is arguably correct to say this isn't structured concurrency, because if you stop iteration early (for whatever reason) you may have async tasks whose results will never be consumed; control may proceed before execution of those tasks has finished.

And I do agree it would be better if it were possible to design this such that you were forced to consume all of the results. I just don't think it's necessary - this design does allow concurrency, and fits well with the existing language. My initial snippet

let pages = await urls
  .toAsync()
  .map(x => fetch(x, opts))
  .filter(asyncPredicate)
  .bufferAhead(4)
  .toArray();

will do a bounded number of fetches/predicates concurrently, will throw if any step throws, and control will not proceed until either it throws an error or all the work is complete, in which case pages will hold all of the results, in order. It is no less safe than Promise.all, and frequently it is the thing you want.

conartist6 (Author) commented:

What does the implementation of bufferAhead look like?

bakkot (Collaborator) commented Aug 12, 2023

When b.next() is first called on b = foo.bufferAhead(N), it calls .next on its receiver (here foo) N times; it returns the first resulting promise immediately and puts the remainder in an internal queue. Calling b.next() again vends the promise from the head of the queue, calls foo.next() again, and puts the resulting promise in the tail of the queue.

It's somewhat more complicated than that, because it needs to keep track of how many of the promises it has vended are still unsettled, so that it only actually calls foo.next() when the total of [promises vended but not settled] + [promises in queue] drops below N, but that's the basic idea.
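A minimal sketch of that shape (not from the proposal; it omits the vended-but-unsettled bookkeeping just described, as well as return/throw forwarding and stopping once the receiver reports done):

function bufferAhead(iter, n) {
  const queue = [];
  return {
    next() {
      // Top the buffer up to n outstanding requests without awaiting
      // any of them, then vend the oldest promise.
      while (queue.length < n) queue.push(iter.next());
      return queue.shift();
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}

Note that this sketch is lazy in the sense described a few comments below: nothing is requested from the receiver until the first call to next().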

conartist6 (Author) commented:

Eh nvm, I guess it would look pretty much like the one I wrote.

Anyway, I'm going to close this for now. I have a lot to think about in terms of how I would adapt my (currently generator-based) functional APIs for parity with native helpers implemented in this way. It seems that the generators themselves are the hindrance to backpressure, where I was thinking it was unavoidable.

bakkot (Collaborator) commented Aug 12, 2023

Yup, a lot like that one. The most important difference is that it probably won't start work until the first call to .next; I have that listed as an open question here, but I'm currently leaning pretty hard toward not starting work until actually polled for the first time (but maybe having an option to start eagerly).

conartist6 (Author) commented Sep 27, 2023

I've distilled down the simplest example I can find of a code pattern that's wrong. I'm not sure if it's of any concern.

async function* map(iterable, mapper) {
  for await (const item of iterable) {
    yield mapper(item);
  }
}

function delay(ms) {
  return new Promise(res => setTimeout(res, ms));
}

function range() {
  return {
    [Symbol.asyncIterator]() {
      let i = 0;
      return {
        async next() {
          // here we remember to protect order: capture the index before awaiting
          let _i = i++;
          await delay(Math.random() * 500);
          return { value: _i, done: false };
        },
      };
    },
  };
}

const iter = range()[Symbol.asyncIterator]();
const results = [];

// here we fail to protect order: results are pushed in settlement order
const promises = new Array(10).fill(0).map(() => iter.next().then(v => results.push(v.value)));

await Promise.all(promises);
console.log(results); // scrambled, e.g. [3, 0, 7, 1, ...]

conartist6 (Author) commented:

Basically, the responsibility for ordering integrity is always on the consumer of an async iterator API, which I don't think had fully sunk in for me yet.
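For illustration (not from the thread; iter2, ordered, and requests are hypothetical names), one way the consumer of the snippet above could protect order is to record each result at the index of the request that produced it, rather than pushing in settlement order:

const iter2 = range()[Symbol.asyncIterator]();
const ordered = new Array(10);
const requests = Array.from({ length: 10 }, (_, i) =>
  // Store by request index, not by settlement order.
  iter2.next().then(v => { ordered[i] = v.value; })
);

await Promise.all(requests);
console.log(ordered); // [0, 1, 2, ..., 9]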

conartist6 (Author) commented:

I had actually thought that having a request queue inside map would prevent this from happening, but it doesn't.

conartist6 (Author) commented:

I guess I can now say that my objection is wholly withdrawn.
