Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl Future for stream::{Buffered, BufferUnordered, FuturesOrdered} #2677

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
97 changes: 77 additions & 20 deletions futures-util/src/stream/futures_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,68 @@ impl<Fut: Future> FuturesOrdered<Fut> {
self.in_progress_queue.push(wrapped);
}
}

/// Poll the contained futures in this queue.
///
/// `Stream::poll_next` must be used in order to retrieve the outputs.
///
/// `Poll::Ready` indicates that the underlying futures are all completed.
pub fn poll_all(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = &mut *self;

loop {
match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
Some(output) => {
let index = output.index;
this.queued_outputs.push(output);
if index == this.next_outgoing_index {
// the Stream is now ready to be polled
cx.waker().wake_by_ref();
break Poll::Pending;
}
}
None => break Poll::Ready(()),
}
}
}

/// `Some` if an output is immediately available
fn peek_wrapper(&self) -> Option<()> {
match self.queued_outputs.peek() {
Some(next_output) if next_output.index == self.next_outgoing_index => Some(()),
_ => None,
}
}

/// `None` if `Stream::is_terminated`
fn poll_peek_wrapper(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
let res = self.as_mut().poll_all(cx);
let peek = self.peek_wrapper();

match res {
Poll::Pending => match peek {
None => Poll::Pending,
output => Poll::Ready(output),
},
Poll::Ready(()) => Poll::Ready(peek),
}
}

/// WIP: if a value is immediately available, borrow it
pub fn peek(&self) -> Option<&Fut::Output> {
let peek = self.peek_wrapper().map(drop);
peek.and_then(move |_| self.queued_outputs.peek()).map(|wrapper| &wrapper.data)
}

/// WIP: `None` indicates the end of the stream
pub fn poll_peek<'a>(
mut self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&'a Fut::Output>> {
let peek = ready!(self.as_mut().poll_peek_wrapper(cx));
let this = self.get_mut();
Poll::Ready(peek.and_then(move |_| this.queued_outputs.peek()).map(|wrapper| &wrapper.data))
}
}

impl<Fut: Future> Default for FuturesOrdered<Fut> {
Expand All @@ -179,29 +241,24 @@ impl<Fut: Future> Default for FuturesOrdered<Fut> {
impl<Fut: Future> Stream for FuturesOrdered<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

// Check to see if we've already received the next value
if let Some(next_output) = this.queued_outputs.peek_mut() {
if next_output.index == this.next_outgoing_index {
this.next_outgoing_index += 1;
return Poll::Ready(Some(PeekMut::pop(next_output).data));
}
}
let next_output = match this.peek_wrapper() {
Some(_) => this.queued_outputs.peek_mut(),
// otherwise poll for it
None => match ready!(Pin::new(&mut *this).poll_peek_wrapper(cx)) {
Some(_) => this.queued_outputs.peek_mut(),
None => None,
},
};

loop {
match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
Some(output) => {
if output.index == this.next_outgoing_index {
this.next_outgoing_index += 1;
return Poll::Ready(Some(output.data));
} else {
this.queued_outputs.push(output)
}
}
None => return Poll::Ready(None),
}
if let Some(next_output) = next_output {
this.next_outgoing_index += 1;
Poll::Ready(Some(PeekMut::pop(next_output).data))
} else {
Poll::Ready(None)
}
}

Expand Down
45 changes: 31 additions & 14 deletions futures-util/src/stream/stream/buffer_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::fmt;
use core::num::NonZeroUsize;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
Expand Down Expand Up @@ -51,16 +52,15 @@ where
}

delegate_access_inner!(stream, St, (.));
}

impl<St> Stream for BufferUnordered<St>
where
St: Stream,
St::Item: Future,
{
type Item = <St::Item as Future>::Output;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
/// Poll the underlying `Stream`, allowing the buffer to fill up.
///
/// This does not poll the futures produced by the stream,
/// `Stream::poll_next` should be used to progress instead.
///
/// When `Poll::Ready` is returned, the underlying stream has been
/// exhausted, and all of its futures have been buffered or consumed.
pub fn poll_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();

// First up, try to spawn off as many futures as possible by filling up
Expand All @@ -72,18 +72,35 @@ where
}
}

if this.stream.is_done() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}

impl<St> Stream for BufferUnordered<St>
where
St: Stream,
St::Item: Future,
{
type Item = <St::Item as Future>::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let stream_res = self.as_mut().poll_stream(cx);

let this = self.project();

// Attempt to pull the next value from the in_progress_queue
match this.in_progress_queue.poll_next_unpin(cx) {
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
}

// If more values are still coming from the stream, we're not done yet
if this.stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
}
ready!(stream_res);
Poll::Ready(None)
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down
57 changes: 44 additions & 13 deletions futures-util/src/stream/stream/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,42 @@ where
}

delegate_access_inner!(stream, St, (.));

/// Fill the buffer as much as is allowed by polling the underlying `Stream`.
///
/// Returns `false` if there are no more futures and the `Stream::is_terminated()`.
/// Otherwise there may be more futures, but the buffer is out of room.
fn poll_fill(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
let mut this = self.project();

while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(fut) => this.in_progress_queue.push_back(fut),
None => break,
}
}

Poll::Ready(!this.stream.is_done())
}

/// Poll the buffered `Stream`, allowing it to progress as long as there is
/// room in the buffer.
///
/// This will also poll any futures produced by the stream, but only polls
/// the underlying `Stream` as long as the buffer can hold more entries.
/// `Stream::poll_next` should be used to progress to completion.
///
/// When `Poll::Ready` is returned, the underlying stream has been
/// exhausted, and all of its futures have been run to completion.
pub fn poll_stream(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let stream_res = self.as_mut().poll_fill(cx);

let this = self.project();
let queue_res = Pin::new(&mut *this.in_progress_queue).poll_all(cx);

ready!(stream_res);
queue_res
}
}

impl<St> Stream for Buffered<St>
Expand All @@ -62,17 +98,12 @@ where
{
type Item = <St::Item as Future>::Output;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
let stream_res = self.as_mut().poll_fill(cx);

let this = self.project();

// Attempt to pull the next value from the in_progress_queue
let res = this.in_progress_queue.poll_next_unpin(cx);
Expand All @@ -81,10 +112,10 @@ where
}

// If more values are still coming from the stream, we're not done yet
if this.stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
match stream_res {
Poll::Pending => Poll::Pending,
Poll::Ready(false) => Poll::Ready(None),
Poll::Ready(true) => panic!("buffer is full, but we have no values???"),
}
}

Expand Down