Skip to content

Commit

Permalink
sync: improve docs for watch channels (#5954)
Browse files Browse the repository at this point in the history
## Motivation

I found the watch docs as written to be somewhat confusing.

* It wasn't clear to me whether values are marked seen or not at
  creation/subscribe time.
* The example also confused me a bit, suggesting a while loop when a
  do-while loop is generally more correct.
* I noticed a potential race with `borrow` that is no longer an issue
  with `borrow_and_update`.

## Solution

Update the documentation for the watch module to try and make all this
clearer.
  • Loading branch information
sunshowers committed Aug 28, 2023
1 parent fb3028f commit cb1e10b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 15 deletions.
2 changes: 1 addition & 1 deletion tokio/src/sync/mod.rs
Expand Up @@ -383,7 +383,7 @@
//! sleep.set(time::sleep_until(op_start + conf.timeout));
//! }
//! _ = rx.changed() => {
//! conf = rx.borrow().clone();
//! conf = rx.borrow_and_update().clone();
//!
//! // The configuration has been updated. Update the
//! // `sleep` using the new `timeout` value.
Expand Down
103 changes: 89 additions & 14 deletions tokio/src/sync/watch.rs
Expand Up @@ -10,24 +10,75 @@
//!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
//! and consumer halves of the channel. The channel is created with an initial
//! value. The **latest** value stored in the channel is accessed with
//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
//! value to be sent by the [`Sender`] half.
//! value.
//!
//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
//!
//! To access the **current** value stored in the channel and mark it as *seen*
//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
//!
//! To access the current value **without** marking it as *seen*, use
//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
//!
//! For more information on when to use these methods, see
//! [here](#borrow_and_update-versus-borrow).
//!
//! ## Change notifications
//!
//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
//!
//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
//! `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
//! * If the current value is *unseen* when calling [`changed`], then
//! [`changed`] will return immediately. If the current value is *seen*, then
//! it will sleep until either a new message is sent via the [`Sender`] half,
//! or the [`Sender`] is dropped.
//! * On completion, the [`changed`] method marks the new value as *seen*.
//! * At creation, the initial value is considered *seen*. In other words,
//! [`Receiver::changed()`] will not return until a subsequent value is sent.
//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
//! The current value at the time the [`Receiver`] is created is considered
//! *seen*.
//!
//! ## `borrow_and_update` versus `borrow`
//!
//! If the receiver intends to await notifications from [`changed`] in a loop,
//! [`Receiver::borrow_and_update()`] should be preferred over
//! [`Receiver::borrow()`]. This avoids a potential race where a new value is
//! sent between [`changed`] being ready and the value being read. (If
//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
//!
//! If the receiver is only interested in the current value, and does not intend
//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
//! self`.
//!
//! # Examples
//!
//! The following example prints `hello! world! `.
//!
//! ```
//! use tokio::sync::watch;
//! use tokio::time::{Duration, sleep};
//!
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
//! let (tx, mut rx) = watch::channel("hello");
//!
//! tokio::spawn(async move {
//! while rx.changed().await.is_ok() {
//! println!("received = {:?}", *rx.borrow());
//! // Use the equivalent of a "do-while" loop so the initial value is
//! // processed before awaiting the `changed()` future.
//! loop {
//! println!("{}! ", *rx.borrow_and_update());
//! if rx.changed().await.is_err() {
//! break;
//! }
//! }
//! });
//!
//! sleep(Duration::from_millis(100)).await;
//! tx.send("world")?;
//! # Ok(())
//! # }
Expand All @@ -39,8 +90,8 @@
//! when all [`Receiver`] handles have been dropped. This indicates that there
//! is no further interest in the values being produced and work can be stopped.
//!
//! The value in the channel will not be dropped until the sender and all receivers
//! have been dropped.
//! The value in the channel will not be dropped until the sender and all
//! receivers have been dropped.
//!
//! # Thread safety
//!
Expand All @@ -50,11 +101,15 @@
//!
//! [`Sender`]: crate::sync::watch::Sender
//! [`Receiver`]: crate::sync::watch::Receiver
//! [`changed`]: crate::sync::watch::Receiver::changed
//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
//! [`Receiver::borrow_and_update()`]:
//! crate::sync::watch::Receiver::borrow_and_update
//! [`channel`]: crate::sync::watch::channel
//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
//! [`Sender::closed`]: crate::sync::watch::Sender::closed
//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe

use crate::sync::notify::Notify;

Expand Down Expand Up @@ -374,19 +429,28 @@ mod state {
///
/// # Examples
///
/// The following example prints `hello! world! `.
///
/// ```
/// use tokio::sync::watch;
/// use tokio::time::{Duration, sleep};
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let (tx, mut rx) = watch::channel("hello");
/// let (tx, mut rx) = watch::channel("hello");
///
/// tokio::spawn(async move {
/// while rx.changed().await.is_ok() {
/// println!("received = {:?}", *rx.borrow());
/// tokio::spawn(async move {
/// // Use the equivalent of a "do-while" loop so the initial value is
/// // processed before awaiting the `changed()` future.
/// loop {
/// println!("{}! ", *rx.borrow_and_update());
/// if rx.changed().await.is_err() {
/// break;
/// }
/// });
/// }
/// });
///
/// tx.send("world")?;
/// sleep(Duration::from_millis(100)).await;
/// tx.send("world")?;
/// # Ok(())
/// # }
/// ```
Expand Down Expand Up @@ -453,7 +517,11 @@ impl<T> Receiver<T> {
/// ```
/// </details>
///
/// For more information on when to use this method versus
/// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
///
/// [`changed`]: Receiver::changed
/// [`borrow_and_update`]: Receiver::borrow_and_update
///
/// # Examples
///
Expand Down Expand Up @@ -505,7 +573,11 @@ impl<T> Receiver<T> {
/// ```
/// </details>
///
/// For more information on when to use this method versus [`borrow`], see
/// [here](self#borrow_and_update-versus-borrow).
///
/// [`changed`]: Receiver::changed
/// [`borrow`]: Receiver::borrow
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();

Expand Down Expand Up @@ -572,6 +644,9 @@ impl<T> Receiver<T> {
///
/// This method returns an error if and only if the [`Sender`] is dropped.
///
/// For more information, see
/// [*Change notifications*](self#change-notifications) in the module-level documentation.
///
/// # Cancel safety
///
/// This method is cancel safe. If you use it as the event in a
Expand All @@ -595,7 +670,7 @@ impl<T> Receiver<T> {
/// });
///
/// assert!(rx.changed().await.is_ok());
/// assert_eq!(*rx.borrow(), "goodbye");
/// assert_eq!(*rx.borrow_and_update(), "goodbye");
///
/// // The `tx` handle has been dropped
/// assert!(rx.changed().await.is_err());
Expand Down

0 comments on commit cb1e10b

Please sign in to comment.