AsyncIterator.race/merge #15

Open
bakkot opened this issue Feb 10, 2024 · 8 comments


@bakkot
Collaborator

bakkot commented Feb 10, 2024

Wanted to write this down so I don't forget, though it won't be in the first version of this proposal.

We should have a helper for merging or racing multiple AsyncIterators. When next is first called, you pull from all of them and resolve with the first promise to settle. When next is called again, if any of the previously pulled promises has already settled, you settle with that one immediately; otherwise you pull from every underlying iterator you aren't currently waiting on and resolve with the first to settle.
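
A minimal sketch of that shape, using a helper name of my own (mergeSketch, not proposed API) and ignoring the multiple-pending-next() question discussed further down:

async function* mergeSketch(...sources) {
    const iterators = sources.map(source => source[Symbol.asyncIterator]());
    // One in-flight next() promise per source, tagged with its index so we
    // know which slot to refill after each race.
    const pending = new Map(
        iterators.map((iterator, index) => [index, iterator.next().then(result => ({ index, result }))])
    );
    while (pending.size > 0) {
        const { index, result } = await Promise.race(pending.values());
        if (result.done) {
            pending.delete(index);
        } else {
            // Refill this slot before yielding so the next result is already on its way.
            pending.set(index, iterators[index].next().then(r => ({ index, result: r })));
            yield result.value;
        }
    }
}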

RxJS has approximately this (apparently reasonably popular), as do some Rust libraries.

This essay points out that, for async iterators, this is the type-theoretic "sum" to zip's "product": it's the natural extension of Promise.race, just as zip is the natural extension of Promise.all.

The async-std library in Rust takes the interesting approach of randomizing the order in which it polls the underlying streams. I'm guessing that's mostly for the case where the first stream makes all of its results available immediately, which would prevent ever getting values from the second stream even if its results were also available immediately. I don't think that's relevant here, since we can (and must) hold on to results from previous calls which haven't yet been merged in, and so can ensure that, in the case of two async iterators vending immediately-settling promises, we'd alternate between them.
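
For a concrete picture of the starvation concern that randomized polling addresses: Promise.race over promises that are already settled always reports the earliest one in the list, so naively re-racing fresh next() calls from every source on each pull would keep choosing the first source. A small illustration (not from the thread):

const first = Promise.resolve("from first stream");
const second = Promise.resolve("from second stream");

// Both are already settled, but race always reports the first in the list,
// because its reaction is scheduled first.
Promise.race([first, second]).then(console.log); // "from first stream"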

@conartist6

Yep, if you're calling out existing implementations I'll toss in mine: https://github.com/iter-tools/iter-tools/blob/d7.5/API.md#asyncinterleaveready

@laverdet

@conartist6 this implementation does not forward terminations correctly. See:

import { asyncInterleaveReady } from "iter-tools";

async function* range(count) {
	console.log("start");
	try {
		for (let ii = 0; ii < count; ++ii) {
			yield ii;
		}
	} finally {
		console.log("done");
	}
}

try {
	for await (const ii of range(2)) {
		console.log("here", ii);
		throw 1;
	}
} catch {}

console.log("---");

try {
	for await (const ii of asyncInterleaveReady(range(2))) {
		console.log("here", ii);
		throw 1;
	}
} catch {}

Logs:

start
here 0
done
---
start
here 0

(Note the missing second "done": the wrapper never forwards return() to the inner generator, so its finally block doesn't run.)

@bergus

bergus commented Feb 10, 2024

As for implementations, this one should fulfill all the requirements:

async function* merge(iterable) {
    const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
    const results = [];
    let count = asyncIterators.length;
    // A promise that never settles, used to park finished sources so Promise.race skips them
    const never = new Promise(() => {});
    function getNext(asyncIterator, index) {
        // Tag each next() promise with its source index so the winning slot can be refilled
        return asyncIterator.next().then(result => ({
            index,
            result,
        }));
    }
    const nextPromises = asyncIterators.map(getNext);
    try {
        while (count) {
            const {index, result} = await Promise.race(nextPromises);
            if (result.done) {
                nextPromises[index] = never;
                results[index] = result.value;
                count--;
            } else {
                nextPromises[index] = getNext(asyncIterators[index], index);
                yield result.value;
            }
        }
    } finally {
        for (const [index, iterator] of asyncIterators.entries())
            if (nextPromises[index] != never && iterator.return != null)
                iterator.return();
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
    }
    return results;
}

(although it does suffer from the potential memory leaks of Promise.race)
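
For reference, a quick usage sketch of the merge helper above; the countEvery source and its delays are made up for illustration:

async function* countEvery(ms, limit) {
    for (let i = 0; i < limit; i++) {
        await new Promise(resolve => setTimeout(resolve, ms));
        yield `${ms}ms #${i}`;
    }
}

for await (const value of merge([countEvery(70, 3), countEvery(110, 3)])) {
    console.log(value); // interleaved in whichever order the sources settle
}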

@conartist6

Ooh, thanks for the bug report, I'll fix that.

@laverdet

laverdet commented Feb 10, 2024

This is the one we use. It avoids the race leak and terminates on closure. I'm not sure whether a fast iterable will starve the others; that wasn't a design goal.

The specification should probably take a position on whether an iterator that throws should "cut the line" and resolve before other iterators that produced values normally.

export function collect<Type>(iterables: readonly AsyncIterable<Type>[]): AsyncIterable<Type> {
	switch (iterables.length) {
		case 0: return async function*() {}();
		case 1: return iterables[0]!;
	}
	return async function*() {
		type Accept = () => Type;
		let count = iterables.length;
		let capability: PromiseWithResolvers<Accept | null> | undefined; // lib type returned by Promise.withResolvers()
		const iterators: AsyncIterator<Type>[] = [];
		const queue: Accept[] = [];
		const accept = async (iterator: AsyncIterator<Type>) => {
			try {
				const next = await iterator.next();
				if (next.done) {
					if (--count === 0 && capability !== undefined) {
						capability.resolve(null);
					}
				} else {
					push(() => {
						void accept(iterator);
						return next.value;
					});
				}
			} catch (error) {
				push(() => { throw error; });
			}
		};
		const push = (accept: Accept) => {
			if (capability === undefined) {
				queue.push(accept);
			} else {
				capability.resolve(accept);
			}
		};

		try {
			// Begin all iterators
			for (const iterable of iterables) {
				const iterator = iterable[Symbol.asyncIterator]();
				iterators.push(iterator);
				void accept(iterator);
			}

			// Delegate to iterables as results complete
			while (true) {
				while (true) {
					const next = queue.shift();
					if (next === undefined) {
						break;
					} else {
						yield next();
					}
				}
				if (count === 0) {
					break;
				} else {
					capability = Promise.withResolvers();
					const next = await capability.promise;
					if (next === null) {
						break;
					} else {
						capability = undefined;
						yield next();
					}
				}
			}

		} catch (err) {
			// Unwind remaining iterators on failure
			try {
				await Promise.all(iterators.map(iterator => iterator.return?.()));
			} catch {}
			throw err;
		}
	}();
}
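
A usage sketch for the collect helper above; the numbered source is made up for illustration:

async function* numbered(label, count) {
	for (let i = 0; i < count; i++) {
		await new Promise(resolve => setTimeout(resolve, 50));
		yield `${label}${i}`;
	}
}

for await (const value of collect([numbered("a", 2), numbered("b", 2)])) {
	console.log(value); // values from both sources as they become ready
}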

@laverdet

Oh yeah, and I believe none of the implementations shared here fire off a next() call immediately after the previous result resolves. What we're all doing is buffering one result from each iterable in a preamble, but we don't keep buffering the next result as results come in. So you may end up with a waterfall after the first result, in the case where the consumer is asynchronous.

@bakkot
Collaborator Author

bakkot commented Feb 10, 2024

Thanks for the links!

I didn't mention it in the OP, but there's another requirement (or at least something to think through): ideally, when the consumer calls .next multiple times in succession without waiting for previous calls to settle, that should cause multiple calls to .next on the underlying iterators. I'm not sure whether that should be one-to-one or whether we should take into account how many promises we're currently waiting on.

That is, suppose you have let it = AsyncIterator.merge(a, b, c), and then do it.next(); it.next(); it.next(); it.next(); it.next(); it.next(); (i.e. a total of 6 calls to it.next()). That should immediately perform at least 2 calls to the .next() method of each of a, b, and c, and maybe it should perform 6 calls to each of them.

As mentioned in the readme, async iterators which support multiple calls to .next() let you get concurrency driven by the consumer, possibly via a built-in helper. A goal of this proposal is to support that whenever possible. Unfortunately it does mean that these helpers cannot be implemented as async generators.
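
To see why async generators fall short of that last point, here's a small illustration (the ticker generator is my own, just for demonstration): calls to an async generator's next() are queued, so two back-to-back next() calls still run the body strictly one at a time.

async function* ticker() {
    while (true) {
        await new Promise(resolve => setTimeout(resolve, 100));
        yield Date.now();
    }
}

const g = ticker();
// Two pulls issued immediately, but the generator body serializes them:
Promise.all([g.next(), g.next()]).then(([a, b]) => {
    console.log(b.value - a.value); // roughly 100ms: the second pull waited for the first
});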

@bakkot
Collaborator Author

bakkot commented May 12, 2024

Some more prior art in this library.
