diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 47d7938158a..e5514277f6c 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -998,6 +998,8 @@ impl Sender { /// /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`]. + /// This is distinct from [`max_capacity`], which always returns buffer capacity initially + /// specified when calling [`channel`] /// /// # Examples /// @@ -1023,6 +1025,8 @@ impl Sender { /// /// [`send`]: Sender::send /// [`reserve`]: Sender::reserve + /// [`channel`]: channel + /// [`max_capacity`]: Sender::max_capacity pub fn capacity(&self) -> usize { self.chan.semaphore().0.available_permits() } @@ -1036,6 +1040,42 @@ impl Sender { chan: self.chan.downgrade(), } } + + /// Returns the maximum buffer capacity of the channel. + /// + /// The maximum capacity is the buffer capacity initially specified when calling + /// [`channel`]. This is distinct from [`capacity`], which returns the *current* + /// available buffer capacity: as messages are sent and received, the + /// value returned by [`capacity`] will go up or down, whereas the value + /// returned by `max_capacity` will remain constant. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, _rx) = mpsc::channel::<()>(5); + /// + /// // both max capacity and capacity are the same at first + /// assert_eq!(tx.max_capacity(), 5); + /// assert_eq!(tx.capacity(), 5); + /// + /// // Making a reservation doesn't change the max capacity. + /// let permit = tx.reserve().await.unwrap(); + /// assert_eq!(tx.max_capacity(), 5); + /// // but drops the capacity by one + /// assert_eq!(tx.capacity(), 4); + /// } + /// ``` + /// + /// [`channel`]: channel + /// [`max_capacity`]: Sender::max_capacity + /// [`capacity`]: Sender::capacity + pub fn max_capacity(&self) -> usize { + self.chan.semaphore().1 + } } impl Clone for Sender { diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index aebdf8548d8..24f078c62b1 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -921,3 +921,22 @@ async fn test_tx_count_weak_sender() { assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none()); } + +// Tests that channel `capacity` changes and `max_capacity` stays the same +#[tokio::test] +async fn test_tx_capacity() { + let (tx, _rx) = mpsc::channel::<()>(10); + // both capacities are same before + assert_eq!(tx.capacity(), 10); + assert_eq!(tx.max_capacity(), 10); + + let _permit = tx.reserve().await.unwrap(); + // after reserve, only capacity should drop by one + assert_eq!(tx.capacity(), 9); + assert_eq!(tx.max_capacity(), 10); + + let _sent = tx.send(()).await.unwrap(); + // after send, capacity should drop by one again + assert_eq!(tx.capacity(), 8); + assert_eq!(tx.max_capacity(), 10); +}