# Concern regarding propagating an early termination throughout iterable operators chain #503
You may also be interested in something like https://wonka.kitten.sh/ -- a set of abstractions that is useful for both push- and pull-based streaming
Oh, no, there is a remedy for you perhaps. It depends a bit on which functions you want to use. I was pretty confused by this, and still find it confusing, as you can see here: tc39/proposal-async-iterator-helpers#11
Yeah, I guess I think this is worth doing, but it's a pretty big change. Are you willing to help work on it?
It's interesting to study the tc39/proposal-async-iterator-helpers#11 thread you've linked and the further references noted there. Seems like there's more acknowledgement going on about the tradeoffs found in async generators' serial behavior than I'd assumed. I am willing to get involved in working on this and will try my best 😃 🤞🏻 I tend to see operators in concept as being as "transparent" / "respectful" as possible with regard to the behavior of their underlying source, up to the extent that if the source is able to process and emit items in parallel (touching the discussion in the proposal thread here), they possibly shouldn't modify this property via their own operation, but rather let it carry over (as much as makes sense with respect to each particular operator). And I hope I haven't already made some logical mistake saying that. Therefore all this may have implications for both properties - how calls to …

Anyways, I would assume you've already brainstormed approaches for this by now. Let me also share my takes on implementing this tomorrow, since it's a bit late here by now 😅
I've been exhaustively trying to re-verify and consolidate the nature of this issue in my mind and what the best approach to it is, also in respect to the spirit behind the specification of all that async iteration. I'm still getting to the bottom of the references you linked me. As said, I tend to feel that operators should strive to operate like no more than transformed "projections" of the source, rather than attempt to cover or normalize some general behaviors of the source async iterable. I'll try to reflect this aim across the examples.

Borrowing this:

```js
export async function* __asyncMap(source, func) {
  let c = 0;
  for await (const value of source) {
    yield await func(value, c++);
  }
}
```

It can be turned into either... this, if we wanna preserve the convenient generator look and feel of the current codebase:

```js
export function __asyncMap(source, func) {
  // We need this to hold a concrete reference to the iterator we'll obtain
  // from the source, so that we have the ability to call `.return()` on it
  // *outside* of the `for await` loop consuming it.
  let sourceIterator;
  return Object.assign(
    (async function* () {
      // Obtain an iterator from the source as late as possible to mimic
      // generators' lazy semantics (which I think are useful), and which
      // effectively work this way in the original version.
      sourceIterator = source[Symbol.asyncIterator]();
      let c = 0;
      for await (const value of { [Symbol.asyncIterator]: () => sourceIterator }) {
        yield await func(value, c++);
      }
    })(),
    {
      // ✨ Overriding the generator instance's native `.return` method:
      async return() {
        // Why all the `?.`s: we assume `.next()` might not have been called
        // yet before getting here, so `sourceIterator` might not have been
        // assigned, and it might also lack a `.return()` method, which is
        // optional per the async iteration protocol.
        return sourceIterator?.return?.() ?? { done: true, value: undefined };
        // Apart from the above, we don't also need to directly tell our own
        // generator instance to close (e.g. by delegating to its original
        // native `.return()`), since it should reach its end naturally as
        // soon as the source's teardown we've just triggered completes. The
        // source's responsibility in that case is to immediately make any
        // still-ongoing pulls resolve to `{ done: true }`, which breaks the
        // consuming generator's `for await` and lets it run to completion.
        //
        // To clarify: if the source is not well-formed, or doesn't/can't
        // cancel a long-ongoing pull upon being `.return()`ed, we couldn't
        // mitigate that at the operator side even by ordering the generator
        // to close directly, since the generator is occupied anyway until
        // that pull finally clears out, even if that takes an hour...
        // However, the key difference from the original code is that instead
        // of inevitably getting stuck waiting for the last pull to clear just
        // so the generator can land at a resting point between yields, here
        // we signal the previous link in the chain about our intention to
        // close at the moment of intent, giving it a chance to clear such
        // pulls as soon as possible.
      },
    }
  );
}
```

Notes on the above:
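To see the intended effect of the overridden `.return()` concretely, here is a small self-contained sketch (the `hangingSource` below is hypothetical, not part of iter-tools, and `__asyncMap` is a minimal repetition of the variant above): teardown is forwarded to the source immediately, even while a pull is in flight.

```js
// Minimal repetition of the proposed variant above.
function __asyncMap(source, func) {
  let sourceIterator;
  return Object.assign(
    (async function* () {
      sourceIterator = source[Symbol.asyncIterator]();
      let c = 0;
      for await (const value of { [Symbol.asyncIterator]: () => sourceIterator }) {
        yield await func(value, c++);
      }
    })(),
    {
      async return() {
        return sourceIterator?.return?.() ?? { done: true, value: undefined };
      },
    }
  );
}

// A source whose second pull hangs forever, but whose teardown is observable.
let tornDown = false;
const hangingSource = {
  [Symbol.asyncIterator]() {
    let first = true;
    return {
      async next() {
        if (first) { first = false; return { done: false, value: 1 }; }
        return new Promise(() => {}); // simulates e.g. waiting on a pub/sub event
      },
      async return() {
        tornDown = true; // e.g. unsubscribe from the resource here
        return { done: true, value: undefined };
      },
    };
  },
};

(async () => {
  const mapped = __asyncMap(hangingSource, (x) => x * 2);
  console.log((await mapped.next()).value); // 2
  mapped.next(); // an in-flight pull that will never settle
  await mapped.return(); // resolves right away; teardown was forwarded
  console.log(tornDown); // true
})();
```

With a plain async generator, the `mapped.return()` call above would instead stay pending behind the never-settling pull.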
or this, in plain async iterable style, which trades the niceness of generators for things like freeing us from the restrictive serial handling of items/terminations that's the default in generators:

```js
export function __asyncMap(source, func) {
  return {
    [Symbol.asyncIterator]() {
      let sourceIterator;
      let c = 0;
      return {
        async next() {
          // Obtain an iterator from the source as late as possible to mimic
          // generators' lazy semantics (which I think are useful), and which
          // effectively work this way in the original version.
          sourceIterator ??= source[Symbol.asyncIterator]();
          const next = await sourceIterator.next();
          if (next.done) {
            return { done: true, value: undefined };
          }
          try {
            const mapped = await func(next.value, c++);
            return { done: false, value: mapped };
          } catch (err) {
            await sourceIterator.return?.();
            throw err;
          }
        },
        async return() {
          // We assume `.next()` might not have been called yet before getting
          // here, so `sourceIterator` could be `undefined`, and it might also
          // lack a `.return()` method, which is optional per the async
          // iteration protocol.
          return sourceIterator?.return?.() ?? { done: true, value: undefined };
        },
      };
    },
  };
}
```

Notes on the above:
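One behavior of this variant worth checking: the `try`/`catch` in `next()` means a failure thrown by `func` also tears the source down, which the generator version got for free from `for await`. A quick sketch verifying that (the tracked `src` below is hypothetical, not from the library):

```js
// The plain-iterable variant from above, repeated minimally.
function __asyncMap(source, func) {
  return {
    [Symbol.asyncIterator]() {
      let sourceIterator;
      let c = 0;
      return {
        async next() {
          sourceIterator ??= source[Symbol.asyncIterator]();
          const next = await sourceIterator.next();
          if (next.done) return { done: true, value: undefined };
          try {
            return { done: false, value: await func(next.value, c++) };
          } catch (err) {
            await sourceIterator.return?.();
            throw err;
          }
        },
        async return() {
          return sourceIterator?.return?.() ?? { done: true, value: undefined };
        },
      };
    },
  };
}

// A source that records whether it was asked to close.
let closed = false;
const src = {
  [Symbol.asyncIterator]() {
    let i = 0;
    return {
      async next() {
        return i < 3 ? { done: false, value: ++i } : { done: true, value: undefined };
      },
      async return() {
        closed = true;
        return { done: true, value: undefined };
      },
    };
  },
};

(async () => {
  const it = __asyncMap(src, (v) => {
    if (v === 2) throw new Error('boom');
    return v;
  })[Symbol.asyncIterator]();
  await it.next(); // first value maps fine
  await it.next().catch(() => {}); // mapper throws on the second value
  console.log(closed); // true - the source was returned on mapper failure
})();
```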
Hope this was useful to read. Let me know what you think about all the above information and any remarks you might have for it 🙏🏻 Thanks!! ✌🏻
Just going through this now.
Yes, I agree.
Your first example implementation looks like it is exactly the current implementation. The second example implementation is not useful because it fails to preserve the order of items in the list, i.e. `asyncMap(x => delay(Math.random(), x), [1, 2, 3]) // [2, 3, 1] (random order)`
The correct code was shared by bakkot over in the thread (edit: reformatted):

```js
function asyncMap(mapper, source) {
  return {
    [Symbol.asyncIterator]() {
      return {
        next: () => {
          return Promise.resolve(source.next()).then(item =>
            item.done
              ? { done: true }
              : Promise.resolve(mapper(item.value)).then(value => ({ done: false, value }))
          );
        },
      };
    },
  };
}
```

This code is transparent as regards the request-queueing behavior of the source, but it also ensures that items are not reordered accidentally.
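To see the "no accidental reordering" property concretely, one can fire several pulls at once: the k-th `next()` call settles with the k-th item even when later items' mappers finish sooner. A sketch (the `delay` helper is hypothetical, and note that in this version `source` is an *iterator*, since the code calls `source.next()` directly):

```js
// bakkot's asyncMap, as quoted above.
function asyncMap(mapper, source) {
  return {
    [Symbol.asyncIterator]() {
      return {
        next: () => {
          return Promise.resolve(source.next()).then((item) =>
            item.done
              ? { done: true }
              : Promise.resolve(mapper(item.value)).then((value) => ({ done: false, value }))
          );
        },
      };
    },
  };
}

// Hypothetical helper: resolves `value` after `ms` milliseconds.
const delay = (ms, value) => new Promise((r) => setTimeout(() => r(value), ms));

(async () => {
  const it = asyncMap(
    (x) => delay((4 - x) * 30, x * 10), // later items resolve sooner
    [1, 2, 3][Symbol.iterator]()
  )[Symbol.asyncIterator]();
  // Fire three pulls concurrently; each next() synchronously pulls the
  // source in call order, so results line up positionally.
  const results = await Promise.all([it.next(), it.next(), it.next()]);
  console.log(results.map((r) => r.value)); // [ 10, 20, 30 ]
})();
```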
Ok, let's figure out where our misunderstandings lie... 🤔
Consider this; here's what's happening at a high level when using the library's current pure generator implementation:
Here's what's happening at a high level when using the first example implementation having the overridden `.return()`:
I'm not sure I can see what you're seemingly detecting 😅 In the second proposed implementation, following through with the logic: the moment you ask the operator for a value, it asks the source for a value, with no async stuff in between that could cause any original-order-threatening race conditions. Means that if you make a first pull (without …) Here's a JSFiddle with a few slight adjustments showing that the expression `asyncMap(x => delay(Math.random(), x), [1, 2, 3])` appears to yield values in a constant 1, 2, 3 matching order. This should be compatible with the definition proclaimed by bakkot (again, if I'm not misinterpreting any context here! 🤞🏻). With my eyes at the moment I can't seem to spot relevant logical differences between this second proposed implementation and the one by bakkot you referenced 😵💫 are you certain about this?
No, not certain. I think it all checks out.
Ok, I guess this means we're on the same page 👍🏻 Now, as we witness the direction that the proposal-async-iterator-helpers is taking in terms of these concurrency considerations, I would suppose that the second proposed way (the generator-free plain-iterable one I showed) might be the preferred candidate here, since it also doesn't buffer the … Having said this, let's follow through with the question I dropped about all that here, so it can verify what I just assumed before we start acting whichever way 🤞🏻
Another option is using regular generator functions with a custom runner, something similar to this. While keeping the core logic the same, it can allow for pull and termination propagation behind the scenes.
Hmm, I've been absent from here for a while due to unforeseen circumstances.
@anacierdem I'm not sure if this entirely relates to our subject issue?
The option you have referenced looked to me like an altogether alternative way to describe, consume, and facilitate async collections. Not to say anything against that idea, only that this library IS about async iteration in its standard form :)
Definitely I wasn't clear enough. I didn't mean to change how async iterables work from the outside, but I thought we could use regular iterators in the implementation so that the internal "looks" of it would resemble how it would look with async generators, without the limitations. Take the asyncMap as an example:

```js
export function __asyncMap(source, func) {
  return toAsyncIter(function* () {
    let c = 0;
    for (const promise of toIter(source)) {
      // This is a special value that the runner will recognize and enqueue
      yield [
        function* (value) {
          yield func(value, c++);
        },
        promise,
      ];
    }
  });
}
```
There are two helpers, namely `toAsyncIter` and `toIter`. This is how a filter would look:

```js
export function __asyncFilter(source, predicate) {
  return toAsyncIter(function* () {
    for (const promise of toIter(source)) {
      yield [
        function* (value) {
          if (predicate(value)) {
            yield value;
          }
        },
        promise,
      ];
    }
  });
}
```

Hope the idea is a little bit clearer this time :) I wish there was a better way to represent these...
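The thread doesn't define `toAsyncIter`/`toIter`, so the following is only a hedged guess at one possible shape for them, just to make the protocol above concrete: `toIter` turns the async source into a sync stream of `next()` promises, and the runner awaits each promise, feeds the settled value to the paired sub-generator, and emits whatever it yields. (Termination propagation, the actual point of this thread, would additionally need `.return()` wiring; omitted here for brevity.)

```js
// Hypothetical `toIter`: an endless sync generator of the source's `next()`
// promises. The runner is responsible for stopping once the source is done.
function toIter(source) {
  const it = source[Symbol.asyncIterator]();
  return (function* () {
    for (;;) yield it.next(); // each item is a Promise of { done, value }
  })();
}

// Hypothetical runner: drives the sync generator, awaiting each yielded
// [subGenerator, promise] pair and emitting the sub-generator's output.
function toAsyncIter(genFn) {
  return {
    [Symbol.asyncIterator]() {
      const gen = genFn();
      const buffer = [];
      let done = false;
      return {
        async next() {
          for (;;) {
            if (buffer.length) return { done: false, value: await buffer.shift() };
            if (done) return { done: true, value: undefined };
            const step = gen.next();
            if (step.done) { done = true; continue; }
            const [subGen, promise] = step.value;
            const result = await promise;
            if (result.done) { done = true; gen.return(); continue; }
            for (const v of subGen(result.value)) buffer.push(v);
          }
        },
      };
    },
  };
}

// The `__asyncMap` from the comment above, using these helpers:
function __asyncMap(source, func) {
  return toAsyncIter(function* () {
    let c = 0;
    for (const promise of toIter(source)) {
      yield [
        function* (value) { yield func(value, c++); },
        promise,
      ];
    }
  });
}

(async () => {
  async function* nums() { yield 1; yield 2; yield 3; }
  const out = [];
  for await (const v of __asyncMap(nums(), (x) => x * 2)) out.push(v);
  console.log(out); // [ 2, 4, 6 ]
})();
```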
@shtaif I think the way the library sees the breakdown between sync and async will likely change to some degree in a major version …

@anacierdem I have been doing something quite similar, but I write it like this:

```js
// I should move these into iter-tools
import { Coroutine, getStreamIterator } from '@bablr/coroutine';
import { StreamIterable } from '@bablr/agast-helpers/stream';

export function __asyncFilter(source, predicate) {
  return new StreamIterable(function* () {
    const co = new Coroutine(getStreamIterator(source));
    for (;;) {
      co.advance();
      // different than iter-tools: use a single impl that can act sync or async
      if (co.value instanceof Promise) yield co.value;
      if (co.done) break;
      let shouldEmit = predicate(co.value);
      // yielding a promise does not emit an item from the iterator, it only resolves the promise
      if (shouldEmit instanceof Promise) shouldEmit = yield shouldEmit;
      if (shouldEmit) yield co.value;
    }
  });
}
```

Note: I should be writing a …
@anacierdem thank you for the clarification, I get the point now :) So the common aim is to try to retain the generator constructs as much as possible by using a sync generator as the base for an operator's implementation, with a runner/tooling allowing us to await intermediate promises and iterate the source async iterable even though we're in a sync generator and cannot use a `for await...of` …
Hey 😃
First, thank you for putting out this library from which I've learned and got inspired a ton. I think I also share this passion for a "streaming-oriented programming" style kinda.
I wanted to share a thought regarding some aspect which seemingly concerns most of the operators in iter-tools.

In short, it appeared to me that you cannot early-terminate, or more relevantly tear down, a pipeline made with iterable operators when there's an item actively being pulled that is yet to be resolved. Or at least, that teardown gets postponed until after any such in-progress pull has cleared out.
I tried to put together this example to demonstrate the implications I'm concerned about:
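A minimal sketch of the kind of situation being described (illustrative only, not the original example): calling `.return()` on a native async generator while a `next()` is in flight gets queued behind that pull, so teardown never runs while the pull hangs.

```js
// An async generator whose pending pull blocks its own teardown.
async function* events() {
  yield 'ready';
  await new Promise(() => {}); // e.g. waiting on a Redis pub/sub message
  yield 'never reached';
}

(async () => {
  const it = events();
  console.log(await it.next()); // { value: 'ready', done: false }
  it.next(); // a pull that will never settle
  const outcome = await Promise.race([
    it.return().then(() => 'closed'),
    new Promise((r) => setTimeout(() => r('still waiting'), 200)),
  ]);
  console.log(outcome); // 'still waiting' - teardown is stuck behind the pull
})();
```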
As I tried my best to illustrate, any underlying resources would remain open for longer than necessary, or potentially even indefinitely, even though we've implied we're no longer interested in any further values anyway. In the broad world of iterable scenarios, such a hanging pull could also hypothetically be waiting on an event that might take minutes or more to fulfill, say a particular Redis Pub/Sub event getting published, during which we practically have no control over disposing of its underlying connection. The intuitive expectation, arguably, is that at the moment of closing an iterator in such a scenario, the Redis resource would immediately get unsubscribed from.
I should probably have pointed out by now that all this is inherent language behavior for async generators (using which many or most operators in the library are implemented), but having just recently discovered this, it still gives me doubt about real-world usage of operators under these conditions, as well as about using plain generators as they are, without at least some patching to work around this.
Do you recognize this as an issue as well, even though native generators were consciously designed like this?
Would you visualize having operators adapted across the board such that each, upon a call to terminate, would propagate that call directly to its preceding iterable in the chain, instead of being blocked until reaching its closest `yield` statement?

Thank you very much for reading this 😊✌🏻