
Add a buffering helper #4

Open · bakkot opened this issue Feb 6, 2023 · 26 comments

@bakkot (Collaborator) commented Feb 6, 2023

For the concurrency in the other helpers to be useful, you have to call .next() multiple times and buffer the results as they come in.

We should have a helper which does that for you, giving you an async generator which reads from the buffer and keeps the buffer full. I'm tentatively calling it bufferAhead but open to other names.

That would let you do stuff like

let pages = asyncIteratorOfUrls
  .map(u => fetch(u))
  .bufferAhead(2);

for await (let page of pages) {
  console.log(await examine(page));
}

And even without using helpers like map, you could still make use of it with raw async generators and other async iterables whenever your processing of the data you get from the iterable is itself async, as in

// run the `examine` step concurrently with fetching the next item from the buffer
for await (let item of asyncGenerator().bufferAhead(1)) {
  console.log(await examine(item));
}

I note that such a helper exists in the userland iter-tools library, spelled asyncBuffer.
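To make the semantics concrete, here is a rough userland sketch (illustrative only - the details below are exactly what the open questions are about):

async function* bufferAhead(iterable, n) {
  const iterator = iterable[Symbol.asyncIterator]();
  const buffer = []; // pending next() results, oldest first
  let done = false;
  const pull = () => {
    buffer.push(iterator.next().then(result => {
      if (result.done) done = true;
      return result;
    }));
  };
  for (let i = 0; i < n; i++) pull();
  while (true) {
    if (buffer.length === 0) {
      if (done) return;
      pull();
    }
    const result = await buffer.shift();
    if (result.done) return result.value;
    if (!done) pull(); // top the buffer back up as the consumer drains it
    yield result.value;
  }
}

Note that because this is an async generator, the sketch doesn't start pulling until its first .next() call, and any promises still in the buffer when the consumer exits early are abandoned - both behaviors a real design would need to pin down.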

Open questions:

  • Should it start buffering as soon as you make the helper, or only once you do the first read from it?
  • When someone calls .next() and gets an item which hasn't yet settled, should that trigger another pull from the underlying iterator, or should the parameter to bufferAhead serve as an upper bound on the degree of concurrency to request from the underlying iterator (as well as the lower bound it obviously is)? (I lean towards "it should let you pull more things if you explicitly ask for them"; we should have other mechanisms for limiting concurrency.)
@bergus commented Feb 7, 2023

Should it start buffering as soon as you make the helper, or only once you do the first read from it?

I guess you could make a point for both. On the one hand, iterators are still pull-based; on the other hand, buffering ahead means starting more operations than were requested.

Should the parameter to bufferAhead serve as an upper bound on the degree of concurrency to request from the underlying iterator?

Yes, I think having it as both the upper and lower bound should be the default. There should be other methods (or extra parameters) for setting only the upper or only the lower bound, but I think it's most expected to set both.

for await (const el of asyncIter
  .map(takes20ms).buffer(2)
  .map(takes10ms).buffer(1)
  .map(takes30ms).buffer(3)
) {
  console.log(el);
}

That should ideally call the three functions at a rate of 1 per 10ms each. Having to write that as asyncIter.map(takes20ms).bufferAhead(2).map(takes10ms).bufferLimit(1).map(takes30ms).bufferAhead(3) would be weird, I think.

Always setting a limit would also help to maintain a nice and appropriate backpressure. If you create an iterator with a .buffer(2) limit, you would hope that this cannot be overwritten by someone else just appending .buffer(5) to it.

@bakkot (Collaborator, Author) commented Feb 7, 2023

Always setting a limit would also help to maintain a nice and appropriate backpressure. If you create an iterator with a .buffer(2) limit, you would hope that this cannot be overwritten by someone else just appending .buffer(5) to it.

Well, that depends on whether you're thinking of .buffer(2) as being a limit. I'm inclined to think of it as a mechanism for making results available earlier, not a mechanism to limit concurrency, which feels like a different operation. It seems odd that adding a buffer might, in some cases, prevent you from reading more results than you otherwise might be able to.

I think the way I'd want to write your example, where I wanted to limit the degree of concurrency of the takes10ms function, would be to explicitly limit the degree of concurrency of the takes10ms function.

I've thought for a while that we needed a mechanism for limiting concurrency in the standard library (I've mentioned that in #1 as well). If we did have such a thing, so that you could write

asyncIter
  .map(limitConcurrency(2, takes20ms))
  .map(limitConcurrency(1, takes10ms))
  .map(takes30ms)
  .buffer(3)

would you still think that buffer should imply an upper as well as a lower bound?
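(For reference, limitConcurrency above could be a small wrapper along these lines - a hypothetical sketch, not a proposed API. It caps how many calls to fn are in flight at once, queueing the rest:)

function limitConcurrency(limit, fn) {
  let active = 0;
  const queue = [];
  const runNext = () => {
    if (active >= limit || queue.length === 0) return;
    active++;
    const { args, resolve, reject } = queue.shift();
    Promise.resolve(fn(...args))
      .then(resolve, reject)
      .finally(() => { active--; runNext(); });
  };
  return (...args) =>
    new Promise((resolve, reject) => {
      queue.push({ args, resolve, reject });
      runNext();
    });
}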

@bergus commented Feb 7, 2023

Well, there are multiple things involved. I think ultimately iterators are pull-based, so what we can limit is only the number of concurrent .next() calls - all requests above this limit get buffered (which is not necessarily a good idea, since that buffer is unbounded). For all the existing built-in async iterators I know of (async generators, platform streams), this limit is 1.

Now there might be a way to run an async iterator faster than it will be consumed, which leads to buffering responses up to a certain (possibly unbounded) limit. Unfortunately, there is no way to tell how fast an async iterator can produce results if running steps concurrently. Is .buffer(10) always better than .buffer(3)? When will it just lead to unnecessary buffering? How about .buffer(Infinity) (which won't halt)?

Tbh after pondering a bit, I'm not even certain that concurrent calls to .next() make that much sense. It might be simpler, easier to understand, and more efficient, to just have bufferedMap(asyncFn, bound), bufferedFilter(asyncPredicate, bound) etc helpers that continue iterating their input while already processing yielded elements asynchronously. This would mean specifying just the usual buffering of requests (1 at a time), which would never happen in well-behaved code, instead of having to specify the complicated machinery for sometimes-concurrent .next() calls.

If we did have limitConcurrency

That wouldn't be the same. Consider

asyncIter
  .map(tapper(console.log))
  .map(takes10ms).buffer(1)
  .map(takes30ms).buffer(3)

vs

asyncIter
  .map(tapper(console.log))
  .map(limitConcurrency(1, takes10ms))
  .map(takes30ms).buffer(3)

The second will immediately log 3 times, then buffer the inputs for takes10ms in the limitConcurrency helper. The first would only log right before each takes10ms call, which are spaced out.
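(tapper here is assumed to be a small pass-through helper along the lines of

const tapper = fn => value => { fn(value); return value; };

so the logging shows when each value flows into the takes10ms stage.)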

@bakkot (Collaborator, Author) commented Feb 7, 2023

Now there might be a way to run an async iterator faster than it will be consumed

I'm not sure what this means. An iterator can't produce values which haven't been requested by the consumer - there is, structurally, no mechanism to do so. The bufferAhead helper doesn't increase the rate at which items are consumed; it just requests a constant number of items up front and then forwards requests from the consumer as they come in.

It might be simpler, easier to understand, and more efficient, to just have bufferedMap(asyncFn, bound), bufferedFilter(asyncPredicate, bound) etc helpers that continue iterating their input while already processing yielded elements asynchronously.

My experience of using helpers with those signatures has not been that they are simpler and easier to understand. And they don't do anything for the case where you are writing your own iterable which is capable of producing values concurrently - it means every iterable needs to take a "how many values to buffer" parameter, instead of having concurrency naturally specified based on how many items the consumer is requesting, which is more consistent with being pull-based.

That wouldn't be the same.

I agree it wouldn't be the same, but it seemed like limiting concurrency of takes10ms was what you were going for. If that isn't the goal, what is?

@bergus commented Feb 7, 2023

I'm not sure what this means

Yeah, sorry, I was trying to be deliberately vague since I wanted to refer to all approaches - passing a concurrency value to the function producing the iterator, using your proposed .bufferAhead() method on an iterator, or something else. And yes, "runs faster" is the wrong term, "runs earlier" might be a better fit. Although I guess the ultimate goal still is to increase overall processing speed, assuming not just the producer but also the consumer can be run concurrently or do less waiting for each other?

having concurrency naturally specified based on how many items the consumer is requesting, which is more consistent with being pull-based

Yeah, I can see the appeal in that. But I guess I'm uneasy about how this composes. Let's say I have

function sampleIter() {
  return someAsyncIterator().map(x => f(x)).map(y => g(y)); // f, g: some mapping functions
}

The mapping functions may inadvertently rely on not being invoked concurrently. Now if someone writes sampleIter().map(foo).forEach(run), it works. But if they change it to sampleIter().map(foo).bufferAhead(3).forEach(run), the function breaks. And the goal of adding .bufferAhead(3) would have been to run only foo concurrently, not to change how sampleIter works, whose implementation the caller is ignorant of.

If that isn't the goal, what is?

My goal would be to stream data from/through async iterators, where all the asynchronous processing of the streaming stages runs concurrently - i.e. have each stage be sequential by itself, but have all stages be active at the same time. Taking

asyncIter.map(takes20ms).map(takes10ms).map(takes30ms)

as an example again, this by default pulls one item from the iterator, takes 60ms to process it, then takes the next, etc. No two pieces of code run concurrently, which is quite a waste if the three callbacks do use different resources and do not contend for them.

asyncIter.map(takes20ms, {buffer: 1}).map(takes10ms, {buffer: 1}).map(takes30ms, {buffer: 1})

should then take an item from the iterator, take 20 ms to process it at the first stage, then already pull the next and process it as well, while the result of the first is being processed at the second stage etc. This would lead to the last stage pulling (and getting) new items every 30 ms. I guess the same can be achieved using your approach

asyncIter.map(takes20ms).map(takes10ms).map(takes30ms).bufferAhead(1)

(although that also affects asyncIter). However, to fine-tune the stream so that it can process one item every 10ms (assuming asyncIter produces them fast enough) I would rather like to write

asyncIter.map(takes20ms, {buffer: 2}).map(takes10ms, {buffer: 1}).map(takes30ms, {buffer: 3})

which specifies a concurrency factor for each stage separately.
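(For concreteness, a rough sketch of what such a per-stage map-with-buffer might look like - hypothetical, just to pin down the semantics being discussed. It keeps up to bound mapped items in flight while the consumer works on earlier results:)

async function* bufferedMap(iterable, fn, bound) {
  const iterator = iterable[Symbol.asyncIterator]();
  const inFlight = [];
  let done = false;
  const pull = () => {
    inFlight.push(iterator.next().then(async r =>
      r.done ? r : { done: false, value: await fn(r.value) }));
  };
  while (true) {
    while (!done && inFlight.length < bound) pull();
    if (inFlight.length === 0) return;
    const result = await inFlight.shift();
    if (result.done) { done = true; continue; } // drain any remaining pulls
    while (!done && inFlight.length < bound) pull(); // stay ahead while the consumer works
    yield result.value;
  }
}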

@bakkot (Collaborator, Author) commented Feb 7, 2023

The mapping functions may inadvertently rely on not being invoked concurrently.

I agree that in this situation you shouldn't invoke the mapping functions concurrently, but I don't think we should assume this is the expected case. We should instead give users the ability to limit concurrency when they have functions which shouldn't be invoked concurrently. I am maybe convinced of the need to have an explicit method to limit concurrency on an async iterator, though, for the specific case that you want to perform subsequent concurrent operations on top of an async iterator vended by someone else which breaks if you try to consume it concurrently.

However, to fine-tune the stream so that it can process one item every 10ms (assuming asyncIter produces them fast enough) I would rather like to write

asyncIter.map(takes20ms, {buffer: 2}).map(takes10ms, {buffer: 1}).map(takes30ms, {buffer: 3})

How is that better than

asyncIter.map(takes20ms).map(takes10ms).map(takes30ms).bufferAhead(6)

? The latter will start processing items a little bit earlier, but that's arguably desirable, particularly if the times have some noise to them.

The bufferedMap design forces you to care about tuning each stage, because there's no way for the last stage to signal to earlier stages that they should be prepping multiple items. But that's a deficit in that design - you shouldn't have to figure out the optimal degree of concurrency for each stage separately; you should just be able to worry about concurrency for the pipeline as a whole.

@laverdet commented Mar 3, 2023

Hello. I'd like to point out that bufferAhead as implemented in iter-tools could be considered an unsafe operation. The problem that I have with it is that it consumes values from the iterable before it has a consumer to delegate to.

Consider:

import { asyncBuffer, asyncMap } from "iter-tools";

async function* make() {
	for (let ii = 0;;) {
		const value = ii++;
		console.log(`generated ${value}`)
		yield value;
	}
}

function consume(value: number) {
	console.log(`consumed ${value}`);
	return value;
}

function throwIfGreater(operand: number) {
	return (value: number) => {
		if (value > operand) {
			throw new Error(`${value} is greater than ${operand}`);
		}
		return value;
	}
}

const iter = asyncBuffer(4, make());
const work = asyncMap(consume, iter);
const release = asyncMap(throwIfGreater(2), work);
for await (const result of release) {
	console.log(result);
}

The output follows:

generated 0
generated 1
generated 2
generated 3
consumed 0
generated 4
0
generated 5
consumed 1
1
generated 6
consumed 2
2
generated 7
consumed 3
file:///Users/marcel/code/sandbox/dist/wow.js:17
            throw new Error(`${value} is greater than ${operand}`);

In this case the results from make have been abandoned. In this example it's benign, but if this were a worker loop, for example one accepting HTTP requests, this would be bad. In the case where we buffer ahead and pull a series of rejected promises, further questions are raised. Do we consider the buffered promises "handled", or should we go to HostPromiseRejectionTracker and eventually raise unhandledrejection?

Expressed another way, this operation accepts ownership of an IteratorResult before it can safely transfer ownership forward.

An alternative parallelization primitive that I have been using internally is divide. This function accepts an AsyncIterable and splits it into N AsyncIterables which proxy back to the source iterable:

function divide<Type>(
	iterable: AsyncIterable<Type>, count: number
): AsyncIterable<Type>[] {
	let current: Promise<IteratorResult<Type>> | undefined;
	let remaining = count;
	const iterator = iterable[Symbol.asyncIterator]();
	const iterables = [ ...Array(count) ].map(async function*() {
		try {
			while (true) {
				current = async function() {
					const previous = await current;
					if (previous?.done) {
						return previous;
					}
					return iterator.next();
				}();
				const result = await current;
				if (result.done) {
					break;
				} else {
					yield result.value;
				}
			}
		} finally {
			if (--remaining === 0) {
				await iterator.return?.();
			}
		}
	});
	return [ ...iterables ];
}

We can now rewrite the parallelized operation as:

const iter = divide(make(), 4);
await Promise.all(iter.map(async function(iter) {
	const work = asyncMap(consume, iter);
	const release = asyncMap(throwIfGreater(2), work);
	for await (const result of release) {
		console.log(result);
	}
}));

And the result:

generated 0
generated 1
consumed 0
generated 2
consumed 1
generated 3
0
consumed 2
generated 4
1
consumed 3
generated 5
2
consumed 4
generated 6
consumed 5
consumed 6
file:///Users/marcel/code/sandbox/dist/wow2.js:48
            throw new Error(`${value} is greater than ${operand}`);

Note that all generated values were consumed; no result has been abandoned. Promise.all in this case accepts ownership of each "task". Each task throws an error, but of course only the first rejection is reported.

@bakkot (Collaborator, Author) commented Mar 3, 2023

@laverdet I'm not sure what you mean by "unsafe" or "ownership" here. I think it's expected when buffering that results are produced in advance of being consumed, such that if you stop consuming before the end of the stream you may have produced more results than are ever consumed. That's what buffering means. If you don't want that behavior you can't buffer, but that doesn't make the utility "unsafe"; it just makes it unsuitable for some applications.

In any case, the existing behavior of Promise.all is to discard any results past the first exception. That's the same as would happen with an explicit loop or forEach over a buffered iterator. (Indeed, that's what happens in your example as well.) I don't see a cause for concern here.

@bakkot (Collaborator, Author) commented Mar 4, 2023

That said, I do think the divide helper you give is interesting as a way to split up work. Why does it wait for the previous call to iterator.next() to settle before calling iterator.next() again, though? That means it can't pull from the underlying async iterator concurrently. Most iterators don't support that, of course, but even those generally behave well when you try (i.e., they just wait for the previous call to finish before starting on the next one, like async generators do), and some async iterators do support pulling concurrently (e.g. that's usually what will happen if you implement an async iterator manually, and it's what we intend to do with helpers like .map).

@laverdet commented Mar 4, 2023

Why does it wait for the previous call to iterator.next() to settle before calling iterator.next() again, though?

Well, I think calling .next() concurrently is a mistake and I would hate to see it enshrined in the language specification. It's possible I'm too late to the discussion to make that case though.

If an iterable truly supports concurrent execution I would prefer to just have that iterable yield something like { cursor: Promise<T> } or if Symbol.thenable was on track then it could yield Promise<T> directly [though 27.1.1.4 recommends against this].

The problem I have with concurrency implemented in this way is that it actively encourages abandoned values. Abandoned values can result in memory leaks or lost work.

In my case when I authored divide it was for a pool of worker processes which read jobs out of redis with an SPOP command. By reading the job the worker has committed to publishing a result. If a value was yielded from this iterable but not consumed then the entire pool would have to wait for a costly timeout before resyncing.

I can think of other examples as well.

This is why I consider bufferAhead to be dangerous.

@bakkot (Collaborator, Author) commented Mar 4, 2023

I agree that using these APIs in situations where you need to dispose of values explicitly would be bad. But I am reluctant to optimize for that (in my experience) quite rare case, given how much worse the API gets when you add the restriction that you can't pull concurrently. And again, I don't think it's any worse than Promise.all, which has identical behavior of abandoning values when you hit an exception - do you think there's some meaningful difference I'm missing?

That said, there are possible tweaks here which could help - for example, bufferAhead could let you opt in to disposing any buffered values when exiting early. Or the buffered promises could be passed to a hypothetical cleanup callback so you could deal with them however you like. You'd still need to ensure that you don't hit errors in your map/filter/etc callbacks, though, so this would only be of limited help.

I think calling .next() concurrently is a mistake and I would hate to see it enshrined in the language specification.

Support for calling .next() concurrently is already enshrined in the language specification. Sync generators throw if you call .next() during a call to .next(), whereas async generators go to some lengths to explicitly buffer the calls.
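To illustrate that buffering behavior (plain language semantics, nothing proposal-specific):

async function* gen() {
  yield 1;
  yield 2;
}
const it = gen();
// Overlapping next() calls: the async generator queues the second call
// internally rather than throwing, and the values arrive in order.
const [a, b] = await Promise.all([it.next(), it.next()]);
console.log(a.value, b.value); // 1 2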

If an iterable truly supports concurrent execution I would prefer to just have that iterable yield something like { cursor: Promise<T> } or if Symbol.thenable was on track then it could yield Promise<T> directly [though 27.1.1.4 recommends against this].

The box (cursor) pattern is awkward because it doesn't compose - how do you .filter over such an iterator, for example? The wiring for that is quite tricky. You really want to stick with async iterables here, I think.

(Symbol.thenable isn't going to happen.)

If a value was yielded from this iterable but not consumed then the entire pool would have to wait for a costly timeout before resyncing.

This is a reason that it is important for your use case to consume all the values, but you can call .next() concurrently in your divide helper, and as long as all of the iterables produced by divide are consumed to exhaustion then you will consume every value. Calling .next concurrently does not imply not consuming all values.

@laverdet commented Mar 6, 2023

Well, a big difference between bufferAhead and Promise.all is that bufferAhead creates these promises on the developer's behalf, whereas Promise.all accepts a vector of promises which are already created. Nothing stops the developer from doing: Promise.allSettled(promises).then(reportDiagnostics); return Promise.all(promises);. On the other hand, the exact errors lost by bufferAhead are gone without some fancy buffering before that stage of the pipeline. Though admittedly I'm starting from the conclusion that "concurrent next() seems bad" and working backwards trying to justify this position.

In my experience async iterables are very fussy beasts, and getting them right can be challenging. I see concurrent next() as an added layer of unnecessary complexity. I worry that it's easy to make an async iterable which behaves robustly except in the concurrent case, and it wouldn't be apparent to the developer that this one function has some spooky behavior. Despite the fact that async generators are well-defined in the concurrent case, the fact remains that there is no built-in in the language or other web specification that I know of which invokes next concurrently. From that perspective it seems like this proposal retroactively adds an expectation to existing AsyncIterable implementations. Furthermore, I don't think that concurrent next() actually gets us anything that couldn't be accomplished by composing a pipeline in another manner.

I actually use a version of bufferAhead in my projects [I call mine lookAhead], except it doesn't invoke next concurrently. In this ecosystem all parallelization is provided by the same primitive: divide. divide and lookAhead each serve a single purpose; in your proposal bufferAhead is a combination of both.

An example of where I'd use lookAhead is when I'm reading a paged document manifest. Each page of the manifest contains references to 50 or so documents. In this case I would want to buffer 1 or 2 pages to ensure smooth iteration over the document references, but parallelization is not desired or even possible.

An example of where I'd use divide is when I'm fetching the documents referenced in the manifest. In this case I would like to run N operations simultaneously, but not so many that I exhaust the resources on the machine. I do not want to buffer these results.

Here the lookAhead & divide strategy still achieves tunable parallelization + buffering, but doesn't need to invoke next() concurrently.
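(Roughly how that composes, as a sketch - manifestPages, flatten, fetchDocument, and processDocument are hypothetical stand-ins:)

// Buffer a page or two of the manifest ahead, serially.
const pages = lookAhead(manifestPages(), 2);
// Flatten pages into individual document references (flatten is hypothetical).
const refs = flatten(pages);
// Fetch up to 10 documents at a time via divide, with no result buffering.
await Promise.all(divide(refs, 10).map(async iter => {
	for await (const ref of iter) {
		await processDocument(await fetchDocument(ref));
	}
}));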

I disagree with your statement:

But that's a deficit in that design - you shouldn't have to figure out the optimal degree of concurrency for each stage separately; you should just be able to worry about concurrency for the pipeline as a whole.

In this example [paged document manifest -> document fetch] each stage must be tuned specifically. It is not enough to tune the pipeline parallelization at the end because otherwise you will still run into a bottleneck when iterating to a new manifest page. We really do want to specify buffer and parallelization strategies separately for each stage. I believe this will be the case for many non-trivial pipelines. For example, it is ok to fetch 10 images at a time but we probably only want to transcode 2 images at a time.

Anyway concurrency is messy, opinionated, and workload specific. It doesn't seem like the community has experimented too much with any of these patterns, so this part of the proposal seems overly ambitious.

I also believe that the failure of the async cancellation proposal from a few years ago kneecapped this proposal from the start. What do we do in this example?

declare const documents: URL[];

const result = await documents.toAsync()
	.map(url => fetch(url))
	.map(response => response.text())
	.bufferAhead(10)
	.some(text => text.includes("marker"));

We want to determine if any of these documents contain the text "marker". We want to fetch 10 documents at a time, and we want to cancel any pending fetches when the marker is found in any of them. But when some closes the async iterable early (by calling its return method), there is no way to forward that cancellation to the mapper functions, since AbortSignal is of course a DOM API.

With this case in mind I see this proposal as a minor quality of life improvement for some basic cases and one-off scripts. For any kind of work in long-running scripts or in the browser I would still want to use a userspace implementation to leverage AbortSignal for proper resource utilization.

@bakkot (Collaborator, Author) commented Mar 7, 2023

Nothing stops the developer from doing: Promise.allSettled(promises).then(reportDiagnostics); return Promise.all(promises);. On the other hand the exact errors lost by bufferAhead are gone without some fancy buffering before that stage of the pipeline. Though admittedly I'm starting from the conclusion that "concurrent next() seems bad" and working backwards trying to justify this position.

In practice, in my experience, this doesn't happen. So it is hard for me to accept that as a real reason to dislike concurrent next(). If you have a better sense of why you dislike it I'd be happy to hear it, but right now this reason seems, as you say, like you are searching for a reason to dislike it rather than starting from an actual concern.

the fact remains that there is no built-in in the language or other web specification that I know of which invokes next concurrently

That's true, but only because there's practically nothing in the language or any other web specification that deals with async iterables at all. This proposal is where we're building out the very first nontrivial uses and combinators, so of course now is the first time this question is arising.

Furthermore, I don't think that concurrent next() actually gets us anything that couldn't be accomplished by composing a pipeline in another manner.

It gets you the ability to specify an async iterator which can be consumed concurrently, at the consumer's option. I am open to other ways of accomplishing this but have not yet heard one.

divide

divide is neat, but it fundamentally breaks the "it's a single linear sequence" nature of iterators, so it is not an adequate replacement for bufferAhead.

a paged document manifest [...] We really do want to specify buffer and parallelization strategies separately for each stage.

Yes, I agree that flatMap, uniquely, is a combinator where you might want to specify concurrency in more detail. It's fairly easy to do this with bufferAhead: foo.map(x => getPage(x)).bufferAhead(1).flatMap(page => fetch(page)). But I agree the case of flatMap is more awkward than other combinators.

I'm not seeing where you get the "want to specify buffer and parallelization strategies separately for each stage" part from, though. Why is buffering in this case problematic? Are you fetching documents so large they don't fit in memory? I agree this utility is not suitable for that case and am OK with that.

For example, it is ok to fetch 10 images at a time but we probably only want to transcode 2 images at a time.

It's true that sometimes when creating an async pull source you want to limit the degree of concurrency that consumers will be able to cause by pulling. But this is just as true with regular async functions (which are also pull sources) as for async iterators: for example, if you are making a "get a thing from this API" function, you probably want to limit how many times that function can be called simultaneously. I've long thought we needed a utility for this (like this popular userland one).

But limiting the degree of concurrency is an entirely different thing than specifying the degree of concurrency. If the consumer of the iterator is working very slowly, they might not need to bother with concurrent transcodes, for example - they could just .bufferAhead(1) and be satisfied.

The limit on concurrency for each stage is a property of the stage, I agree, but the actual realized amount of concurrency appropriate to each stage depends on the other stages, including the ultimate consumer. So I stand by my statement that you really should be able to worry only about the degree of concurrency appropriate to the entire pipeline, at least in most common cases. I think the example with the takes10ms functions makes that very clear.

It doesn't seem like the community has experimented too much with any of these patterns, so this part of the proposal seems overly ambitious.

It will not be possible to experiment with these patterns if we specify map so that the next method of map can't be called concurrently, so that decision, at least, needs to be made now. And I don't think we'll see much more widely-adopted experimentation in the absence of guidance from the language here - people look to the language to set patterns like this.

I also believe that the failure of the async cancellation proposal from a few years ago kneecapped this proposal from the start. What do we do in this example?

You should abort when you're done, of course?

That gets easier with a disposable AbortController, which is coming along, so I'll use that in this example:

using controller = new AbortController.AutoAbort();
const result = await documents.toAsync()
	.map(url => fetch(url, { signal: controller.signal }))
	.map(response => response.text())
	.bufferAhead(10)
	.some(text => text.includes("marker"));
controller.abort();

Alternatively, if we have a "run this callback when .return is called" helper, that would let you do

const controller = new AbortController();
const result = await documents.toAsync()
	.map(url => fetch(url, { signal: controller.signal }))
	.map(response => response.text())
	.onClose(() => controller.abort())
	.bufferAhead(10)
	.some(text => text.includes("marker"));

I'm not seeing any difficulty here. This is no more a problem than the common await Promise.all(urlsArray.map(url => fetch(url))) case.

Yes, of course there's no way for some to abort the fetches automatically, but that's true of all sorts of other kinds of cleanup. If your mapper function requires you to hold open a database handle, for example, promise cancellation wouldn't have helped at all - you want to close the handle only when the entire pipeline is finished, rather than doing something in response to individual tasks getting cancelled.

In any case, it's certainly not a problem if you want to use some other implementation for your particular needs. We're never going to design an API which is both fairly simple and suitable for every use case. "Fairly simple" is a hard constraint, so as long as the design here is suitable for most common cases - which I believe it is - then I'm satisfied.


I am open to alternative designs but everything I've seen thus far seems worse. divide breaks the fundamental "linear sequence" nature of iterators. A concurrency parameter to each helper requires you to tune concurrency for each stage independently, which is impractical. This design gives you easy-to-use concurrency while preserving the linear nature of iterators, at the cost of working like Promise.all/any in discarding values in some cases - a cost which seems, to me, to be fairly small.

@laverdet commented Mar 7, 2023

It gets you the ability to specify an async iterator which can be consumed concurrently, at the consumer's option. I am open to other ways of accomplishing this but have not yet heard one.

My belief is that the consumer alone doesn't have the information needed to specify concurrency for a given pipeline. Each stage will have unique considerations to take into account. So instead of dropping a .bufferAhead(4) at the end of the pipeline I would rather have makeProducer(4) and let this factory apply the desired concurrency when possible.

divide is neat, but it fundamentally breaks the "it's a single linear sequence" nature of iterators, so it is not an adequate replacement for bufferAhead.

Well, it's possible to produce the results back into another async iterable when that kind of thing is needed. The technique to accomplish this isn't glamorous though.

I'm not seeing where you get the "want to specify buffer and parallelization strategies separately for each stage" part from, though. Why is buffering in this case problematic? Are you fetching documents so large they don't fit in memory? I agree this utility is not suitable for that case and am OK with that.

Yes, basically! I've done some amazing things with async generators and having a high degree of control over resource ownership has been crucial in that work.

It's true that sometimes when creating an async pull source you want to limit the degree of concurrency that consumers will be able to cause by pulling. But this is just as true with regular async functions (which are also pull sources) as for async iterators: for example, if you are making a "get a thing from this API" function, you probably want to limit how many times that function can be called simultaneously. I've long thought we needed a utility for this (like this popular userland one).

throat and p-limit are scary since they plaster over the throughput of the mapper. These modules will happily buffer an infinite number of requests when usually the correct approach is to throttle the consumer in another manner.

Anyway, the approach to concurrency that this proposal takes implicitly changes the contract of async iterables from "SHOULD handle concurrent invocations of next" to "MUST handle concurrent invocations of next". Maybe that's ok but it will be something that library authors need to take into account.

Thanks for taking the time to defend your work here. I look forward to seeing how it develops!

@bakkot (Collaborator, Author) commented Mar 7, 2023

So instead of dropping a .bufferAhead(4) at the end of the pipeline I would rather have makeProducer(4) and let this factory apply the desired concurrency when possible.

The whole point of pull sources is that backpressure is handled by the consumer driving requests. When you specify it at the producer, you have to buffer, since there's nowhere to put those results. Which is fine in some cases, but as you yourself observe, sometimes you don't want a buffer. (Nothing in this proposal would force you to use .bufferAhead; it's just a convenience method for buffering instead of explicitly making a concurrent consumer. If your consumer is already concurrent then you don't have any reason to use it; no buffering is required. But of course that only works if the source supports concurrent pulling, which is the main thrust here.)

These modules will happily buffer an infinite number of requests when usually the correct approach is to throttle the consumer in another manner.

This is a bit off-topic, but I disagree - when there's multiple consumers, as is often the case (particularly for things like "hit this API"), it's not practical to throttle them independently. The appropriate place to put the limiter is the place which necessitates it, not to force communication among all consumers of that thing.

Anyway, the approach to concurrency that this proposal takes implicitly changes the contract of async iterables from "SHOULD handle concurrent invocations of next" to "MUST handle concurrent invocations of next"

Eh, I don't think it amounts to a change in contract. Compare again the example of async functions - some of them can be called concurrently, others can't. It's possible to use them in such a way that they'll get called concurrently, but that's true for async iterators today as well.

It is fine for an async function to have as part of its contract "cannot be called concurrently", just as it is fine for an async iterator to have as part of its contract "cannot have its next called concurrently". That is true now and will continue to be true with the addition of this helper. It's just that now it will be more useful to support that, where previously there was not much reason to (though it is absolutely something people are doing today).

Thanks for taking the time to defend your work here. I look forward to seeing how it develops!

Yes, thanks for voicing your concerns as well. I'm putting together a presentation for committee about the various tradeoffs here and I'll mention the point you raise about values getting lost.

@rektide commented Jun 7, 2023

This would be a great & simple capability for adding in concurrency! I've seen a number of folks struggle with this at my day jobs, and having something in the language would be a huge help to make async iterators really useful out of the box.

Open questions:

  1. Should it start buffering as soon as you make the helper, or only once you do the first read from it?

I really don't want to get into a position where bufferAhead can only start right away. Starting right away combines two behaviors, buffering and early-start, which feels very restrictive and would, I think, be somewhat surprising to users. Better to have simpler pieces with less surprise: start buffering on the first .next() is my vote.

I do kind of want an eager mode. One could also imagine an earlyStart that just pre-reads the first element, right away, specifically for use with bufferAhead, but it seems kind of silly.

  2. When someone calls .next() and gets an item which hasn't yet settled, should that trigger another pull from the underlying iterator, or should the parameter to bufferAhead serve as an upper bound on the degree of concurrency...

Again I think we've happened upon another interesting capability (one we probably want even more than earlyStart), but one which I don't think belongs here. I'd advocate having bufferAhead help create concurrency, always buffering ahead, and entirely unaware of the state of the promises it's passing through. (This is how the iter-tools implementation works.)

But users also want to limit concurrency. A .limitOutstanding() or some such would be useful, not just with buffering ahead, but with any pipeline.

@bakkot (Collaborator, Author) commented Jun 7, 2023

I do kind of want an eager mode. One could also imagine a earlyStart that just pre-reads the first element, right away, specifically for use with bufferAhead, but it seems kind of silly.

Yeah, my current thinking is that if we want this functionality it should be opt-in via a parameter to bufferAhead, e.g. an optional second parameter "startEagerly" which defaults to false. That's not much extra surface area, and it does seem like something which is sometimes useful.
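(Hypothetical shape, reusing the example from the top of the thread:)

let pages = asyncIteratorOfUrls
  .map(u => fetch(u))
  .bufferAhead(2, true); // second argument: start buffering immediately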

@benlesh commented Aug 2, 2023

I'm sorry if I missed this, because it's a big thread. There are really three ways I see people buffer async things:

  1. Buffering a specific number of things (common)
  2. Buffering things until some other async event, usually a timer (most common)
  3. Creating multiple buffers and buffering for a specific time or count (less common)... so for example if you have a stream of 1, 2, 3, 4, 5, you might emit buffers like [1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]

The next question becomes whether you want the emission of unfinished buffers on complete to be configurable. For example if you're buffering 3 items, but the source stream ends before you get to 3, do you emit a buffer with two in it? Or do you just drop them? Or do you error? Those are semantic issues.

Regardless, buffering for a specific amount of time is the most often used in RxJS, which will probably have similar use cases to this.

@michaelficarra (Member) commented Aug 9, 2023

@benlesh I don't think "buffering for an amount of time" means anything with regard to async iterators. Async iterators don't do anything until they're pulled from. The "buffer for a number of things" style helper that we've been discussing in this thread would have a fixed number of slots, immediately pull from the source enough times to fill all of the slots, and, as they empty (as the result iterator is pulled), continue to pull from the source to keep those slots full.

What would it mean to pull from the source async iterator for some amount of time? Would this have no parallelism and just pull from the underlying iterator in series until some timer expires and yield a sequence type like an array of the collected values? If that's the thing you're talking about, it seems only very loosely related to what's being proposed here (though arguably a more appropriate use of the term "buffering"). If so, and you think that's a useful helper, we should probably have a separate issue to track that.

edit: Now that I think about it, that sounds like chunking but with chunk size based on number of yielded values over a given time.
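(A rough sketch of that time-based chunking reading, with serial pulls and no concurrency - the name is illustrative:)

async function* chunkByTime(iterable, ms) {
  let chunk = [];
  let deadline = Date.now() + ms;
  for await (const value of iterable) {
    chunk.push(value);
    if (Date.now() >= deadline) {
      yield chunk;
      chunk = [];
      deadline = Date.now() + ms;
    }
  }
  if (chunk.length > 0) yield chunk; // emit the unfinished chunk at the end
}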

@felixfbecker commented

Has anyone attempted to build a ponyfill/userland implementation/prototype of this yet? It might be useful to start experimenting with one and get some feedback on the design.

@ljharb (Member) commented Aug 15, 2023

@felixfbecker is there spec text for anything yet?

@bakkot (Collaborator, Author) commented Aug 15, 2023

@felixfbecker I'm working on one, just haven't had time to polish and ship.

@ljharb No. Spec text is a huge amount of work to do for something for which the design isn't settled yet.

@ljharb (Member) commented Aug 15, 2023

I totally agree, I just mean that an implementation prior to spec text seems premature to me.

@bakkot (Collaborator, Author) commented Aug 15, 2023

I think it's pretty normal to make prototype implementations to experiment with the design? That's explicitly part of the stage 2 process, even. Having a prototype to play around with is helpful in my experience.

@spencerwilson commented Mar 23, 2024

Here are some functions I wrote, in case they are helpful starting points. The docstrings reflect my understanding, so if I got something wrong/misleading, would appreciate any corrections! I'm sure there are edge cases not handled, too.

While I was working on this I was also learning more about streams, and ultimately came to the conclusion that I probably would have been better off expressing my workload not as a sequence of applications of iterable transformers, but actually as a pipe chain of streams. IMO it'd be a great contribution to the ecosystem if someone could write up the differences between AsyncIterables and Streams, and when one might want to reach for one vs. the other. That was something I struggled with during this exercise.

// Applies `mapper` to values yielded by `iter`. Note: Unlike an async generator
// implementation of `map`, this implementation allows multiple values to make
// progress through mapper concurrently--so long as multiple calls to `next` are
// made. For that, see `queueMany`, also in this module.
//
// Polyfill for `map` from
// https://github.com/tc39/proposal-async-iterator-helpers.
export function map<T, V>(iter: AsyncIterable<T>, mapper: (val: T, i: number) => V): AsyncIterable<Awaited<V>> {
    let i = -1;
    let iterator = iter[Symbol.asyncIterator]();

    const result: AsyncIterableIterator<Awaited<V>> = {
        async next() {
            const innerResult = await iterator.next();

            if (innerResult.done) {
                return {
                    done: true,
                    value: innerResult.value,
                }
            }

            i += 1;
            const value = await mapper(innerResult.value, i);

            return {
                done: false,
                value,
            }
        },

        [Symbol.asyncIterator]() {
            return this;
        },
    };

    return result;
}

/**
 * Eagerly consumes `iter`. That is, as long as `iter` has not reported that
 * it's done there will always be `concurrency` many pending requests for its
 * next values.
 *
 * Wrapping an iterable with `queueMany` is comparable to piping a ReadableStream
 * into an [identity transform
 * stream](https://streams.spec.whatwg.org/#identity-transform-stream) whose
 * writable side has a `highWaterMark` equal to `Infinity`. That is, `queueMany`
 * wraps `iter` with another iterable that maintains an internal queue, which it
 * eagerly populates with values pulled from `iter`.
 *
 * In runtimes with good support for streams, this iterator adapter is probably
 * unnecessary and streams should be preferred across the board, due to their
 * eagerness.
 */
export function queueMany<T>(iter: AsyncIterable<T>, concurrency: number): AsyncIterable<T> {
    const bufferedInnerResults: ReturnType<AsyncIterator<T>['next']>[] = [];
    let innerDone = false;
    const iterator = iter[Symbol.asyncIterator]();

    function requestAnother() {
        // Any time inner provides another value, we request another if inner is
        // not yet done. This maintains the concurrency level.
        bufferedInnerResults.push(iterator.next().then((result) => {
            if (result.done || innerDone) {
                innerDone = true;
            } else {
                requestAnother();
            }

            return result;
        }));
    }

    while (bufferedInnerResults.length < concurrency) {
        requestAnother();
    }

    const result: AsyncIterableIterator<T> = {
        next() {
            if (bufferedInnerResults.length === 0) {
                requestAnother();
            }

            // SAFETY: The branch above ensures `bufferedInnerResults` is
            // non-empty.
            return bufferedInnerResults.shift()!;
        },

        [Symbol.asyncIterator]() {
            return this;
        },
    };

    return result;
}

In my specific case I wanted to ensure that my long-running async mappers weren't acting as a substantial bottleneck during iteration. In my application code I iterate through queueMany(map(iter, doLongAsyncThing), 10). Keeping 10 outstanding data pulls from my map implementation was enough of a mitigation to not have this mapping stage kill the overall throughput.

All that said, though, I was still left thinking this was a clumsy approach, and that I probably wanted to use Streams instead. I'm not 100% sure, though.

@spencerwilson commented

Whoops, I think I'd misunderstood above how the specified TransformStream works. Namely, it will never run the transform function (the mapper function) multiple times concurrently:

The stream implementation guarantees that this function will be called only after previous transforms have succeeded

https://streams.spec.whatwg.org/#ts-intro:~:text=The%20stream%20implementation%20guarantees%20that%20this%20function%20will%20be%20called%20only%20after%20previous%20transforms%20have%20succeeded

So my queueMany(map(...)) construction is actually different from a readable.pipeThrough(new TransformStream(...)) wrt concurrent runs of the mapper.
