Skip to content

Commit

Permalink
sync: add watch::Receiver::mark_unseen (#5962)
Browse files Browse the repository at this point in the history
  • Loading branch information
victor-timofei committed Sep 11, 2023
1 parent 1c428cc commit 61042b4
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
19 changes: 17 additions & 2 deletions tokio/src/sync/watch.rs
Expand Up @@ -380,7 +380,17 @@ mod state {
impl Version {
/// Get the initial version when creating the channel.
pub(super) fn initial() -> Self {
Version(0)
// The initial version is 1 so that `mark_unseen` can decrement by one.
// (The value is 2 due to the closed bit.)
Version(2)
}

/// Decrements the version.
pub(super) fn decrement(&mut self) {
// Decrement by two to avoid touching the CLOSED bit.

This comment has been minimized.

Copy link
@uklotzde

uklotzde Sep 17, 2023

Contributor

decrement() is not needed for anything else than marking the current version as unchanged/unseen.

The special value 0 seems to be a better choice for this purpose, no conditional needed. This special value is already used implicitly when decrementing the initial version.

if self.0 >= 2 {
self.0 -= 2;
}
}
}

Expand All @@ -400,7 +410,7 @@ mod state {
/// Create a new `AtomicState` that is not closed and which has the
/// version set to `Version::initial()`.
pub(super) fn new() -> Self {
AtomicState(AtomicUsize::new(0))
AtomicState(AtomicUsize::new(2))
}

/// Load the current value of the state.
Expand Down Expand Up @@ -634,6 +644,11 @@ impl<T> Receiver<T> {
Ok(self.version != new_version)
}

/// Marks the state as unseen.
pub fn mark_unseen(&mut self) {

This comment has been minimized.

Copy link
@uklotzde

uklotzde Sep 17, 2023

Contributor

Why introduce a new wording "unseen" if we already have changed()and has_changed(), i.e. why not mark_unchanged()?

self.version.decrement();
}

/// Waits for a change notification, then marks the newest value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
Expand Down
58 changes: 58 additions & 0 deletions tokio/tests/sync_watch.rs
Expand Up @@ -44,6 +44,64 @@ fn single_rx_recv() {
assert_eq!(*rx.borrow(), "two");
}

#[test]
fn rx_version_underflow() {
let (_tx, mut rx) = watch::channel("one");

// Version starts at 2, validate we do not underflow
rx.mark_unseen();
rx.mark_unseen();
}

#[test]
fn rx_mark_unseen() {
let (tx, mut rx) = watch::channel("one");

let mut rx2 = rx.clone();
let mut rx3 = rx.clone();
let mut rx4 = rx.clone();
{
rx.mark_unseen();
assert!(rx.has_changed().unwrap());

let mut t = spawn(rx.changed());
assert_ready_ok!(t.poll());
}

{
assert!(!rx2.has_changed().unwrap());

let mut t = spawn(rx2.changed());
assert_pending!(t.poll());
}

{
rx3.mark_unseen();
assert_eq!(*rx3.borrow(), "one");

assert!(rx3.has_changed().unwrap());

assert_eq!(*rx3.borrow_and_update(), "one");

assert!(!rx3.has_changed().unwrap());

let mut t = spawn(rx3.changed());
assert_pending!(t.poll());
}

{
tx.send("two").unwrap();
assert!(rx4.has_changed().unwrap());
assert_eq!(*rx4.borrow_and_update(), "two");

rx4.mark_unseen();
assert!(rx4.has_changed().unwrap());
assert_eq!(*rx4.borrow_and_update(), "two")
}

assert_eq!(*rx.borrow(), "two");
}

#[test]
fn multi_rx() {
let (tx, mut rx1) = watch::channel("one");
Expand Down

0 comments on commit 61042b4

Please sign in to comment.