From 4a1fff1d884bd411c84811b29d6882c0cfd16f58 Mon Sep 17 00:00:00 2001 From: olegnn Date: Sun, 17 Apr 2022 13:10:05 +0400 Subject: [PATCH 1/3] Several fixes --- futures-util/src/stream/stream/flatten_unordered.rs | 2 +- futures-util/src/stream/try_stream/try_flatten_unordered.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 66ba4d0d55..de425346b5 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -137,7 +137,7 @@ impl SharedPollState { fn stop_waking(&self) -> u8 { self.state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let next_value = value & !WAKING; + let next_value = value & !WAKING | WOKEN; if next_value != value { Some(next_value) diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index aaad910bf0..62b77b4926 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -121,10 +121,10 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams +impl Sink for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams where St: TryStream + Sink, - St::Ok: Stream> + Unpin, + St::Ok: TryStream + Unpin, ::Error: From<::Error>, { type Error = >::Error; From 835af2951dfd50df1b36e3e6564c3ff6f31703fa Mon Sep 17 00:00:00 2001 From: olegnn Date: Sun, 17 Apr 2022 17:06:48 +0400 Subject: [PATCH 2/3] Spacing --- futures-util/src/stream/try_stream/try_flatten_unordered.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index 62b77b4926..e21b514023 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -27,7 +27,7 @@ delegate_all!( St: TryStream, St::Ok: TryStream, St::Ok: Unpin, - ::Error: From + ::Error: From ); pin_project! { @@ -40,7 +40,7 @@ pin_project! { St: TryStream, St::Ok: TryStream, St::Ok: Unpin, - ::Error: From + ::Error: From { #[pin] stream: St, From f943d88e07bd8b649b01d9c96eff8c92834f4ba8 Mon Sep 17 00:00:00 2001 From: olegnn Date: Mon, 18 Apr 2022 12:31:09 +0400 Subject: [PATCH 3/3] Simplifications, `debug` assertions instead of runtime checks --- .../src/stream/stream/flatten_unordered.rs | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index de425346b5..8293983d84 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -56,14 +56,14 @@ impl SharedPollState { } /// Attempts to start polling, returning stored state in case of success. - /// Returns `None` if some waker is waking at the moment. + /// Returns `None` if either waker is waking at the moment or state is empty. fn start_polling( &self, ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - if value & WAKING == NONE { + if value & WAKING == NONE && value & NEED_TO_POLL_ALL != NONE { Some(POLLING) } else { None @@ -99,8 +99,10 @@ impl SharedPollState { }) .ok()?; + debug_assert!(value & WAKING == NONE); + // Only start the waking process if we're not in the polling phase and the stream isn't woken already - if value & (WOKEN | POLLING | WAKING) == NONE { + if value & (WOKEN | POLLING) == NONE { let bomb = PollStateBomb::new(self, SharedPollState::stop_waking); Some((value, bomb)) @@ -135,7 +137,8 @@ impl SharedPollState { /// Toggles state to non-waking, allowing to start polling. fn stop_waking(&self) -> u8 { - self.state + let value = self + .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { let next_value = value & !WAKING | WOKEN; @@ -145,7 +148,10 @@ impl SharedPollState { None } }) - .unwrap_or_else(identity) + .unwrap_or_else(identity); + + debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING); + value } /// Resets current state allowing to poll the stream and wake up wakers. @@ -170,11 +176,6 @@ impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> { fn deactivate(mut self) { self.drop.take(); } - - /// Manually fires the bomb, returning supplied state. - fn fire(mut self) -> Option { - self.drop.take().map(|drop| (drop)(self.state)) - } } impl u8> Drop for PollStateBomb<'_, F> { @@ -225,13 +226,10 @@ impl ArcWake for WrappedWaker { if let Some(inner_waker) = waker_opt.clone() { // Stop waking to allow polling stream - let poll_state_value = state_bomb.fire().unwrap(); + drop(state_bomb); - // We want to call waker only if the stream isn't woken yet - if poll_state_value & (WOKEN | WAKING) == WAKING { - // Wake up inner waker - inner_waker.wake(); - } + // Wake up inner waker + inner_waker.wake(); } } }