Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize the various chunk methods to produce any collection that implements Default + Extend #2498

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 22 additions & 25 deletions futures-util/src/stream/stream/chunks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::stream::Fuse;
use alloc::vec::Vec;
use core::cmp::min;
use core::mem;
use core::pin::Pin;
use futures_core::ready;
Expand All @@ -13,38 +13,39 @@ pin_project! {
/// Stream for the [`chunks`](super::StreamExt::chunks) method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Chunks<St: Stream> {
pub struct Chunks<St: Stream, C> {
#[pin]
stream: Fuse<St>,
items: Vec<St::Item>,
items: C,
len: usize,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<St: Stream> Chunks<St>
where
St: Stream,
{
impl<St: Stream, C: Default> Chunks<St, C> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Self {
stream: super::Fuse::new(stream),
items: Vec::with_capacity(capacity),
// Would be better if there were a trait for `with_capacity` and `len`.
items: C::default(),
len: 0,
cap: capacity,
}
}

fn take(self: Pin<&mut Self>) -> Vec<St::Item> {
let cap = self.cap;
mem::replace(self.project().items, Vec::with_capacity(cap))
/// Hands out the collection buffered so far and resets the adaptor's
/// bookkeeping so that the next chunk starts from an empty collection.
fn take(mut self: Pin<&mut Self>) -> C {
let this = self.as_mut().project();
// `len` is bumped once per item fed into `items`; zero it because the
// buffer is about to be handed to the caller.
*this.len = 0;
// `mem::take` moves the collection out and leaves `C::default()` behind.
mem::take(this.items)
}

delegate_access_inner!(stream, St, (.));
}

impl<St: Stream> Stream for Chunks<St> {
type Item = Vec<St::Item>;
impl<St: Stream, C: Default + Extend<<St as Stream>::Item>> Stream for Chunks<St, C> {
type Item = C;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
Expand All @@ -54,21 +55,17 @@ impl<St: Stream> Stream for Chunks<St> {
// If so, replace our buffer with a new and empty one and return
// the full one.
Some(item) => {
this.items.push(item);
if this.items.len() >= *this.cap {
this.items.extend(Some(item));
*this.len += 1;
if this.len >= this.cap {
return Poll::Ready(Some(self.take()));
}
}

// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
None => {
let last = if this.items.is_empty() {
None
} else {
let full_buf = mem::take(this.items);
Some(full_buf)
};
let last = if *this.len == 0 { None } else { Some(self.take()) };

return Poll::Ready(last);
}
Expand All @@ -77,7 +74,7 @@ impl<St: Stream> Stream for Chunks<St> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let chunk_len = min(self.len, 1);
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Expand All @@ -88,15 +85,15 @@ impl<St: Stream> Stream for Chunks<St> {
}
}

impl<St: FusedStream> FusedStream for Chunks<St> {
impl<St: FusedStream, C: Default + Extend<<St as Stream>::Item>> FusedStream for Chunks<St, C> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.items.is_empty()
self.stream.is_terminated() && self.len == 0
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Chunks<S>
impl<S, C, Item> Sink<Item> for Chunks<S, C>
where
S: Stream + Sink<Item>,
{
Expand Down
48 changes: 30 additions & 18 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use crate::future::{assert_future, Either};
use crate::stream::assert_stream;
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
#[cfg(feature = "alloc")]
use futures_core::stream::{BoxStream, LocalBoxStream};
Expand Down Expand Up @@ -1647,43 +1645,57 @@ pub trait StreamExt: Stream {
assert_stream::<Self::Item, _>(Peekable::new(self))
}

/// An adaptor for chunking up items of the stream inside a vector.
/// An adaptor for chunking up items of the stream inside a collection.
///
/// This combinator will attempt to pull items from this stream and buffer
/// them into a local vector. At most `capacity` items will get buffered
/// them into a local collection. At most `capacity` items will get buffered
/// before they're yielded from the returned stream.
///
/// Note that the vectors returned from this iterator may not always have
/// `capacity` elements. If the underlying stream ended and only a partial
/// vector was created, it'll be returned. Additionally if an error happens
/// from the underlying stream then the currently buffered items will be
/// yielded.
/// Note that the collections returned from this stream may not always
/// have `capacity` elements. If the underlying stream ended and only a
/// partial collection was created, it'll be returned. Furthermore, some
/// collections may replace elements (e.g. a `HashMap` if it receives two
/// elements with identical keys), so a chunk can hold fewer than
/// `capacity` elements even though `capacity` items were consumed.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Panics
///
/// This method will panic if `capacity` is zero.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(vec![1, 2, 3]);
///
/// let result: Vec<Vec<_>> = stream.chunks::<Vec<_>>(2).collect::<Vec<_>>().await;
/// assert_eq!(result, vec![
/// vec![1, 2],
/// vec![3],
/// ]);
/// # });
/// ```
#[cfg(feature = "alloc")]
fn chunks(self, capacity: usize) -> Chunks<Self>
fn chunks<C: Default + Extend<Self::Item>>(self, capacity: usize) -> Chunks<Self, C>
where
Self: Sized,
{
assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity))
assert_stream::<C, _>(Chunks::new(self, capacity))
}

/// An adaptor for chunking up ready items of the stream inside a vector.
/// An adaptor for chunking up ready items of the stream inside a collection.
///
/// This combinator will attempt to pull ready items from this stream and
/// buffer them into a local vector. At most `capacity` items will get
/// buffer them into a local collection. At most `capacity` items will get
/// buffered before they're yielded from the returned stream. If underlying
/// stream returns `Poll::Pending`, and collected chunk is not empty, it will
/// be immediately returned.
///
/// If the underlying stream ended and only a partial vector was created,
/// it'll be returned. Additionally if an error happens from the underlying
/// stream then the currently buffered items will be yielded.
/// If the underlying stream ended and only a partial collection was created,
/// it'll be returned. The caveat about collections that replace elements,
/// described for `chunks`, also applies to `ready_chunks`.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
Expand All @@ -1692,11 +1704,11 @@ pub trait StreamExt: Stream {
///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
fn ready_chunks<C: Default + Extend<Self::Item>>(self, capacity: usize) -> ReadyChunks<Self, C>
where
Self: Sized,
{
assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity))
assert_stream::<C, _>(ReadyChunks::new(self, capacity))
}

/// A future that completes after the given stream has been fully processed
Expand Down
61 changes: 31 additions & 30 deletions futures-util/src/stream/stream/ready_chunks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::stream::Fuse;
use alloc::vec::Vec;
use core::cmp::min;
use core::mem;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
Expand All @@ -12,71 +12,70 @@ pin_project! {
/// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct ReadyChunks<St: Stream> {
pub struct ReadyChunks<St: Stream, C> {
#[pin]
stream: Fuse<St>,
items: Vec<St::Item>,
items: C,
len: usize,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<St: Stream> ReadyChunks<St>
where
St: Stream,
{
impl<St: Stream, C: Default> ReadyChunks<St, C> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Self {
stream: super::Fuse::new(stream),
items: Vec::with_capacity(capacity),
// Would be better if there were a trait for `with_capacity` and `len`.
items: C::default(),
len: 0,
cap: capacity,
}
}

/// Hands out the collection buffered so far and resets the adaptor's
/// bookkeeping so that the next chunk starts from an empty collection.
fn take(mut self: Pin<&mut Self>) -> C {
let this = self.as_mut().project();
// `len` is bumped once per item fed into `items`; zero it because the
// buffer is about to be handed to the caller.
*this.len = 0;
// `mem::take` moves the collection out and leaves `C::default()` behind.
mem::take(this.items)
}

delegate_access_inner!(stream, St, (.));
}

impl<St: Stream> Stream for ReadyChunks<St> {
type Item = Vec<St::Item>;
impl<St: Stream, C: Default + Extend<<St as Stream>::Item>> Stream for ReadyChunks<St, C> {
type Item = C;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();

loop {
match this.stream.as_mut().poll_next(cx) {
// Flush all collected data if underlying stream doesn't contain
// more ready values
Poll::Pending => {
return if this.items.is_empty() {
return if *this.len == 0 {
Poll::Pending
} else {
Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
Poll::Ready(Some(self.take()))
}
}

// Push the ready item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Poll::Ready(Some(item)) => {
this.items.push(item);
if this.items.len() >= *this.cap {
return Poll::Ready(Some(mem::replace(
this.items,
Vec::with_capacity(*this.cap),
)));
this.items.extend(Some(item));
*this.len += 1;
if *this.len >= *this.cap {
return Poll::Ready(Some(self.take()));
}
}

// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
Poll::Ready(None) => {
let last = if this.items.is_empty() {
None
} else {
let full_buf = mem::take(this.items);
Some(full_buf)
};
let last = if *this.len == 0 { None } else { Some(self.take()) };

return Poll::Ready(last);
}
Expand All @@ -85,7 +84,7 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let chunk_len = min(self.len, 1);
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Expand All @@ -96,15 +95,17 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}
}

impl<St: FusedStream> FusedStream for ReadyChunks<St> {
impl<St: FusedStream, C: Default + Extend<<St as Stream>::Item>> FusedStream
for ReadyChunks<St, C>
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.items.is_empty()
self.stream.is_terminated() && self.len == 0
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for ReadyChunks<S>
impl<S, C, Item> Sink<Item> for ReadyChunks<S, C>
where
S: Stream + Sink<Item>,
{
Expand Down
20 changes: 10 additions & 10 deletions futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use crate::fns::{
};
use crate::future::assert_future;
use crate::stream::{assert_stream, Inspect, Map};
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;

use futures_core::{
Expand Down Expand Up @@ -510,15 +508,17 @@ pub trait TryStreamExt: TryStream {
assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
}

/// An adaptor for chunking up successful items of the stream inside a vector.
/// An adaptor for chunking up successful items of the stream inside a collection.
///
/// This combinator will attempt to pull successful items from this stream and buffer
/// them into a local vector. At most `capacity` items will get buffered
/// them into a local collection. At most `capacity` items will get buffered
/// before they're yielded from the returned stream.
///
/// Note that the vectors returned from this iterator may not always have
/// Note that the collections returned from this stream may not always have
/// `capacity` elements. If the underlying stream ended and only a partial
/// vector was created, it'll be returned. Additionally if an error happens
/// collection was created, it'll be returned. Furthermore, some
/// collections may replace elements (e.g. a `HashMap` if it receives two
/// elements with identical keys). Additionally, if an error happens
/// from the underlying stream then the currently buffered items will be
/// yielded.
///
Expand Down Expand Up @@ -548,13 +548,13 @@ pub trait TryStreamExt: TryStream {
///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn try_chunks(self, capacity: usize) -> TryChunks<Self>
fn try_chunks<C: Default + Extend<Self::Ok>>(self, capacity: usize) -> TryChunks<Self, C>
where
Self: Sized,
{
assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
TryChunks::new(self, capacity),
)
assert_stream::<Result<C, TryChunksError<C, Self::Error>>, _>(TryChunks::new(
self, capacity,
))
}

/// Attempt to filter the values produced by this stream according to the
Expand Down