sync: make notify_waiters calls atomic #5458

Merged

Darksonn merged 10 commits into tokio-rs:master on Feb 19, 2023

Conversation

satakuma (Member) commented:

Motivation

`tokio::sync::Notify` has an internal waiters queue protected by a mutex. `Notify::notify_waiters` notifies all waiters in this queue: it acquires the lock, then removes waiters from the queue and wakes them up. However, to avoid deadlocks, the waiters are processed in batches of 32, and the function has to release the lock before waking up a batch and re-acquire it before proceeding to the next one. In this window, another thread can acquire the lock and modify the queue or the internal state. This leads to the following issues:

  1. Spurious wake-ups. This is the issue reported in #5396: one call to `tokio::sync::Notify::notify_waiters()` can wake two sequential `tokio::sync::Notify::notified().await`. A single `notify_waiters` call can notify multiple futures that are created and awaited sequentially. For example, both futures in the following code can complete:

     ```rust
     notify.notified().await;
     notify.notified().await;
     ```

  2. Inconsistency in results from `poll`. One would expect sequentially polled futures to yield results consistent with each other: if one pending future returns `Ready`, then other futures that were pending should also be ready. As an example, let's say that we have two pending `Notified` futures, `fut1` and `fut2`. If we call `notify_waiters` in parallel with the following code, the assertion can fail:

     ```rust
     let res1 = fut1.poll();
     let res2 = fut2.poll();
     assert!(res1.is_pending() || res2.is_ready()); // if res1 is ready, then res2 must also be ready
     ```

  3. Lost notifications from subsequent `notify_one` calls. Calling `notify_one` after awaiting a future notified by `notify_waiters` can result in the notification being lost. As an example, let's say that we have a pending `notify.notified()` future `fut`. If we call `notify_waiters` in parallel with the following code, it can hang forever (see the sketch after this list):

     ```rust
     fut.await;
     notify.notify_one();
     notify.notified().await; // this should consume the notification from `notify_one`.
     ```
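
For concreteness, here is a minimal, hedged sketch of the sequence from issue 3, written against the public `tokio::sync::Notify` API. It runs single-threaded and therefore does not reproduce the race itself (a reliable reproduction needs a concurrent `notify_waiters` call, for example under loom); it only shows the call sequence whose final `await` could hang before this fix.

```rust
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::Poll;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());

    // Create a `Notified` future and poll it once so that it registers
    // itself in the waiters queue before `notify_waiters` runs.
    let fut = notify.notified();
    tokio::pin!(fut);
    poll_fn(|cx| {
        assert!(fut.as_mut().poll(cx).is_pending());
        Poll::Ready(())
    })
    .await;

    // Wake every waiter currently in the queue.
    notify.notify_waiters();

    // The sequence from issue 3. Before this PR, a concurrent
    // `notify_waiters` still walking its 32-waiter batches could let a
    // stale waiter consume the permit stored by `notify_one`, and the last
    // `await` below could then hang forever.
    fut.await;
    notify.notify_one();
    notify.notified().await; // consumes the permit from `notify_one`
}
```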

For more context, see the discussion in #5404 or in the related Discord thread.

Closes: #5396

Solution

The idea is to move all waiters to a secondary list, created by each `notify_waiters` call. This list cannot be reached from `notify_one` or other parts of the code, so the waiters it contains cannot consume other notifications, and no new waiters can be inserted into it. This approach eliminates all of the issues described above.

The tricky part is allowing waiters held by a secondary list to remove themselves from it. A waiter can be dropped while it is still on a secondary list, and in that case it must be able to unlink itself from its neighboring nodes. To solve this, this PR adds a new variant of the linked list which allows removing a node without knowing the memory address of the list head. This is done by adding a special "guard" node, which makes the list circular; a simplified sketch of the idea follows.
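
The sketch below is a hedged, simplified illustration of that idea, not Tokio's intrusive `GuardedLinkedList`: with a dedicated guard node closing the ring, any node can unlink itself using only its own `prev`/`next` links and never needs a head pointer.

```rust
use std::cell::RefCell;
use std::rc::Rc;

type Link = Rc<RefCell<Node>>;

struct Node {
    value: Option<u32>, // `None` marks the guard node
    prev: Option<Link>,
    next: Option<Link>,
}

fn new_node(value: Option<u32>) -> Link {
    Rc::new(RefCell::new(Node { value, prev: None, next: None }))
}

fn link(a: &Link, b: &Link) {
    a.borrow_mut().next = Some(b.clone());
    b.borrow_mut().prev = Some(a.clone());
}

// A node removes itself knowing only its neighbours; the guard guarantees
// that both neighbours always exist, so no head pointer is needed.
fn unlink(node: &Link) {
    let prev = node.borrow().prev.clone().unwrap();
    let next = node.borrow().next.clone().unwrap();
    link(&prev, &next);
    node.borrow_mut().prev = None;
    node.borrow_mut().next = None;
}

fn main() {
    // Build the ring: guard <-> n1 <-> n2 <-> guard.
    let guard = new_node(None);
    let n1 = new_node(Some(1));
    let n2 = new_node(Some(2));
    link(&guard, &n1);
    link(&n1, &n2);
    link(&n2, &guard);

    // `n1` unlinks itself without ever touching a list head.
    unlink(&n1);

    // The ring is now guard <-> n2 <-> guard.
    assert_eq!(guard.borrow().next.as_ref().unwrap().borrow().value, Some(2));
    assert_eq!(guard.borrow().prev.as_ref().unwrap().borrow().value, Some(2));
    // (The Rc cycle here leaks at exit; the real list is intrusive and uses
    // raw pointers, so this is only an illustration of the shape.)
}
```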

To solve the inconsistency in `poll` results, this PR also adds some additional logic to `poll_notified`. A polled future has to detect whether it has already been moved to a secondary list by `notify_waiters` and is staged to be notified; in that case it also has to remove itself from that list. To determine whether it is on a secondary list, the future uses a counter of `notify_waiters` calls: if the current value of this counter differs from the value the future recorded when it was added to the queue, then a `notify_waiters` call has intervened, and the future removes itself from the secondary list (see the sketch below).
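
A hedged sketch of that counter check follows. The bit layout and helper names here are illustrative, not the actual `notify.rs` internals: low bits hold the waiter state, the remaining bits count `notify_waiters` calls, and a waiter compares the counter it snapshotted at enqueue time against the current one on every poll.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Poll;

// Illustrative layout: bits above the two state bits count `notify_waiters` calls.
fn waiters_calls(state: usize) -> usize {
    state >> 2
}

// What a pending waiter does on poll, in spirit: compare the call counter
// recorded when it was enqueued against the current one.
fn poll_check(state: &AtomicUsize, counter_at_enqueue: usize) -> Poll<()> {
    let curr = state.load(Ordering::SeqCst);
    if waiters_calls(curr) != counter_at_enqueue {
        // A `notify_waiters` call started after this waiter was enqueued:
        // the waiter is on (or was drained from) a secondary list and is
        // already slated to be woken, so it reports `Ready` and removes
        // itself from that list instead of waiting for a wake-up.
        return Poll::Ready(());
    }
    Poll::Pending
}

fn main() {
    let state = AtomicUsize::new(0);
    let snapshot = waiters_calls(state.load(Ordering::SeqCst));

    assert!(poll_check(&state, snapshot).is_pending());

    // A `notify_waiters` call bumps the counter (and, per the review
    // discussion below, stores it while still holding the waiters mutex).
    state.fetch_add(0b100, Ordering::SeqCst);

    assert!(poll_check(&state, snapshot).is_ready());
}
```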

Commits

fix features CI and wasi CI

replace brittle test with a loom test

format feature list

fix a bug in counter increment 💀

fix a bug storing an old counter value

add loom test to check poll consistency

fix a typo

add loom test for polling between batches

add polling consistency between batches

remove magic constant

add test variant with different poll order

test atomicity when used with notify_one

take atomically entire list in `notify_waiters`

fix clippy errors with potential ub

decouple the list only if with multiple batches

add test for linked list iter

apply review suggestions
@github-actions bot added the R-loom (Run loom tests on this PR) label on Feb 15, 2023
@carllerche added the C-bug (Category: This is a bug.), A-tokio (Area: The main tokio crate), and M-sync (Module: tokio/sync) labels on Feb 15, 2023
```rust
/// `self` or not contained by any other list.
/// The caller **must** ensure that exactly one of the following is true:
/// - `node` is currently contained by `self`
/// - `node` is currently contained by some other list, but `node` is
```
Member:
Given that this is a very specific requirement, maybe include a note saying that this is for the Notify LL and why it relies on it (in a few words). Otherwise, if I were reading this without the necessary context, I would find it very strange.

@carllerche (Member) left a comment:
This PR is looking good. I really like the guarded linked list approach here. I left some comments inline. I also am going to spend a bit more time thinking about whether moving the state atomic store matters.


```rust
let decoupled_list = std::mem::take(&mut *waiters);

let guard = UnsafeCell::new(Waiter::new());
```
Member:
We probably want to pin guard to ensure it doesn't accidentally move (the pin! macro might work here). Also, could you add a big comment saying it is critical for safety that guard does not move and is not dropped until the guarded list is dropped?
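
For illustration, a minimal sketch of the `pin!` suggestion, with a hypothetical `Guard` type standing in for the internal guard `Waiter` (using `std::pin::pin!`; `tokio::pin!` would work the same way):

```rust
use std::pin::pin;

struct Guard; // hypothetical stand-in for the internal guard `Waiter`

fn main() {
    // `pin!` pins the value on the stack: `guard` is a `Pin<&mut Guard>`,
    // so the guard node cannot be moved for the rest of the scope, which is
    // the property the guarded list relies on.
    let guard = pin!(Guard);
    let _ = guard;
}
```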

```rust
// transition **out** of `WAITING`.
// Increment the number of times this method was called
// and transition to empty.
let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
```
Member:
Why was this moved up? I'm not sure it matters, but it is not apparent it does not matter.

Member:
The code is correct as long as the store happens while the mutex is held. Moving the store up increases the odds of a poll_notified succeeding early in some rare concurrent conditions.

@satakuma (Member, author):
It was moved up for correctness. If a pending future could be polled between chunks and observe the old counter value, then it would be possible to observe the inconsistency described in point 2: such a future would return `Pending` even though other waiters from the decoupled list could already be notified. The `notify_waiters_poll_consistency_many` test checks such scenarios.

@satakuma (Member, author):
I applied suggestions from the review. I also removed the unnecessary `ManuallyDrop` from the guarded linked list code because it could be a memory-leaking foot gun for link handles such as `Arc`.
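
As a hedged illustration of that foot gun (not the removed code itself): a `ManuallyDrop`-wrapped handle such as `Arc` silently leaks its reference count if the explicit drop is ever skipped.

```rust
use std::mem::ManuallyDrop;
use std::sync::Arc;

fn main() {
    let a = Arc::new(42);

    // Wrapping a clone in `ManuallyDrop` means its destructor never runs
    // unless `ManuallyDrop::drop`/`into_inner` is called explicitly...
    let leaked = ManuallyDrop::new(a.clone());
    let _ = leaked;

    // ...so the strong count stays at 2 and the allocation can never be
    // freed through `a` alone.
    assert_eq!(Arc::strong_count(&a), 2);
}
```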

@carllerche (Member) left a comment:
Looks good to me. I'll let @Darksonn take a look as well. Thanks!

```
@@ -540,6 +567,8 @@ impl Notify {
            }
        }

        // Release the lock before notifying.
        // `guarded_list` is no longer used.
        drop(waiters);

        wakers.wake_all();
```
Contributor:
We must clean up the linked list even if this call panics.

Member:
Good call.

@satakuma (Member, author):
That's a good catch. I moved the list to a new struct, which makes sure the list is cleaned up on drop.

Comment on lines +297 to +308
```rust
fn drop(&mut self) {
    // If the list is not empty, we unlink all waiters from it.
    // We do not wake the waiters to avoid double panics.
    if !self.is_empty {
        let _lock_guard = self.notify.waiters.lock();
        while let Some(mut waiter) = self.list.pop_back() {
            // Safety: we hold the lock.
            let waiter = unsafe { waiter.as_mut() };
            waiter.notified = Some(NotificationType::AllWaiters);
        }
    }
}
```
@satakuma (Member, author):
Note that this destructor does not call waker.wake(), which means the remaining waiters will never be woken up unless manually polled. However, this is already the case in the current code because one panicking waker from a batch can result in the whole batch never being notified, see the discussion linked in #4069.
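
A hedged sketch of the double-panic concern (illustrative only, using a custom waker; not Tokio's code): if a waker panics while the stack is already unwinding from an earlier waker panic, the process aborts, which is why the cleanup path unlinks waiters without waking them.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::Arc;
use std::task::{Wake, Waker};

// A waker whose `wake` panics, standing in for an arbitrary user waker.
struct PanickingWaker;

impl Wake for PanickingWaker {
    fn wake(self: Arc<Self>) {
        panic!("user waker panicked");
    }
}

fn main() {
    let waker = Waker::from(Arc::new(PanickingWaker));

    // The first panicking waker starts unwinding (caught here for the demo).
    let first = catch_unwind(AssertUnwindSafe(|| waker.wake_by_ref()));
    assert!(first.is_err());

    // If a `Drop` impl running during that unwind woke the rest of the
    // batch and another waker panicked too, the panic-within-a-panic would
    // abort the process. Unlinking the remaining waiters without waking
    // them avoids that, at the cost of the behavior described above.
}
```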

@Darksonn (Contributor) left a comment:
Thanks. Looks good to me.

@Darksonn merged commit 795754a into tokio-rs:master on Feb 19, 2023
amab8901 pushed a commit to amab8901/tokio that referenced this pull request Feb 27, 2023
Noah-Kennedy added a commit that referenced this pull request Mar 1, 2023
# 1.26.0 (March 1st, 2023)

### Fixed

- sync: don't leak tracing spans in mutex guards ([#5469])
- sync: drop wakers after unlocking the mutex in Notify ([#5471])
- sync: drop wakers outside lock in semaphore ([#5475])
- macros: fix empty `join!` and `try_join!` ([#5504])

### Added

- fs: add `fs::try_exists` ([#4299])
- net: add types for named unix pipes ([#5351])
- sync: add `MappedOwnedMutexGuard` ([#5474])

### Documented

- task: clarify what happens to spawned work during runtime shutdown ([#5394])
- task: clarify `process::Command` docs (#5406) ([#5413])
- sync: add doc aliases for `blocking_*` methods ([#5448])
- task: fix wording with 'unsend' ([#5452])
- signal: updated Documentation for Signals ([#5459])
- sync: fix docs for Send/Sync bounds in broadcast ([#5480])
- io: improve AsyncFd example ([#5481])
- tokio: document supported platforms ([#5483])
- runtime: document the nature of the main future ([#5494])
- sync: document drop behavior for channels ([#5497])
- time: document immediate completion guarantee for timeouts ([#5509])
- runtime: remove extra period in docs ([#5511])

### Changed

- net: use Message Read Mode for named pipes ([#5350])
- chore: update windows-sys to 0.45 ([#5386])
- sync: mark lock guards with `#[clippy::has_significant_drop]` ([#5422])
- sync: reduce contention in watch channel ([#5464])
- time: remove cache padding in timer entries ([#5468])
- time: Improve `Instant::now()` perf with test-util ([#5513])

### Internal Changes
- tests: port proptest fuzz harnesses to use cargo-fuzz ([#5392])
- time: don't store deadline twice in sleep entries ([#5410])
- rt: remove Arc from Clock ([#5434])
- sync: make `notify_waiters` calls atomic ([#5458])
- net: refactor named pipe builders to not use bitfields ([#5477])
- io: use `poll_fn` in `copy_bidirectional` ([#5486])
- fs: add more tests for filesystem functionality ([#5493])
- net: fix test compilation failure ([#5506])
- io: ignore SplitByUtf8BoundaryIfWindows test on miri ([#5507])

### Unstable

- metrics: add a new metric for budget exhaustion yields ([#5517])

[#4299]: #4299
[#5350]: #5350
[#5351]: #5351
[#5386]: #5386
[#5392]: #5392
[#5394]: #5394
[#5410]: #5410
[#5413]: #5413
[#5422]: #5422
[#5434]: #5434
[#5448]: #5448
[#5452]: #5452
[#5458]: #5458
[#5459]: #5459
[#5464]: #5464
[#5468]: #5468
[#5469]: #5469
[#5471]: #5471
[#5474]: #5474
[#5475]: #5475
[#5477]: #5477
[#5480]: #5480
[#5481]: #5481
[#5483]: #5483
[#5486]: #5486
[#5493]: #5493
[#5494]: #5494
[#5497]: #5497
[#5504]: #5504
[#5506]: #5506
[#5507]: #5507
[#5509]: #5509
[#5511]: #5511
[#5513]: #5513
[#5517]: #5517
Labels
A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug.), M-sync (Module: tokio/sync), R-loom (Run loom tests on this PR)
Successfully merging this pull request may close these issues:
one call to `tokio::sync::Notify::notify_waiters()` can wake two sequential `tokio::sync::Notify::notified().await` (#5396)
3 participants