From d081f83f762c43cb826c70945a73ce3bb89f619f Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 9 Jun 2022 09:46:45 +0200 Subject: [PATCH] sync::watch::Ref: Replace updated() with has_changed() --- tokio/src/sync/watch.rs | 94 +++++++++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index ef61dc339dd..04f1a0de342 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -112,19 +112,55 @@ pub struct Sender { #[derive(Debug)] pub struct Ref<'a, T> { inner: RwLockReadGuard<'a, T>, - updated: bool, + has_changed: bool, } impl<'a, T> Ref<'a, T> { - /// Indicates if the shared value has been detected as _changed_ - /// **and** marked as seen. - /// - /// The result is `true` only if this reference has been obtained - /// from [`Receiver::borrow_and_update()`] in case changes have been - /// detected and marked as seen. In all other cases the result will - /// be `false`. - pub fn updated(&self) -> bool { - self.updated + /// Indicates if the borrowed value is considered as _changed_ since the last + /// time it has been marked as seen. + /// + /// Other than [`Receiver::has_changed()`] it does not fail if the [`Receiver`] + /// is orphaned after the [`Sender`] has been dropped. + /// + /// When borrowed from the [`Sender`] this function will always return `false`. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = watch::channel("hello"); + /// + /// tx.send("goodbye").unwrap(); + /// // The sender does never consider the value as changed. + /// assert!(!tx.borrow().has_changed()); + /// + /// // Drop the sender immediately, just for testing purposes. + /// drop(tx); + /// + /// // Even if the sender has already been dropped... + /// assert!(rx.has_changed().is_err()); + /// // ...the modified value is still readable and detected as changed. + /// assert_eq!(*rx.borrow(), "goodbye"); + /// assert!(rx.borrow().has_changed()); + /// + /// // Read the changed value and mark it as seen. + /// { + /// let received = rx.borrow_and_update(); + /// assert_eq!(*received, "goodbye"); + /// assert!(received.has_changed()); + /// // Release the read lock when leaving this scope. + /// } + /// + /// // Now the value has already been marked as seen and couldd + /// // never be modified again (after the sender has been dropped). + /// assert!(!rx.borrow().has_changed()); + /// } + /// ``` + pub fn has_changed(&self) -> bool { + self.has_changed } } @@ -346,17 +382,21 @@ impl Receiver { /// ``` pub fn borrow(&self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap(); - // Changes are neither detected nor marked as seen by this function - let updated = false; - Ref { inner, updated } + + // After obtaining a read-lock no concurrent writes could occur + // and the loaded version matches that of the borrowed reference. + let new_version = self.shared.state.load().version(); + let has_changed = self.version != new_version; + + Ref { inner, has_changed } } /// Returns a reference to the most recently sent value and marks that value /// as seen. /// - /// This method marks the current value as seen as indicated by [`Ref::updated`]. - /// Subsequent calls to [`changed`] will not return immediately until the - /// [`Sender`] has modified the shared value again. + /// This method marks the current value as seen. Subsequent calls to [`changed`] + /// will not return immediately until the [`Sender`] has modified the shared + /// value again. /// /// Outstanding borrows hold a read lock. This means that long lived borrows /// could cause the send half to block. It is recommended to keep the borrow @@ -382,16 +422,16 @@ impl Receiver { /// [`changed`]: Receiver::changed pub fn borrow_and_update(&mut self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap(); + // After obtaining a read-lock no concurrent writes could occur // and the loaded version matches that of the borrowed reference. let new_version = self.shared.state.load().version(); - let updated = if self.version != new_version { - self.version = new_version; - true - } else { - false - }; - Ref { inner, updated } + let has_changed = self.version != new_version; + + // Mark the shared value as seen by updating the version + self.version = new_version; + + Ref { inner, has_changed } } /// Checks if this channel contains a message that this receiver has not yet @@ -749,9 +789,11 @@ impl Sender { /// ``` pub fn borrow(&self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap(); - // The sender/producer always sees the current value - let updated = false; - Ref { inner, updated } + + // The sender/producer always sees the current version + let has_changed = false; + + Ref { inner, has_changed } } /// Checks if the channel has been closed. This happens when all receivers