From 59a6dfec3cb421967870771b441a68ae64381c72 Mon Sep 17 00:00:00 2001 From: b-naber Date: Wed, 27 Jul 2022 11:35:57 +0200 Subject: [PATCH] address review --- tokio/src/sync/mpsc/bounded.rs | 4 --- tokio/src/sync/mpsc/chan.rs | 2 +- tokio/tests/sync_mpsc.rs | 52 +++++++++++++--------------------- 3 files changed, 21 insertions(+), 37 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 015f01aedf5..a62952c51e8 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -52,7 +52,6 @@ pub struct Sender { /// drop(tx); /// assert!(tx_weak.clone().upgrade().is_none()); /// } -/// /// ``` pub struct WeakSender { chan: Arc>, @@ -1033,9 +1032,6 @@ impl Sender { /// channel were dropped and only `WeakSender` instances remain, /// the channel is closed. pub fn downgrade(&self) -> WeakSender { - // Note: If this is the last `Sender` instance we want to close the - // channel when downgrading, so it's important to move into `self` here. - WeakSender { chan: self.chan.downgrade(), } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index df511916c64..a10ffb7d797 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -47,7 +47,7 @@ pub(crate) trait Semaphore { fn is_closed(&self) -> bool; } -pub(crate) struct Chan { +pub(super) struct Chan { /// Notifies all tasks listening for the receiver being dropped. notify_rx_closed: Notify, diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index c4b9baf01df..2dd0f6f57b6 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -10,17 +10,14 @@ use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; #[cfg(not(all(target_arch = "wasm32", not(target_os = "wasi"))))] use tokio::test as maybe_tokio_test; -use tokio::join; use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; use tokio::sync::mpsc::{self, channel}; use tokio::sync::oneshot; -use tokio::time; use tokio_test::*; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, Release}; use std::sync::Arc; -use std::time::Duration; #[cfg(not(target_arch = "wasm32"))] mod support { @@ -838,47 +835,36 @@ impl Drop for Msg { // `Sender` was dropped while more than one `WeakSender` remains, we want to // ensure that no messages are kept in the channel, which were sent after // the receiver was dropped. -#[tokio::test(start_paused = true)] +#[tokio::test] async fn test_msgs_dropped_on_rx_drop() { - fn ms(millis: u64) -> Duration { - Duration::from_millis(millis) - } - let (tx, mut rx) = mpsc::channel(3); - let rx_handle = tokio::spawn(async move { - let _ = rx.recv().await.unwrap(); - let _ = rx.recv().await.unwrap(); - time::sleep(ms(1)).await; - drop(rx); + let _ = tx.send(Msg {}).await.unwrap(); + let _ = tx.send(Msg {}).await.unwrap(); - time::advance(ms(1)).await; - }); + // This msg will be pending and should be dropped when `rx` is dropped + let sent_fut = tx.send(Msg {}); - let tx_handle = tokio::spawn(async move { - let _ = tx.send(Msg {}).await.unwrap(); - let _ = tx.send(Msg {}).await.unwrap(); + let _ = rx.recv().await.unwrap(); + let _ = rx.recv().await.unwrap(); - // This msg will be pending and should be dropped when `rx` is dropped - let _ = tx.send(Msg {}).await.unwrap(); - time::advance(ms(1)).await; + let _ = sent_fut.await.unwrap(); - // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. - time::sleep(ms(1)).await; - assert!(tx.send(Msg {}).await.is_err()); + drop(rx); - // Ensure that third message isn't put onto the channel anymore - assert_eq!(NUM_DROPPED.load(Acquire), 4); - }); + assert_eq!(NUM_DROPPED.load(Acquire), 3); - let (_, _) = join!(rx_handle, tx_handle); + // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. + assert!(tx.send(Msg {}).await.is_err()); + + assert_eq!(NUM_DROPPED.load(Acquire), 4); } // Tests that a `WeakSender` is upgradeable when other `Sender`s exist. #[tokio::test] async fn downgrade_upgrade_sender_success() { let (tx, _rx) = mpsc::channel::(1); - let weak_tx = tx.clone().downgrade(); + let weak_tx = tx.downgrade(); assert!(weak_tx.upgrade().is_some()); } @@ -897,6 +883,7 @@ async fn downgrade_upgrade_sender_failure() { async fn downgrade_drop_upgrade() { let (tx, _rx) = mpsc::channel::(1); + // the cloned `Tx` is dropped right away let weak_tx = tx.clone().downgrade(); drop(tx); assert!(weak_tx.upgrade().is_none()); @@ -907,7 +894,7 @@ async fn downgrade_drop_upgrade() { #[tokio::test] async fn downgrade_get_permit_upgrade_no_senders() { let (tx, _rx) = mpsc::channel::(1); - let weak_tx = tx.clone().downgrade(); + let weak_tx = tx.downgrade(); let _permit = tx.reserve_owned().await.unwrap(); assert!(weak_tx.upgrade().is_some()); } @@ -920,12 +907,13 @@ async fn downgrade_upgrade_get_permit_no_senders() { let tx2 = tx.clone(); let _permit = tx.reserve_owned().await.unwrap(); let weak_tx = tx2.downgrade(); + drop(tx2); assert!(weak_tx.upgrade().is_some()); } -// Tests that `Clone` of `WeakSender` doesn't decrement `tx_count`. +// Tests that `downgrade` does not change the `tx_count` of the channel. #[tokio::test] -async fn test_weak_sender_clone() { +async fn test_tx_count_weak_sender() { let (tx, _rx) = mpsc::channel::(1); let tx_weak = tx.downgrade(); let tx_weak2 = tx.downgrade();