forked from rust-lang/futures-rs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
try_flatten_unordered.rs
133 lines (111 loc) · 4.05 KB
/
try_flatten_unordered.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use core::pin::Pin;
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;
use crate::future::Either;
use crate::stream::stream::FlattenUnordered;
use crate::StreamExt;
use super::IntoStream;
delegate_all!(
/// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
TryFlattenUnordered<St>(
FlattenUnordered<TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
+ New[
|stream: St, limit: impl Into<Option<usize>>|
TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams::new(stream).flatten_unordered(limit)
]
where
St: TryStream,
St::Ok: TryStream,
St::Ok: Unpin,
<St::Ok as TryStream>::Error: From<St::Error>
);
pin_project! {
/// Emits either successful streams or single-item streams containing the underlying errors.
/// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
where
St: TryStream,
St::Ok: TryStream,
St::Ok: Unpin,
<St::Ok as TryStream>::Error: From<St::Error>
{
#[pin]
stream: St,
}
}
impl<St> TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
where
St: TryStream,
St::Ok: TryStream + Unpin,
<St::Ok as TryStream>::Error: From<St::Error>,
{
fn new(stream: St) -> Self {
Self { stream }
}
delegate_access_inner!(stream, St, ());
}
impl<St> FusedStream for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
where
St: TryStream + FusedStream,
St::Ok: TryStream + Unpin,
<St::Ok as TryStream>::Error: From<St::Error>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}
/// Emits single item immediately, then stream will be terminated.
#[derive(Debug, Clone)]
pub struct Single<T>(Option<T>);
impl<T> Unpin for Single<T> {}
impl<T> Stream for Single<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.0.take())
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1)))
}
}
type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>;
impl<St> Stream for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
where
St: TryStream,
St::Ok: TryStream + Unpin,
<St::Ok as TryStream>::Error: From<St::Error>,
{
// Item is either an inner stream or a stream containing a single error.
// This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.project().stream.try_poll_next(cx));
let out = item.map(|res| match res {
// Emit successful inner stream as is
Ok(stream) => Either::Left(IntoStream::new(stream)),
// Wrap an error into a stream containing a single item
err @ Err(_) => {
let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);
Either::Right(Single(Some(res)))
}
});
Poll::Ready(out)
}
}
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
where
St: TryStream + Sink<Item>,
St::Ok: TryStream + Unpin,
<St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
{
type Error = <St as Sink<Item>>::Error;
delegate_sink!(stream, Item);
}