Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio: add mpsc::Receiver::max_capacity and mpsc::Receiver::capacity #6511

Merged
merged 3 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
88 changes: 84 additions & 4 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ impl<T> Receiver<T> {
/// assert!(!rx.is_closed());
///
/// rx.close();
///
///
/// assert!(rx.is_closed());
/// }
/// ```
Expand Down Expand Up @@ -530,6 +530,86 @@ impl<T> Receiver<T> {
self.chan.len()
}

/// Returns the current capacity of the channel.
///
/// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
/// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
/// This is distinct from [`max_capacity`], which always returns buffer capacity initially
/// specified when calling [`channel`].
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel::<()>(5);
///
/// assert_eq!(rx.capacity(), 5);
///
/// // Making a reservation drops the capacity by one.
/// let permit = tx.reserve().await.unwrap();
/// assert_eq!(rx.capacity(), 4);
/// assert_eq!(rx.len(), 0);
///
/// // Sending and receiving a value increases the capacity by one.
/// permit.send(());
/// assert_eq!(rx.len(), 1);
/// rx.recv().await.unwrap();
/// assert_eq!(rx.capacity(), 5);
///
/// // Directly sending a message drops the capacity by one.
/// tx.send(()).await.unwrap();
/// assert_eq!(rx.capacity(), 4);
/// assert_eq!(rx.len(), 1);
///
/// // Receiving the message increases the capacity by one.
/// rx.recv().await.unwrap();
/// assert_eq!(rx.capacity(), 5);
/// assert_eq!(rx.len(), 0);
/// }
/// ```
/// [`capacity`]: Receiver::capacity
/// [`max_capacity`]: Receiver::max_capacity
pub fn capacity(&self) -> usize {
self.chan.semaphore().semaphore.available_permits()
}
Comment on lines +575 to +577
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this is not actually equal to max_capacity - len. Generally, sending a message goes through this procedure:

  1. Acquire a permit for the message.
  2. Write the message into the channel's storage.
  3. Make the message available to receivers.

With the len method, the length only changes in step 3, but with this method, the capacity already changes in step 1. If the user is using Permit to acquire permits before sending messages, then there may be a long amount of time between step 1 and step 2.

The documentation should clearly explain the difference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the example to explain the relation with Receiver::len. Is that sufficient?


/// Returns the maximum buffer capacity of the channel.
///
/// The maximum capacity is the buffer capacity initially specified when calling
/// [`channel`]. This is distinct from [`capacity`], which returns the *current*
/// available buffer capacity: as messages are sent and received, the value
/// returned by [`capacity`] will go up or down, whereas the value
/// returned by [`max_capacity`] will remain constant.
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel::<()>(5);
///
/// // both max capacity and capacity are the same at first
/// assert_eq!(rx.max_capacity(), 5);
/// assert_eq!(rx.capacity(), 5);
///
/// // Making a reservation doesn't change the max capacity.
/// let permit = tx.reserve().await.unwrap();
/// assert_eq!(rx.max_capacity(), 5);
/// // but drops the capacity by one
/// assert_eq!(rx.capacity(), 4);
/// }
/// ```
/// [`capacity`]: Receiver::capacity
/// [`max_capacity`]: Receiver::max_capacity
pub fn max_capacity(&self) -> usize {
self.chan.semaphore().bound
}

/// Polls to receive the next message on this channel.
///
/// This method returns:
Expand Down Expand Up @@ -1059,7 +1139,7 @@ impl<T> Sender<T> {
///
/// // The iterator should now be exhausted
/// assert!(permit.next().is_none());
///
///
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
/// assert_eq!(rx.recv().await.unwrap(), 457);
Expand Down Expand Up @@ -1274,7 +1354,7 @@ impl<T> Sender<T> {
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
/// assert_eq!(rx.recv().await.unwrap(), 457);
///
///
/// // Trying to call try_reserve_many with 0 will return an empty iterator
/// let mut permit = tx.try_reserve_many(0).unwrap();
/// assert!(permit.next().is_none());
Expand Down Expand Up @@ -1447,7 +1527,7 @@ impl<T> Sender<T> {
/// [`channel`]. This is distinct from [`capacity`], which returns the *current*
/// available buffer capacity: as messages are sent and received, the
/// value returned by [`capacity`] will go up or down, whereas the value
/// returned by `max_capacity` will remain constant.
/// returned by [`max_capacity`] will remain constant.
///
/// # Examples
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,10 @@ impl<T, S: Semaphore> Rx<T, S> {
}
})
}

pub(super) fn semaphore(&self) -> &S {
&self.inner.semaphore
}
}

impl<T, S: Semaphore> Drop for Rx<T, S> {
Expand Down