
Concern regarding propagating an early termination throughout iterable operators chain #503

Open
shtaif opened this issue Sep 10, 2023 · 17 comments

Comments


shtaif commented Sep 10, 2023

Hey 😃

First, thank you for putting out this library from which I've learned and got inspired a ton. I think I also share this passion for a "streaming-oriented programming" style kinda.

I wanted to share a thought regarding an aspect which seemingly concerns most of the operators in iter-tools.

In short, it appears that you cannot early-terminate, or more relevantly tear down, a pipeline made with iterable operators while there's an item actively being pulled that is yet to resolve. Or at least, that teardown gets postponed until any such in-progress pull has cleared out.

I tried to put together this example to demonstrate implications I'm concerned about:

const baseIterableWhichMaintainsSomeResource = {
  [Symbol.asyncIterator]() {
    let timeoutId;
    let resolve;
    let isClosed = false;

    return {
      async next() {
        await new Promise(_resolve => {
          resolve = _resolve;
          timeoutId = setTimeout(_resolve, 1000);
        });
        return isClosed
          ? { done: true, value: undefined }
          : { done: false, value: Math.random() };
      },

      async return() {
        isClosed = true;
        clearTimeout(timeoutId);
        resolve?.(); // guarded: `next()` may never have been called, leaving `resolve` unassigned
        return { done: true, value: undefined };
      },
    };
  },
};

// (assuming execPipe and asyncFilter are imported from iter-tools)
const iterable = execPipe(
  baseIterableWhichMaintainsSomeResource,
  asyncFilter(num => num < 0.2)
);

const it = iterable[Symbol.asyncIterator]();

(async () => {
  let item;
  while (!(item = await it.next()).done) {
    // ...
  }
})();

// Now, perhaps after a short moment and coming from a different context, we determined we're
// actually no longer interested in any further emission coming from that operation...

// So we do:
await it.return();

// However, we'll notice we're left hanging on this call for quite a while, all whilst
// our underlying base iterable keeps on producing values in vain.

// This will keep on until our `asyncFilter` operator used above happens to get some value
// which satisfies its condition, only so it can finally become unblocked and able to
// handle/propagate our termination instruction upstream to our base iterable.

As I tried my best to illustrate, any underlying resources would remain open for longer than necessary, or potentially even indefinitely, even though we've implied we're no longer interested in any further values anyway. In the broad world of iterable scenarios, such a hanging pull could also hypothetically be waiting on an event that might take minutes or more to fulfill (say, a particular Redis Pub/Sub event getting published), during which we practically have no control over disposing of its underlying connection. The intuitive expectation, arguably, is that at the moment of closing an iterator in such a scenario, the Redis resource would immediately get unsubscribed from.

I should probably have pointed out by now that all this is inherent language behavior for async generators (with which many or most operators in the library are implemented), but having just recently discovered it, it still gives me doubts about real-world usage of operators under these conditions, as well as about using plain generators as they are without at least some patching to work around this.

Do you recognize this as an issue as well, even though native generators were consciously designed like this?
Could you envision adapting operators across the board such that each, upon a call to terminate, would propagate that call directly to its preceding iterable in the chain, instead of being blocked until reaching its closest yield statement?

Thank you very much for reading this 😊✌🏻

@shtaif shtaif changed the title Concern regarding propagating an early termination through iterable operator chains Concern regarding propagating an early termination throughout iterable operators chain Sep 10, 2023

conartist6 commented Sep 10, 2023

It seems to me that what you have and what you want is push-based streaming. Pull-based streaming is not a one-size-fits-all approach, and the benefits it brings are directly tied to limitations like the one you describe.

@conartist6 (Member)

You may also be interested in something like https://wonka.kitten.sh/ -- a set of abstractions that is useful for both push- and pull-based streaming


conartist6 commented Sep 10, 2023

Oh, no, there is a remedy for you perhaps. It depends a bit on which functions you want to use. I was pretty confused by this, and still find it confusing, as you can see here: tc39/proposal-async-iterator-helpers#11

@conartist6 (Member)

Yeah I guess I think this is worth doing, but it's a pretty big change. Are you willing to help work on it?


shtaif commented Sep 11, 2023

It's interesting to study the tc39/proposal-async-iterator-helpers#11 thread you've linked and the further references noted there. It seems there's more acknowledgement of the tradeoffs in async generators' serial behavior than I'd assumed.

I am willing to get involved in working on this and will try my best 😃 🤞🏻
I believe this step is at least one important unblocker for async iterators' adoption across the ecosystem. The scenarios I depicted are a real hindrance to using all those iterable operators to their full joy and potential in a production setting.

I tend to see operators, conceptually, as being as "transparent" / "respectful" as possible towards the behavior of their underlying source - up to the extent that if the source is able to process and emit items in parallel (touching on the discussion in the proposal thread here), they possibly shouldn't modify this property via their own operation, but rather let it carry over (as much as makes sense for each particular operator). I hope I haven't already made some logical mistake in saying that. Therefore all this may have implications for both how calls to .next are handled and how calls to .return are handled, but per our original subject here I suppose at this time it should concern only the .return part (or not? 🤔).

Anyways, I would assume you've already brainstormed approaches for this by now. Let me also share my takes on implementing this tomorrow since it's a bit late here by now 😅


shtaif commented Sep 14, 2023

I've been exhaustively trying to re-verify and consolidate in my mind the nature of this issue and the best approach to it, also with respect to the spirit behind the async iteration specification. I'm still getting to the bottom of the references you linked me.

As said, I tend to feel that operators should strive to operate like no more than transformed "projections" of the source, rather than attempt to cover or normalize some general behaviors of the source async iterable. I'll try to reflect this aim across the examples.

Borrowing asyncMap's code as a representative to all the common straightforward operators (leaving aside the macro-generation aspect for now):

This:

export async function* __asyncMap(source, func) {
  let c = 0;
  for await (const value of source) {
    yield await func(value, c++);
  }
}

Can be turned into either...

this, if we wanna preserve the convenient generator looks and feel of the current codebase:

export function __asyncMap(source, func) {
  // We need this to hold for us a concrete reference to the iterator we'll
  // obtain from the source so that we can have the ability to possibly
  // call `.return()` on it *outside* of the `for await` loop consuming it.
  let sourceIterator;

  return Object.assign(
    (async function* () {
      // obtaining an iterator from the source as late as possible to mimic
      // generators' lazy semantics (that I think are useful),
      // which effectively work this way on the original version
      sourceIterator = source[Symbol.asyncIterator]();
      let c = 0;
      for await (const value of { [Symbol.asyncIterator]: () => sourceIterator }) {
        yield await func(value, c++);
      }
    })(),
    {
      // ✨ Overriding the generator instance's native `.return` method:
      async return() {
        // Why all the `?.`s: we may not have called `.next()` yet by the time
        // we get here, so `sourceIterator` might not have been assigned; also,
        // it might not have a `.return()` method at all, that being optional
        // per the async iteration protocol.
        return sourceIterator?.return?.() ?? { done: true, value: undefined };

        // Apart from the above, we don't need to also directly tell our own
        // generator instance to close (e.g. by delegating to its original
        // native `.return()`), since it should reach its end naturally as soon
        // as the source teardown we've just triggered completes. The source's
        // responsibility in that case is to immediately make any still-ongoing
        // pulls resolve to `{ done: true }`, which breaks the consuming
        // generator's `for await` and consequently lets it run to completion.

        // To clarify: if the source is not well-formed, or doesn't/can't cancel
        // a long-ongoing previous pull upon being `.return()`ed, we couldn't
        // mitigate that at the operator side whatsoever, even by ordering the
        // generator to close directly, since the generator is occupied anyway
        // until that pull finally clears out, even if that takes up to an hour.
        // However, the key difference from the original code is that instead of
        // inevitably getting stuck waiting on last pulls to clear just so the
        // generator can land on a resting point between yields, here we signal
        // the previous link in the chain about our intention to close at the
        // moment of intent, giving it a chance to clear such pulls as soon as
        // possible.
      },
    }
  );
}

Notes on the above:

  • Might be possible to make some reusable helper for this pattern
  • The operator now is essentially still handling requests for values serially, but not so for requests to terminate!
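Expanding on the first bullet: a reusable helper for this return-override pattern might look roughly like the sketch below. This is hypothetical (not an iter-tools API; `withEagerReturn` and `asyncMapEager` are names I'm making up), assuming the source cooperates per the async iteration protocol:

```javascript
// Hypothetical helper (not an existing iter-tools export): wraps a
// generator-based operator factory so that `.return()` is forwarded straight
// to the source iterator instead of waiting in the generator's request queue.
function withEagerReturn(makeGenerator) {
  return function (source, ...args) {
    let sourceIterator;
    // Hand the generator a proxy iterable so we capture the iterator it
    // obtains, preserving the lazy "obtain as late as possible" semantics.
    const capturingSource = {
      [Symbol.asyncIterator]() {
        sourceIterator = source[Symbol.asyncIterator]();
        return sourceIterator;
      },
    };
    const gen = makeGenerator(capturingSource, ...args);
    return Object.assign(gen, {
      // Own-property `return` shadows the generator's native one
      async return() {
        return (await sourceIterator?.return?.()) ?? { done: true, value: undefined };
      },
    });
  };
}

// Usage sketch: the operator body keeps its plain generator look and feel.
const asyncMapEager = withEagerReturn(async function* (source, func) {
  let c = 0;
  for await (const value of source) {
    yield await func(value, c++);
  }
});
```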

or this, in plain async iterable style, which trades the niceness of generators for freedom from the restrictive serial handling of items/terminations that generators impose by default:

export function __asyncMap(source, func) {
  return {
    [Symbol.asyncIterator]() {
      let sourceIterator;
      let c = 0;
      
      return {
        async next() {
          // obtaining an iterator from the source as late as possible to mimic
          // generators' lazy semantics (that I think are useful),
          // which effectively work this way on the original version
          sourceIterator ??= source[Symbol.asyncIterator]();
          const next = await sourceIterator.next();
          if (next.done) {
            return { done: true, value: undefined };
          }
          try {
            const mapped = await func(next.value, c++);
            return { done: false, value: mapped };
          } catch (err) {
            await sourceIterator.return?.();
            throw err;
          }
        },

        async return() {
          // We assume we might have not called `.next()` yet before getting
          // here, so `sourceIterator` could be `undefined`, and it might
          // possibly not have any `.return()` method as being optional per the
          // async iteration protocol
          return sourceIterator?.return?.() ?? { done: true, value: undefined };
        }
      }
    }
  }
}

Notes on the above:

  • More logically verbose.
  • For complex operators, this might prove extremely verbose compared to generators - maybe too verbose to be worth doing. Mixing in some generator-flow code (patched where necessary) might be needed.
  • Should be somewhat more performant than an equivalent generator. Not the main aim though.
  • The operator implemented like this inherently handles nothing serially, so it constitutes no possible congestion point, which lets any inherent concurrency behind it in the pipeline come through (might be either useful or problematic - arguably not for the library to intervene IMO 🤞🏻).
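A quick runnable check of that last point (restating the operator above in compact form so the snippet stands alone; `asyncMapPlain` and `hangingSource` are illustrative names): even with a `.next()` pull hanging indefinitely, `.return()` reaches the source immediately.

```javascript
// Compact restatement of the plain-iterable asyncMap sketched above.
function asyncMapPlain(source, func) {
  return {
    [Symbol.asyncIterator]() {
      let sourceIterator;
      let c = 0;
      return {
        async next() {
          sourceIterator ??= source[Symbol.asyncIterator]();
          const next = await sourceIterator.next();
          if (next.done) return { done: true, value: undefined };
          return { done: false, value: await func(next.value, c++) };
        },
        async return() {
          return (await sourceIterator?.return?.()) ?? { done: true, value: undefined };
        },
      };
    },
  };
}

// A source whose pulls never settle, but which tracks teardown.
let sourceClosed = false;
const hangingSource = {
  [Symbol.asyncIterator]() {
    return {
      next: () => new Promise(() => {}), // a pull that never settles
      async return() {
        sourceClosed = true;
        return { done: true, value: undefined };
      },
    };
  },
};

const it = asyncMapPlain(hangingSource, x => x)[Symbol.asyncIterator]();
it.next(); // kicks off a pull that would hang forever
it.return().then(() => {
  console.log(sourceClosed); // true - teardown happened despite the pending pull
});
```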



Hope this was useful to read. Let me know what you think about all the above information and any remarks you might have for it 🙏🏻

Thanks!! ✌🏻

@conartist6 (Member)

Just going through this now.

I tend to see operators in concept as being as "transparent" / "respectful" as possible relating to the behavior of their underlying source - up to the extent that if the source is able to process and emit items in parallel (touching the discussion in the proposal thread here) - they possibly shouldn't modify this property via their own operation, but rather let it carry over (as much as makes sense in respect to each particular operator).

Yes, I agree.


conartist6 commented Sep 15, 2023

Your first example implementation looks like it is exactly the current implementation. The second example implementation is not useful because it fails to preserve the order of items in the list, i.e.

asyncMap(x => delay(Math.random(), x), [1, 2, 3]) // [2, 3, 1] (random order)


conartist6 commented Sep 15, 2023

The correct code was shared by bakkot over in the thread (edit: reformatted):

function asyncMap(mapper, source) {
  return {
    [Symbol.asyncIterator]() {
      return {
        next: () => {
          return Promise.resolve(source.next())
            .then(item =>
              item.done
                ? { done: true }
                : Promise.resolve(mapper(item.value)).then(value => ({ done: false, value }))
            );
        },
      };
    }
  }
};

This code is transparent as regards the request queueing behavior of the source, but it also ensures that items are not reordered accidentally.
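To make that concrete, here's a self-contained restatement of that shape (taking an iterator, as the snippet above does), plus a check that concurrently-issued pulls still come back in source order. The `delay` helper and `asyncMapOrdered` name are mine, not from the thread:

```javascript
// bakkot-style map over an iterator: every `.next()` call is forwarded to the
// source immediately, and chaining on the source's result promise pins each
// mapped value to the position of the pull that produced it.
function asyncMapOrdered(mapper, sourceIterator) {
  return {
    [Symbol.asyncIterator]() {
      return {
        next: () =>
          Promise.resolve(sourceIterator.next()).then(item =>
            item.done
              ? { done: true, value: undefined }
              : Promise.resolve(mapper(item.value)).then(value => ({ done: false, value }))
          ),
      };
    },
  };
}

const delay = (ms, x) => new Promise(resolve => setTimeout(() => resolve(x), ms));

async function demo() {
  const source = [1, 2, 3][Symbol.iterator](); // a sync iterator works here too
  const it = asyncMapOrdered(x => delay(30 - x * 10, x), source)[Symbol.asyncIterator]();
  // Three pulls issued concurrently; later items map *faster* than earlier ones.
  const results = await Promise.all([it.next(), it.next(), it.next()]);
  return results.map(r => r.value); // [1, 2, 3] - source order preserved
}
```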


shtaif commented Sep 15, 2023

Ok, let's figure out where our misunderstandings lie... 🤔



Your first example implementation looks like it is exactly the current implementation

Consider this; here's what happens, at a high level, when using the library's current pure-generator implementation:

  1. you take an instantiated operator and call .return() on it
  2. if the generator instance is preoccupied with previous unsettled pulls, it puts your return request at the back of its internal queue
  3. when the previous pulls clear out - it handles the return
  4. the return exits out of the enclosing for await
  5. the for await upon exit implicitly calls the source iterator's .return()
  6. source iterator runs its teardown routine just now
  7. generator completes, resolving a { done: true } 🏁

Here's what happens, at a high level, with the first example implementation having the overridden return:

  1. you take an instantiated operator and call .return() on it
  2. the patched .return method immediately delegates the call over to the source iterator's .return() (skipping the generator's queue altogether)
  3. source iterator runs its teardown routine (possibly ending any unsettled pulls on the spot)
  4. the once-occupied generator is freed immediately and completes, resolving a { done: true } 🏁


The second example implementation is not useful because it fails to preserve the order of items in the list

I'm not sure I can see what you're seemingly detecting 😅

In the second proposed implementation, following the logic through: the moment you ask the operator for a value, it asks the source for a value, with no async stuff in between that could cause any order-threatening race conditions. This means that if you make a first pull (without awaiting anything), you get a promise for the first original value, and when you pull a second time you get a promise for the second original value. The order in which these promises eventually resolve is up to the mapping function, but the order in which you obtained the promises themselves IS correlated to the order of the original source values underlying it all. I'm still not completely sure if this is what you meant though.

Here's a JSFiddle with a few slight adjustments showing that the expression

asyncMap(x => delay(Math.random(), x), [1, 2, 3])

appears to yield values in a consistent 1, 2, 3 matching order. This should be compatible with the definition given by bakkot:

Say that an iterator is race-free if it gives the same results in the same order regardless of whether calls to its .next method happen sequentially or concurrently.

(again, if I'm not misinterpreting any context here! 🤞🏻)

With my eyes at the moment I can't seem to spot relevant logical differences between this second proposed implementation and the one by bakkot you referenced 😵‍💫 are you certain about this?

@conartist6 (Member)

No not certain. I think it all checks out.


shtaif commented Sep 25, 2023

No not certain. I think it all checks out.

Ok I guess this means we're on the same page 👍🏻

Now, as we witness the direction that proposal-async-iterator-helpers is taking in terms of these concurrency considerations, I would suppose the second proposed way (the generator-free plain-iterable one I showed) might be the preferred candidate here, since it also doesn't buffer the .next() calls, in addition to the .return()s. I think it would make us as aligned as possible with "the future", so to speak.

Having said this, let's follow through with this question I dropped about all that here so it can verify what I just assumed before we start acting whichever way 🤞🏻

@anacierdem

Another option is using regular generator functions with a custom runner, something similar to this. While keeping the core logic the same, it can allow for pull and termination propagation behind the scenes.


shtaif commented Jun 3, 2024

Hmm I've been absent from here for long due to unforeseen circumstances.

Another option is using regular generator functions with a custom runner, something similar to this. While keeping the core logic the same, it can allow for pull and termination propagation behind the scenes.

@anacierdem I'm not sure if this entirely relates to our subject issue?

iter-tools is a library which (mostly) provides async iterable "operators": functions that take valid async iterables and return modified valid async iterables. The discussion so far has been about the best way to design such operators so that they transparently propagate any .next() and .return() calls immediately to their source, letting the source decide how to handle them as it intended, without the operators affecting these mechanics (which, as concluded, async-generator-based implementations cannot do, since they serialize these calls one by one and queue them before they're passed on to the source).

The option you have referenced looked to me like an alternative way altogether to describe, consume and facilitate async collections. Not to say anything about that idea but only that this library IS about async iteration in its standard form :)

@anacierdem

Definitely I wasn't clear enough. I didn't mean to change how async iterables work from the outside, but I thought we could use regular iterators in the implementation so that the internal "look" of it would resemble how it would look with async generators, without the limitations. Take asyncMap as an example:

export function __asyncMap(source, func) {
    return toAsyncIter(function* () {
        let c = 0;
        for (const promise of toIter(source)) {
            // This is a special value that the runner will recognize and enqueue
            yield [
                function* (value) {
                    yield func(value, c++);
                },
                promise
            ]
        }
    })
}

There are two helpers, namely toAsyncIter and toIter. In essence, they are generic functions that do the conversion and propagate .next() and .return() calls implicitly (even cancellation and error handling can be abstracted behind them if necessary). The implementation looks cleaner, without any explicit reference to those constructs. Yielding an array makes the runner enqueue the result of a promise for later processing. I admit this is the least favorable part of the pattern; I initially thought it would look better, somehow...

This is how a filter would look:

export function __asyncFilter(source, predicate) {
    return toAsyncIter(function* () {
        for (const promise of toIter(source)) {
            yield [
                function* (value) {
                    if (predicate(value)) {
                        yield value;
                    }
                },
                promise
            ]
        }
    })
}

Hope the idea is a little bit clearer this time :) I wish there was a better way to represent these...


conartist6 commented Jun 3, 2024

@shtaif I think the way the library sees the breakdown between sync and async will likely change to some degree in major version 8 -- in any case it's worth discussing what the ideal is, not just what has been our guiding rationale in the past.

@anacierdem I have been doing something quite similar, but I write it like this:

// I should move these into iter-tools
import { Coroutine, getStreamIterator } from '@bablr/coroutine';
import { StreamIterable } from '@bablr/agast-helpers/stream';

export function __asyncFilter(source, predicate) {
  return new StreamIterable(function* () {
    const co = new Coroutine(getStreamIterator(source));

    for (;;) {
      co.advance();
      // different than iter-tools: use a single impl that can act sync or async
      if (co.value instanceof Promise) yield co.value;
      if (co.done) break;
      
      let shouldEmit = predicate(co.value);
      // yielding a promise does not emit an item from the iterator, it only resolves the promise
      if (shouldEmit instanceof Promise) shouldEmit = yield shouldEmit;
      if (shouldEmit) yield co.value;
    }
  });
}

Note: I should be writing a try/finally too I guess for cleanup, eh


shtaif commented Jun 11, 2024

@anacierdem thank you for the clarification, I get the point now :)

So the common aim is to retain the generator constructs as much as possible by using a sync generator as the base of an operator's implementation, with a runner/tooling that allows awaiting intermediate promises, iterating the source async iterable even though we're in a sync generator and cannot use a for await, and translating that sync generator into an async one as the final output...

  • In @conartist6's example, I like the ability to just yield a promise over to the "runner" at any point as if to say "give me back this promise resolved". But to add my thought - I would like to imagine an explicit separation between the intents of "resolve this promise back to me" and "actually yield this out". Like the fact that an await statement either resolves a value if it's a promise, or returns it as-is if not, without any other "side-effect" apart from that (and a micro-tick of course). Perhaps yield a special wrapping which the runner is made to pick out, like yield { [RESOLVE_SYMBOL]: myMaybePromise }; with which it would react similarly to the await semantics. And as such, yielding anything other than this will indicate the "output this" intent.
  • Can the idea of for (const promise of toIter(source)) retain, to some extent, the cleanup semantics that exist in for await - such that it calls source.return?.()? That would be very nice. Hmm, however, in practice a broken for await loop actually calls await source.return?.(), which makes sense because cleanup could be async and needs to be safely awaited, and I'm not sure this could technically work like that behind the scenes - what do you think? Otherwise, there would have to be a wrapping try/finally (or catch) right in every implementation body...
  • Lastly, should the idea be to eventually adapt these implementations to make it possible to write a hybrid operator function (say filter) that can work with either sync or async sources (yielding either plain / promised values in each case correspondingly)?
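On the first bullet, a minimal sketch of that explicit-intent runner idea. Everything here is hypothetical (the RESOLVE marker and this toAsyncIter are illustrations of the suggestion, not existing iter-tools or @bablr API):

```javascript
const RESOLVE = Symbol('resolve');

// Minimal runner: a yield of `{ [RESOLVE]: promise }` means "resolve this
// back to me" (like `await`); any other yield means "emit this value out".
function toAsyncIter(genFn) {
  return {
    [Symbol.asyncIterator]() {
      const gen = genFn();
      return {
        async next() {
          let input;
          for (;;) {
            const step = gen.next(input);
            if (step.done) return { done: true, value: undefined };
            if (step.value !== null && typeof step.value === 'object' && RESOLVE in step.value) {
              input = await step.value[RESOLVE]; // the "await" intent
            } else {
              return { done: false, value: step.value }; // the "emit" intent
            }
          }
        },
        async return() {
          gen.return(); // sync generator teardown runs immediately
          return { done: true, value: undefined };
        },
      };
    },
  };
}

// Usage: a plain sync generator that can still "await" via the marker.
const doubled = toAsyncIter(function* () {
  for (const p of [Promise.resolve(1), Promise.resolve(2)]) {
    const value = yield { [RESOLVE]: p }; // resolved value is sent back in
    yield value * 2;
  }
});
```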
