Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer through #2205

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open

Buffer through #2205

wants to merge 6 commits into from

Conversation

RaasAhsan
Copy link

@RaasAhsan RaasAhsan commented Jan 8, 2021

I'm starting to explore some preliminary ideas for an fs2-oriented Queue. FS2 provides an integration with the Cats Effect Queue, but it is neither stream- nor chunk-aware, so there are some inefficiencies and workarounds in place to get things to play well. Based on conversation in Gitter, some desirable properties of such fs2-oriented queue are: stream termination, chunk awareness, error propagation, and backpressure.

Before I get any hopes up, I haven't actually implement a Queue in this PR. I add a new combinator, bufferThrough, which is implemented in terms of the CE queue and I think satisfies most of the properties laid out above.

bufferThrough is pretty much the identity function, but elements are buffered through a queue. Actually, there is another function that already does exactly this: prefetch. The main difference is that bufferThrough gives you more freedom to control backpressure semantics by supplying whatever kind of queue you want, using a trick similar to the one that @SystemFw used for Async.cont. Accordingly, prefetch can be implemented in terms of bufferThrough.

So I'm not actually introducing anything groundbreaking here! My goal is rather to draw attention to yet another dimension of streaming queues: the topology of the system. prefetch and bufferThrough are both single-producer, single-consumer systems. I'm not sure yet how to generalize over the remaining three corners of the matrix, if that's even possible.

An interesting question is how stream termination would work in multiple-producer systems, if it even makes sense at all. The only behavior that makes sense to me is terminating all producers and consumers whenever a single producer terminates, but that doesn't seem very useful. At that point, I think you would just use enqueueUnterminated and dequeueUnterminated freely.

I should've probably opened a separate issue or discussion for this, but oh well.

@@ -346,6 +346,25 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
go(Nil, false, this).stream
}

// A builder for queues.
trait MakeQueue[F[_]] {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure where to put this, but it lets us create a queue-variant chosen by the user but they can't tamper with its contents at all since create is polymorphic.

@mpilquist
Copy link
Member

Looks good to me. I ran in to the same issue when thinking about a new fs2 queue type.

Re: bufferThrough, I'm a bit worried that it would be confused with the various buffer methods we have now. Strangely, our existing buffer operations really only do prefetching and our existing prefetch operations do buffering... I'm open to changing this for 3.0 release though, as I don't think either of those operations are heavily used.

@RaasAhsan
Copy link
Author

Sounds good, I can try to come up with some other names. I also don't think we really need the MakeQueue trait. It's a neat trick, but passing in an empty Queue is probably sufficient with some documentation

@RaasAhsan RaasAhsan marked this pull request as ready for review January 22, 2021 02:16
@mpilquist mpilquist changed the base branch from develop to main January 30, 2021 21:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants