diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index e5514277f6c..8babdc7e076 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -143,7 +143,10 @@ pub struct Receiver { #[track_caller] pub fn channel(buffer: usize) -> (Sender, Receiver) { assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); - let semaphore = (semaphore::Semaphore::new(buffer), buffer); + let semaphore = Semaphore { + semaphore: semaphore::Semaphore::new(buffer), + bound: buffer, + }; let (tx, rx) = chan::channel(semaphore); let tx = Sender::new(tx); @@ -154,7 +157,11 @@ pub fn channel(buffer: usize) -> (Sender, Receiver) { /// Channel semaphore is a tuple of the semaphore implementation and a `usize` /// representing the channel bound. -type Semaphore = (semaphore::Semaphore, usize); +#[derive(Debug)] +pub(crate) struct Semaphore { + pub(crate) semaphore: semaphore::Semaphore, + pub(crate) bound: usize, +} impl Receiver { pub(crate) fn new(chan: chan::Rx) -> Receiver { @@ -572,7 +579,7 @@ impl Sender { /// } /// ``` pub fn try_send(&self, message: T) -> Result<(), TrySendError> { - match self.chan.semaphore().0.try_acquire(1) { + match self.chan.semaphore().semaphore.try_acquire(1) { Ok(_) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)), Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)), @@ -852,7 +859,7 @@ impl Sender { } async fn reserve_inner(&self) -> Result<(), SendError<()>> { - match self.chan.semaphore().0.acquire(1).await { + match self.chan.semaphore().semaphore.acquire(1).await { Ok(_) => Ok(()), Err(_) => Err(SendError(())), } @@ -902,7 +909,7 @@ impl Sender { /// } /// ``` pub fn try_reserve(&self) -> Result, TrySendError<()>> { - match self.chan.semaphore().0.try_acquire(1) { + match self.chan.semaphore().semaphore.try_acquire(1) { Ok(_) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())), Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())), @@ -967,7 +974,7 @@ impl Sender { /// } /// ``` pub fn try_reserve_owned(self) -> Result, TrySendError> { - match self.chan.semaphore().0.try_acquire(1) { + match self.chan.semaphore().semaphore.try_acquire(1) { Ok(_) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)), Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)), @@ -1028,7 +1035,7 @@ impl Sender { /// [`channel`]: channel /// [`max_capacity`]: Sender::max_capacity pub fn capacity(&self) -> usize { - self.chan.semaphore().0.available_permits() + self.chan.semaphore().semaphore.available_permits() } /// Converts the `Sender` to a [`WeakSender`] that does not count @@ -1074,7 +1081,7 @@ impl Sender { /// [`max_capacity`]: Sender::max_capacity /// [`capacity`]: Sender::capacity pub fn max_capacity(&self) -> usize { - self.chan.semaphore().1 + self.chan.semaphore().bound } } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index f0e9dc27f33..076d925d62f 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -4,7 +4,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; use crate::park::thread::CachedParkThread; use crate::sync::mpsc::error::TryRecvError; -use crate::sync::mpsc::list; +use crate::sync::mpsc::{bounded, list, unbounded}; use crate::sync::notify::Notify; use std::fmt; @@ -12,7 +12,6 @@ use std::process; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; -use std::usize; /// Channel sender. pub(crate) struct Tx { @@ -382,29 +381,29 @@ impl Drop for Chan { // ===== impl Semaphore for (::Semaphore, capacity) ===== -impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) { +impl Semaphore for bounded::Semaphore { fn add_permit(&self) { - self.0.release(1) + self.semaphore.release(1) } fn is_idle(&self) -> bool { - self.0.available_permits() == self.1 + self.semaphore.available_permits() == self.bound } fn close(&self) { - self.0.close(); + self.semaphore.close(); } fn is_closed(&self) -> bool { - self.0.is_closed() + self.semaphore.is_closed() } } // ===== impl Semaphore for AtomicUsize ===== -impl Semaphore for AtomicUsize { +impl Semaphore for unbounded::Semaphore { fn add_permit(&self) { - let prev = self.fetch_sub(2, Release); + let prev = self.0.fetch_sub(2, Release); if prev >> 1 == 0 { // Something went wrong @@ -413,14 +412,14 @@ impl Semaphore for AtomicUsize { } fn is_idle(&self) -> bool { - self.load(Acquire) >> 1 == 0 + self.0.load(Acquire) >> 1 == 0 } fn close(&self) { - self.fetch_or(1, Release); + self.0.fetch_or(1, Release); } fn is_closed(&self) -> bool { - self.load(Acquire) & 1 == 1 + self.0.load(Acquire) & 1 == 1 } } diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 3cf626121b7..ce6a7353e07 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -61,7 +61,7 @@ impl fmt::Debug for UnboundedReceiver { /// the channel. Using an `unbounded` channel has the ability of causing the /// process to run out of memory. In this case, the process will be aborted. pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) { - let (tx, rx) = chan::channel(AtomicUsize::new(0)); + let (tx, rx) = chan::channel(Semaphore(AtomicUsize::new(0))); let tx = UnboundedSender::new(tx); let rx = UnboundedReceiver::new(rx); @@ -70,7 +70,8 @@ pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) { } /// No capacity -type Semaphore = AtomicUsize; +#[derive(Debug)] +pub(crate) struct Semaphore(pub(crate) AtomicUsize); impl UnboundedReceiver { pub(crate) fn new(chan: chan::Rx) -> UnboundedReceiver { @@ -279,7 +280,7 @@ impl UnboundedSender { use std::process; use std::sync::atomic::Ordering::{AcqRel, Acquire}; - let mut curr = self.chan.semaphore().load(Acquire); + let mut curr = self.chan.semaphore().0.load(Acquire); loop { if curr & 1 == 1 { @@ -295,6 +296,7 @@ impl UnboundedSender { match self .chan .semaphore() + .0 .compare_exchange(curr, curr + 2, AcqRel, Acquire) { Ok(_) => return true,