Skip to content

Commit

Permalink
Expose stand-alone poll_stream() method instead
Browse files Browse the repository at this point in the history
  • Loading branch information
arcnmx committed Dec 18, 2022
1 parent 603637e commit 94b6155
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 97 deletions.
111 changes: 78 additions & 33 deletions futures-util/src/stream/futures_ordered.rs
@@ -1,4 +1,3 @@
use crate::future::FutureExt;
use crate::stream::{FuturesUnordered, StreamExt};
use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
use core::cmp::Ordering;
Expand Down Expand Up @@ -169,6 +168,68 @@ impl<Fut: Future> FuturesOrdered<Fut> {
self.in_progress_queue.push(wrapped);
}
}

/// Poll the contained futures in this queue.
///
/// `Stream::poll_next` must be used in order to retrieve the outputs.
///
/// `Poll::Ready` indicates that the underlying futures are all completed.
pub fn poll_all(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = &mut *self;

loop {
match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
Some(output) => {
let index = output.index;
this.queued_outputs.push(output);
if index == this.next_outgoing_index {
// the Stream is now ready to be polled
cx.waker().wake_by_ref();
break Poll::Pending;
}
}
None => break Poll::Ready(()),
}
}
}

/// `Some` if an output is immediately available
fn peek_wrapper(&self) -> Option<()> {
match self.queued_outputs.peek() {
Some(next_output) if next_output.index == self.next_outgoing_index => Some(()),
_ => None,
}
}

/// `None` if `Stream::is_terminated`
fn poll_peek_wrapper(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
let res = self.as_mut().poll_all(cx);
let peek = self.peek_wrapper();

match res {
Poll::Pending => match peek {
None => Poll::Pending,
output => Poll::Ready(output),
},
Poll::Ready(()) => Poll::Ready(peek),
}
}

/// WIP: if a value is immediately available, borrow it
pub fn peek(&self) -> Option<&Fut::Output> {
let peek = self.peek_wrapper().map(drop);
peek.and_then(move |_| self.queued_outputs.peek()).map(|wrapper| &wrapper.data)
}

/// WIP: `None` indicates the end of the stream
pub fn poll_peek<'a>(
mut self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&'a Fut::Output>> {
let peek = ready!(self.as_mut().poll_peek_wrapper(cx));
let this = self.get_mut();
Poll::Ready(peek.and_then(move |_| this.queued_outputs.peek()).map(|wrapper| &wrapper.data))
}
}

impl<Fut: Future> Default for FuturesOrdered<Fut> {
Expand All @@ -180,18 +241,25 @@ impl<Fut: Future> Default for FuturesOrdered<Fut> {
impl<Fut: Future> Stream for FuturesOrdered<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

// Check to see if we've already received the next value
if let Some(next_output) = this.queued_outputs.peek_mut() {
if next_output.index == this.next_outgoing_index {
this.next_outgoing_index += 1;
return Poll::Ready(Some(PeekMut::pop(next_output).data));
}
}
let next_output = match this.peek_wrapper() {
Some(_) => this.queued_outputs.peek_mut(),
// otherwise poll for it
None => match ready!(Pin::new(&mut *this).poll_peek_wrapper(cx)) {
Some(_) => this.queued_outputs.peek_mut(),
None => None,
},
};

this.poll_unpin(cx).map(|()| None)
if let Some(next_output) = next_output {
this.next_outgoing_index += 1;
Poll::Ready(Some(PeekMut::pop(next_output).data))
} else {
Poll::Ready(None)
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand All @@ -200,29 +268,6 @@ impl<Fut: Future> Stream for FuturesOrdered<Fut> {
}
}

impl<Fut: Future> Future for FuturesOrdered<Fut> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;

loop {
match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
Some(output) => {
let index = output.index;
this.queued_outputs.push(output);
if index == this.next_outgoing_index {
// the Stream is now ready to be polled
cx.waker().wake_by_ref();
break Poll::Pending;
}
}
None => break Poll::Ready(()),
}
}
}
}

impl<Fut: Future> Debug for FuturesOrdered<Fut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "FuturesOrdered {{ ... }}")
Expand Down
57 changes: 28 additions & 29 deletions futures-util/src/stream/stream/buffer_unordered.rs
Expand Up @@ -52,6 +52,32 @@ where
}

delegate_access_inner!(stream, St, (.));

/// Poll the underlying `Stream`, allowing the buffer to fill up.
///
/// This does not poll the futures produced by the stream,
/// `Stream::poll_next` should be used to progress instead.
///
/// When `Poll::Ready` is returned, the underlying stream has been
/// exhausted, and all of its futures have been buffered or consumed.
pub fn poll_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();

// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}

if this.stream.is_done() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}

impl<St> Stream for BufferUnordered<St>
Expand All @@ -62,7 +88,7 @@ where
type Item = <St::Item as Future>::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let stream_res = self.as_mut().poll(cx);
let stream_res = self.as_mut().poll_stream(cx);

let this = self.project();

Expand All @@ -73,7 +99,7 @@ where
}

// If more values are still coming from the stream, we're not done yet
let () = ready!(stream_res);
ready!(stream_res);
Poll::Ready(None)
}

Expand All @@ -99,33 +125,6 @@ where
}
}

impl<St> Future for BufferUnordered<St>
where
St: Stream,
St::Item: Future,
{
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}

if this.stream.is_done() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for BufferUnordered<S>
Expand Down
78 changes: 43 additions & 35 deletions futures-util/src/stream/stream/buffered.rs
@@ -1,4 +1,3 @@
use crate::future::FutureExt;
use crate::stream::{Fuse, FuturesOrdered, StreamExt};
use core::fmt;
use core::num::NonZeroUsize;
Expand Down Expand Up @@ -54,6 +53,42 @@ where
}

delegate_access_inner!(stream, St, (.));

/// Fill the buffer as much as is allowed by polling the underlying `Stream`.
///
/// Returns `false` if there are no more futures and the `Stream::is_terminated()`.
/// Otherwise there may be more futures, but the buffer is out of room.
fn poll_fill(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
let mut this = self.project();

while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(fut) => this.in_progress_queue.push_back(fut),
None => break,
}
}

Poll::Ready(!this.stream.is_done())
}

/// Poll the buffered `Stream`, allowing it to progress as long as there is
/// room in the buffer.
///
/// This will also poll any futures produced by the stream, but only polls
/// the underlying `Stream` as long as the buffer can hold more entries.
/// `Stream::poll_next` should be used to progress to completion.
///
/// When `Poll::Ready` is returned, the underlying stream has been
/// exhausted, and all of its futures have been run to completion.
pub fn poll_stream(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let stream_res = self.as_mut().poll_fill(cx);

let this = self.project();
let queue_res = Pin::new(&mut *this.in_progress_queue).poll_all(cx);

ready!(stream_res);
queue_res
}
}

impl<St> Stream for Buffered<St>
Expand All @@ -64,7 +99,9 @@ where
type Item = <St::Item as Future>::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let _stream_res = self.as_mut().poll(cx);
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
let stream_res = self.as_mut().poll_fill(cx);

let this = self.project();

Expand All @@ -75,10 +112,10 @@ where
}

// If more values are still coming from the stream, we're not done yet
if this.stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
match stream_res {
Poll::Pending => Poll::Pending,
Poll::Ready(false) => Poll::Ready(None),
Poll::Ready(true) => panic!("buffer is full, but we have no values???"),
}
}

Expand All @@ -94,35 +131,6 @@ where
}
}

impl<St> Future for Buffered<St>
where
St: Stream,
St::Item: Future,
{
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}

let queue_res = this.in_progress_queue.poll_unpin(cx);

if this.stream.is_done() {
queue_res
} else {
Poll::Pending
}
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Buffered<S>
Expand Down

0 comments on commit 94b6155

Please sign in to comment.