Skip to content

Commit

Permalink
futures-util: add SwitchMap combinator
Browse files Browse the repository at this point in the history
  • Loading branch information
ranfdev committed Jan 8, 2022
1 parent 5406a5e commit 397e69f
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 1 deletion.
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold,
Skip, SkipWhile, StreamExt, StreamFuture, SwitchMap, Take, TakeUntil, TakeWhile, Then, TryFold,
TryForEach, Unzip, Zip,
};

Expand Down
34 changes: 34 additions & 0 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ delegate_all!(
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
);

mod switch_map;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::switch_map::SwitchMap;

mod next;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::next::Next;
Expand Down Expand Up @@ -452,6 +456,36 @@ pub trait StreamExt: Stream {
assert_stream::<T, _>(FilterMap::new(self, f))
}

/// Discards every value but the latest, maps it to a new stream and then returns
/// the items from the mapped stream.
/// When a new item comes from the root stream, the process is repeated.
///
/// # Examples
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter([1,3,5,7]);
///
/// // Maps each numbers to a stream of its multiples.
/// // Only the latest available item from `stream` is mapped.
/// let multiples = stream.switch_map(|x| {
/// stream::iter(1..)
/// .map(move |i| x*i).take(4)
/// });
///
/// assert_eq!(vec![7, 14, 21, 28], multiples.collect::<Vec<_>>().await);
/// # });
/// ```
fn switch_map<T, F>(self, f: F) -> SwitchMap<Self, T, F>
where
F: FnMut(Self::Item) -> T,
T: Stream,
Self: Sized,
{
assert_stream::<T::Item, _>(SwitchMap::new(self, f))
}

/// Computes from this stream's items new items of a different type using
/// an asynchronous closure.
///
Expand Down
86 changes: 86 additions & 0 deletions futures-util/src/stream/stream/switch_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use core::fmt;
use core::pin::Pin;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;

use crate::fns::FnMut1;

pin_project! {
/// Stream for the [`switch_map`](super::StreamExt::switch_map) method.
#[must_use = "streams do nothing unless polled"]
pub struct SwitchMap<St, T, F> {
#[pin]
stream: St,
#[pin]
mapped_stream: Option<T>,
f: F
}
}

impl<St, T, F> fmt::Debug for SwitchMap<St, T, F>
where
St: fmt::Debug,
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SwitchMap")
.field("stream", &self.stream)
.field("mapped_stream", &self.mapped_stream)
.finish()
}
}

impl<St, T, F> SwitchMap<St, T, F>
where
St: Stream,
{
pub(crate) fn new(stream: St, f: F) -> Self {
Self { stream, mapped_stream: None, f }
}

delegate_access_inner!(stream, St, ());
}

impl<St, T, F> Stream for SwitchMap<St, T, F>
where
St: Stream,
T: Stream,
F: FnMut1<St::Item, Output = T>,
{
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let source_pending = loop {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(data)) => {
this.mapped_stream.set(Some(this.f.call_mut(data)));
}
Poll::Pending => break true,
_ => break false,
}
};
let mapped_poll = this.mapped_stream.as_pin_mut().map(|stream| stream.poll_next(cx));
match (source_pending, mapped_poll) {
(_, Some(Poll::Ready(Some(s)))) => Poll::Ready(Some(s)),
(false, None) | (false, Some(Poll::Ready(None))) => Poll::Ready(None),
_ => Poll::Pending,
}
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<St, T, F> Sink<T> for SwitchMap<St, T, F>
where
St: Stream + Sink<T>,
T: Stream,
F: FnMut1<St::Item>,
{
type Error = St::Error;

delegate_sink!(stream, T);
}

0 comments on commit 397e69f

Please sign in to comment.