Skip to content

Commit

Permalink
sync::watch: Add an updated flag and method to Ref
Browse files Browse the repository at this point in the history
The flag is only set by Receiver::borrow_and_update(). Otherwise it is
initialized with `false`.
  • Loading branch information
uklotzde committed Jun 8, 2022
1 parent 6445c44 commit 63c9987
Showing 1 changed file with 24 additions and 40 deletions.
64 changes: 24 additions & 40 deletions tokio/src/sync/watch.rs
Expand Up @@ -112,6 +112,20 @@ pub struct Sender<T> {
#[derive(Debug)]
pub struct Ref<'a, T> {
inner: RwLockReadGuard<'a, T>,
updated: bool,
}

impl<'a, T> Ref<'a, T> {
/// Indicates if the shared value has been detected as _changed_
/// **and** marked as seen.
///
/// The result is `true` only if this reference has been obtained
/// from [`Receiver::borrow_and_update()`] in case changes have been
/// detected and marked as seen. In all other cases the result will
/// be `false`.
pub fn updated(&self) -> bool {
self.updated
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -332,14 +346,17 @@ impl<T> Receiver<T> {
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
Ref { inner }
// Changes are neither detected nor marked as seen by this function
let updated = false;
Ref { inner, updated }
}

/// Returns a reference to the most recently sent value and marks that value
/// as seen.
///
/// This method marks the current value as seen. Subsequent calls to [`changed`]
/// will not return immediately until the producer has modified the shared value again.
/// This method marks the current value as seen as indicated by [`Ref::updated`].
/// Subsequent calls to [`changed`] will not return immediately until the
/// [`Sender`] has modified the shared value again.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
Expand All @@ -364,41 +381,6 @@ impl<T> Receiver<T> {
///
/// [`changed`]: Receiver::changed
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
self.borrow_and_update_if_changed().0
}

/// Returns a reference to the most recently sent value with an *updated*
/// flag indicating if that value has been detected as *changed* and marked
/// as seen.
///
/// This method marks the current value as seen. Subsequent calls to [`changed`]
/// will not return immediately until the producer has modified the shared value again.
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// The priority policy of the lock is dependent on the underlying lock
/// implementation, and this type does not guarantee that any particular policy
/// will be used. In particular, a producer which is waiting to acquire the lock
/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
///
/// <details><summary>Potential deadlock example</summary>
///
/// ```text
/// // Task 1 (on thread A) | // Task 2 (on thread B)
/// let (_ref1, _updated1) = |
/// rx1.borrow_and_update_if_changed(); |
/// | // will block
/// | let _ = tx.send(());
/// // may deadlock |
/// let (_ref2, _updated2) = |
/// rx2.borrow_and_update_if_changed(); |
/// ```
/// </details>
///
/// [`changed`]: Receiver::changed
pub fn borrow_and_update_if_changed(&mut self) -> (Ref<'_, T>, bool) {
let inner = self.shared.value.read().unwrap();
// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
Expand All @@ -409,7 +391,7 @@ impl<T> Receiver<T> {
} else {
false
};
(Ref { inner }, updated)
Ref { inner, updated }
}

/// Checks if this channel contains a message that this receiver has not yet
Expand Down Expand Up @@ -767,7 +749,9 @@ impl<T> Sender<T> {
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
Ref { inner }
// The sender/producer always sees the current value
let updated = false;
Ref { inner, updated }
}

/// Checks if the channel has been closed. This happens when all receivers
Expand Down

0 comments on commit 63c9987

Please sign in to comment.