Skip to content

Commit

Permalink
Generalize the various chunk methods to produce any collection that i…
Browse files Browse the repository at this point in the history
…mplements Default + Extend.

Fixes rust-lang#2492
  • Loading branch information
khuey committed Aug 27, 2022
1 parent 42bd7a6 commit 82a7dee
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 129 deletions.
47 changes: 22 additions & 25 deletions futures-util/src/stream/stream/chunks.rs
@@ -1,5 +1,5 @@
use crate::stream::Fuse;
use alloc::vec::Vec;
use core::cmp::min;
use core::mem;
use core::pin::Pin;
use futures_core::ready;
Expand All @@ -13,38 +13,39 @@ pin_project! {
/// Stream for the [`chunks`](super::StreamExt::chunks) method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Chunks<St: Stream> {
pub struct Chunks<St: Stream, C> {
#[pin]
stream: Fuse<St>,
items: Vec<St::Item>,
items: C,
len: usize,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<St: Stream> Chunks<St>
where
St: Stream,
{
impl<St: Stream, C: Default> Chunks<St, C> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Self {
stream: super::Fuse::new(stream),
items: Vec::with_capacity(capacity),
// Would be better if there were a trait for `with_capacity` and `len`.
items: C::default(),
len: 0,
cap: capacity,
}
}

fn take(self: Pin<&mut Self>) -> Vec<St::Item> {
let cap = self.cap;
mem::replace(self.project().items, Vec::with_capacity(cap))
fn take(mut self: Pin<&mut Self>) -> C {
let this = self.as_mut().project();
*this.len = 0;
mem::take(this.items)
}

delegate_access_inner!(stream, St, (.));
}

impl<St: Stream> Stream for Chunks<St> {
type Item = Vec<St::Item>;
impl<St: Stream, C: Default + Extend<<St as Stream>::Item>> Stream for Chunks<St, C> {
type Item = C;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
Expand All @@ -54,21 +55,17 @@ impl<St: Stream> Stream for Chunks<St> {
// If so, replace our buffer with a new and empty one and return
// the full one.
Some(item) => {
this.items.push(item);
if this.items.len() >= *this.cap {
this.items.extend(Some(item));
*this.len += 1;
if this.len >= this.cap {
return Poll::Ready(Some(self.take()));
}
}

// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
None => {
let last = if this.items.is_empty() {
None
} else {
let full_buf = mem::take(this.items);
Some(full_buf)
};
let last = if *this.len == 0 { None } else { Some(self.take()) };

return Poll::Ready(last);
}
Expand All @@ -77,7 +74,7 @@ impl<St: Stream> Stream for Chunks<St> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let chunk_len = min(self.len, 1);
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Expand All @@ -88,15 +85,15 @@ impl<St: Stream> Stream for Chunks<St> {
}
}

impl<St: FusedStream> FusedStream for Chunks<St> {
impl<St: FusedStream, C: Default + Extend<<St as Stream>::Item>> FusedStream for Chunks<St, C> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.items.is_empty()
self.stream.is_terminated() && self.len == 0
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Chunks<S>
impl<S, C, Item> Sink<Item> for Chunks<S, C>
where
S: Stream + Sink<Item>,
{
Expand Down
48 changes: 30 additions & 18 deletions futures-util/src/stream/stream/mod.rs
Expand Up @@ -7,8 +7,6 @@ use crate::future::{assert_future, Either};
use crate::stream::assert_stream;
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
#[cfg(feature = "alloc")]
use futures_core::stream::{BoxStream, LocalBoxStream};
Expand Down Expand Up @@ -1647,43 +1645,57 @@ pub trait StreamExt: Stream {
assert_stream::<Self::Item, _>(Peekable::new(self))
}

/// An adaptor for chunking up items of the stream inside a vector.
/// An adaptor for chunking up items of the stream inside a collection.
///
/// This combinator will attempt to pull items from this stream and buffer
/// them into a local vector. At most `capacity` items will get buffered
/// them into a local collection. At most `capacity` items will get buffered
/// before they're yielded from the returned stream.
///
/// Note that the vectors returned from this iterator may not always have
/// `capacity` elements. If the underlying stream ended and only a partial
/// vector was created, it'll be returned. Additionally if an error happens
/// from the underlying stream then the currently buffered items will be
/// yielded.
/// Note that the collections returned from this iterator may not always
/// have `capacity` elements. If the underlying stream ended and only a
/// partial collection was created, it'll be returned. Furthermore, some
/// collections may replace elements (e.g. a HashMap if it receives two
/// elements with identical keys).
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Panics
///
/// This method will panic if `capacity` is zero.
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(vec![1, 2, 3]);
///
/// let result: Vec<Vec<_>> = stream.chunks::<Vec<_>>(2).collect::<Vec<_>>().await;
/// assert_eq!(result, vec![
/// vec![1, 2],
/// vec![3],
/// ]);
/// # });
/// ```
#[cfg(feature = "alloc")]
fn chunks(self, capacity: usize) -> Chunks<Self>
fn chunks<C: Default + Extend<Self::Item>>(self, capacity: usize) -> Chunks<Self, C>
where
Self: Sized,
{
assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity))
assert_stream::<C, _>(Chunks::new(self, capacity))
}

/// An adaptor for chunking up ready items of the stream inside a vector.
/// An adaptor for chunking up ready items of the stream inside a collection.
///
/// This combinator will attempt to pull ready items from this stream and
/// buffer them into a local vector. At most `capacity` items will get
/// buffer them into a local collection. At most `capacity` items will get
/// buffered before they're yielded from the returned stream. If underlying
/// stream returns `Poll::Pending`, and collected chunk is not empty, it will
/// be immediately returned.
///
/// If the underlying stream ended and only a partial vector was created,
/// it'll be returned. Additionally if an error happens from the underlying
/// stream then the currently buffered items will be yielded.
/// If the underlying stream ended and only a partial collection was created,
/// it'll be returned. The same caveats for collections that can be replaced
/// that apply to `chunks` also apply to `ready_chunks`.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
Expand All @@ -1692,11 +1704,11 @@ pub trait StreamExt: Stream {
///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
fn ready_chunks<C: Default + Extend<Self::Item>>(self, capacity: usize) -> ReadyChunks<Self, C>
where
Self: Sized,
{
assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity))
assert_stream::<C, _>(ReadyChunks::new(self, capacity))
}

/// A future that completes after the given stream has been fully processed
Expand Down
61 changes: 31 additions & 30 deletions futures-util/src/stream/stream/ready_chunks.rs
@@ -1,5 +1,5 @@
use crate::stream::Fuse;
use alloc::vec::Vec;
use core::cmp::min;
use core::mem;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
Expand All @@ -12,71 +12,70 @@ pin_project! {
/// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct ReadyChunks<St: Stream> {
pub struct ReadyChunks<St: Stream, C> {
#[pin]
stream: Fuse<St>,
items: Vec<St::Item>,
items: C,
len: usize,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<St: Stream> ReadyChunks<St>
where
St: Stream,
{
impl<St: Stream, C: Default> ReadyChunks<St, C> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Self {
stream: super::Fuse::new(stream),
items: Vec::with_capacity(capacity),
// Would be better if there were a trait for `with_capacity` and `len`.
items: C::default(),
len: 0,
cap: capacity,
}
}

fn take(mut self: Pin<&mut Self>) -> C {
let this = self.as_mut().project();
*this.len = 0;
mem::take(this.items)
}

delegate_access_inner!(stream, St, (.));
}

impl<St: Stream> Stream for ReadyChunks<St> {
type Item = Vec<St::Item>;
impl<St: Stream, C: Default + Extend<<St as Stream>::Item>> Stream for ReadyChunks<St, C> {
type Item = C;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();

loop {
match this.stream.as_mut().poll_next(cx) {
// Flush all collected data if underlying stream doesn't contain
// more ready values
Poll::Pending => {
return if this.items.is_empty() {
return if *this.len == 0 {
Poll::Pending
} else {
Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
Poll::Ready(Some(self.take()))
}
}

// Push the ready item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Poll::Ready(Some(item)) => {
this.items.push(item);
if this.items.len() >= *this.cap {
return Poll::Ready(Some(mem::replace(
this.items,
Vec::with_capacity(*this.cap),
)));
this.items.extend(Some(item));
*this.len += 1;
if *this.len >= *this.cap {
return Poll::Ready(Some(self.take()));
}
}

// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
Poll::Ready(None) => {
let last = if this.items.is_empty() {
None
} else {
let full_buf = mem::take(this.items);
Some(full_buf)
};
let last = if *this.len == 0 { None } else { Some(self.take()) };

return Poll::Ready(last);
}
Expand All @@ -85,7 +84,7 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let chunk_len = min(self.len, 1);
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Expand All @@ -96,15 +95,17 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}
}

impl<St: FusedStream> FusedStream for ReadyChunks<St> {
impl<St: FusedStream, C: Default + Extend<<St as Stream>::Item>> FusedStream
for ReadyChunks<St, C>
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.items.is_empty()
self.stream.is_terminated() && self.len == 0
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for ReadyChunks<S>
impl<S, C, Item> Sink<Item> for ReadyChunks<S, C>
where
S: Stream + Sink<Item>,
{
Expand Down
20 changes: 10 additions & 10 deletions futures-util/src/stream/try_stream/mod.rs
Expand Up @@ -11,8 +11,6 @@ use crate::fns::{
};
use crate::future::assert_future;
use crate::stream::{assert_stream, Inspect, Map};
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;

use futures_core::{
Expand Down Expand Up @@ -510,15 +508,17 @@ pub trait TryStreamExt: TryStream {
assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
}

/// An adaptor for chunking up successful items of the stream inside a vector.
/// An adaptor for chunking up successful items of the stream inside a collection.
///
/// This combinator will attempt to pull successful items from this stream and buffer
/// them into a local vector. At most `capacity` items will get buffered
/// them into a local collection. At most `capacity` items will get buffered
/// before they're yielded from the returned stream.
///
/// Note that the vectors returned from this iterator may not always have
/// Note that the collections returned from this iterator may not always have
/// `capacity` elements. If the underlying stream ended and only a partial
/// vector was created, it'll be returned. Additionally if an error happens
/// collection was created, it'll be returned. Furthermore, some
/// collections may replace elements (e.g. a HashMap if it receives two
/// elements with identical keys). Additionally if an error happens
/// from the underlying stream then the currently buffered items will be
/// yielded.
///
Expand Down Expand Up @@ -548,13 +548,13 @@ pub trait TryStreamExt: TryStream {
///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn try_chunks(self, capacity: usize) -> TryChunks<Self>
fn try_chunks<C: Default + Extend<Self::Ok>>(self, capacity: usize) -> TryChunks<Self, C>
where
Self: Sized,
{
assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
TryChunks::new(self, capacity),
)
assert_stream::<Result<C, TryChunksError<C, Self::Error>>, _>(TryChunks::new(
self, capacity,
))
}

/// Attempt to filter the values produced by this stream according to the
Expand Down

0 comments on commit 82a7dee

Please sign in to comment.