sync: make `notify_waiters` calls atomic #5458
- fix features CI and wasi CI
- replace brittle test with a loom test
- format feature list
- fix a bug in counter increment 💀
- fix a bug storing an old counter value
- add loom test to check poll consistency
- fix a typo
- add loom test for polling between batches
- add polling consistency between batches
- remove magic constant
- add test variant with different poll order
- test atomicity when used with notify_one
- take atomically entire list in `notify_waiters`
- fix clippy errors with potential ub
- decouple the list only if with multiple batches
- add test for linked list iter
- apply review suggestions
tokio/src/util/linked_list.rs
/// `self` or not contained by any other list.
/// The caller **must** ensure that exactly one of the following is true:
/// - `node` is currently contained by `self`
/// - `node` is currently contained by some other list, but `node` is
Given that this is a very specific requirement, maybe include a note saying that this is for the `Notify` linked list and why it relies on it (in a few words). Otherwise, if I were reading this without the necessary context, I would find it very strange.
This PR is looking good. I really like the guarded linked list approach here. I left some comments inline. I also am going to spend a bit more time thinking about whether moving the state atomic store matters.
let decoupled_list = std::mem::take(&mut *waiters);

let guard = UnsafeCell::new(Waiter::new());
We probably want to pin `guard` to ensure it doesn't accidentally move (the `pin!` macro might work here). Also, could you add a big comment saying it is critical for safety that `guard` does not move and is not dropped until the guarded list is dropped?
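A minimal sketch of what pinning a guard on the stack with `std::pin::pin!` could look like. It assumes a hypothetical `Guard` type (not tokio's `Waiter`) and a toolchain where `pin!` is available (Rust 1.68 or later). `PhantomPinned` makes the type `!Unpin`, so safe code cannot move the value out of the pin once it is created.

```rust
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};

/// Hypothetical guard node; `PhantomPinned` opts the type out of `Unpin`.
struct Guard {
    value: u32,
    _pinned: PhantomPinned,
}

/// Returns the address of the pinned guard.
fn address_of(guard: Pin<&Guard>) -> usize {
    &*guard as *const Guard as usize
}

/// Pins a guard to the current stack frame and checks that repeated
/// shared accesses observe the same address and contents.
fn guard_stays_put() -> bool {
    // `pin!` yields a `Pin<&mut Guard>` that borrows a hidden local.
    let guard = pin!(Guard {
        value: 7,
        _pinned: PhantomPinned,
    });
    let before = address_of(guard.as_ref());
    let after = address_of(guard.as_ref());
    before == after && guard.value == 7
}
```

The pin only rules out moves through safe code. The requirement that the guard outlives the guarded list still has to be upheld by the surrounding code, which is why the comment asks for it to be documented.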
// transition **out** of `WAITING`.
// Increment the number of times this method was called
// and transition to empty.
let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
Why was this moved up? I'm not sure it matters, but it is not apparent it does not matter.
The code is correct as long as the store happens while the mutex is held. Moving the store up increases the odds of a `poll_notified` succeeding early in some rare concurrent conditions.
It was moved up for correctness. If we allowed polling a pending future between chunks and observing the old counter value, then it would be possible to observe the inconsistency from the description (number 2). This is because such a future would return `Pending`, even though other waiters from the decoupled list could already be notified. `notify_waiters_poll_consistency_many` checks such scenarios.
I applied suggestions from the review. I also removed unnecessary |
Looks good to me. I'll let @Darksonn take a look as well. Thanks!
@@ -540,6 +567,8 @@ impl Notify {
    }
}

// Release the lock before notifying.
// `guarded_list` is no longer used.
drop(waiters);

wakers.wake_all();
We must clean up the linked list even if this call panics.
Good call.
That's a good catch. I moved the list to a new struct, which makes sure the list is cleaned up on drop.
fn drop(&mut self) {
    // If the list is not empty, we unlink all waiters from it.
    // We do not wake the waiters to avoid double panics.
    if !self.is_empty {
        let _lock_guard = self.notify.waiters.lock();
        while let Some(mut waiter) = self.list.pop_back() {
            // Safety: we hold the lock.
            let waiter = unsafe { waiter.as_mut() };
            waiter.notified = Some(NotificationType::AllWaiters);
        }
    }
}
Note that this destructor does not call `waker.wake()`, which means the remaining waiters will never be woken up unless manually polled. However, this is already the case in the current code because one panicking waker from a batch can result in the whole batch never being notified, see the discussion linked in #4069.
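The cleanup-on-panic pattern discussed in this thread can be sketched with the standard library alone. Everything here is a hypothetical model rather than tokio's code: waiters are plain `u32` ids, `PendingWaiters` stands in for the struct that owns the list, and "unlinking" is modelled as moving the ids into a shared `Vec`.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::Mutex;

/// Hypothetical owner of the waiters that have not been woken yet.
struct PendingWaiters<'a> {
    pending: Vec<u32>,
    unlinked: &'a Mutex<Vec<u32>>,
}

impl Drop for PendingWaiters<'_> {
    fn drop(&mut self) {
        // Runs on normal exit and during unwinding alike. It only records
        // the remaining waiters and never calls a waker.
        let mut unlinked = self.unlinked.lock().unwrap_or_else(|e| e.into_inner());
        unlinked.extend(self.pending.drain(..));
    }
}

/// Wakes waiters one by one, starting from the back; `wake` may panic.
fn notify_all(waiters: Vec<u32>, unlinked: &Mutex<Vec<u32>>, wake: impl Fn(u32)) {
    let mut list = PendingWaiters { pending: waiters, unlinked };
    while let Some(id) = list.pending.pop() {
        wake(id);
    }
}

/// Runs `notify_all` with a waker that panics on waiter 2 and reports
/// whether the call panicked and which waiters the destructor unlinked.
fn run_with_panicking_waker() -> (bool, Vec<u32>) {
    let unlinked = Mutex::new(Vec::new());
    let result = panic::catch_unwind(AssertUnwindSafe(|| {
        notify_all(vec![1, 2, 3], &unlinked, |id| {
            if id == 2 {
                panic!("waker panicked");
            }
        });
    }));
    let left = unlinked.lock().unwrap_or_else(|e| e.into_inner()).clone();
    (result.is_err(), left)
}
```

In this model waiter 3 is woken, the waker for waiter 2 panics, and waiter 1 is unlinked by the destructor without being woken, which mirrors the trade-off described in the comment above.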
Thanks. Looks good to me.
# 1.26.0 (March 1st, 2023)

### Fixed

- sync: don't leak tracing spans in mutex guards ([#5469])
- sync: drop wakers after unlocking the mutex in Notify ([#5471])
- sync: drop wakers outside lock in semaphore ([#5475])
- macros: fix empty `join!` and `try_join!` ([#5504])

### Added

- fs: add `fs::try_exists` ([#4299])
- net: add types for named unix pipes ([#5351])
- sync: add `MappedOwnedMutexGuard` ([#5474])

### Documented

- task: clarify what happens to spawned work during runtime shutdown ([#5394])
- task: clarify `process::Command` docs (#5406) ([#5413])
- sync: add doc aliases for `blocking_*` methods ([#5448])
- task: fix wording with 'unsend' ([#5452])
- signal: updated Documentation for Signals ([#5459])
- sync: fix docs for Send/Sync bounds in broadcast ([#5480])
- io: improve AsyncFd example ([#5481])
- tokio: document supported platforms ([#5483])
- runtime: document the nature of the main future ([#5494])
- sync: document drop behavior for channels ([#5497])
- time: document immediate completion guarantee for timeouts ([#5509])
- runtime: remove extra period in docs ([#5511])

### Changed

- net: use Message Read Mode for named pipes ([#5350])
- chore: update windows-sys to 0.45 ([#5386])
- sync: mark lock guards with `#[clippy::has_significant_drop]` ([#5422])
- sync: reduce contention in watch channel ([#5464])
- time: remove cache padding in timer entries ([#5468])
- time: Improve `Instant::now()` perf with test-util ([#5513])

### Internal Changes

- tests: port proptest fuzz harnesses to use cargo-fuzz ([#5392])
- time: don't store deadline twice in sleep entries ([#5410])
- rt: remove Arc from Clock ([#5434])
- sync: make `notify_waiters` calls atomic ([#5458])
- net: refactor named pipe builders to not use bitfields ([#5477])
- io: use `poll_fn` in `copy_bidirectional` ([#5486])
- fs: add more tests for filesystem functionality ([#5493])
- net: fix test compilation failure ([#5506])
- io: ignore SplitByUtf8BoundaryIfWindows test on miri ([#5507])

### Unstable

- metrics: add a new metric for budget exhaustion yields ([#5517])

[#4299]: #4299
[#5350]: #5350
[#5351]: #5351
[#5386]: #5386
[#5392]: #5392
[#5394]: #5394
[#5410]: #5410
[#5413]: #5413
[#5422]: #5422
[#5434]: #5434
[#5448]: #5448
[#5452]: #5452
[#5458]: #5458
[#5459]: #5459
[#5464]: #5464
[#5468]: #5468
[#5469]: #5469
[#5471]: #5471
[#5474]: #5474
[#5475]: #5475
[#5477]: #5477
[#5480]: #5480
[#5481]: #5481
[#5483]: #5483
[#5486]: #5486
[#5493]: #5493
[#5494]: #5494
[#5497]: #5497
[#5504]: #5504
[#5506]: #5506
[#5507]: #5507
[#5509]: #5509
[#5511]: #5511
[#5513]: #5513
[#5517]: #5517
Motivation
`tokio::sync::Notify` has an internal waiters queue locked behind a mutex. `Notify::notify_waiters` is a function to notify all waiters from this queue. It first acquires the lock, then removes waiters from the queue and wakes them up. However, the whole process is done in batches of 32 waiters to avoid deadlocks. The function has to release the lock before waking up a batch and re-acquire it before proceeding to the next one. In this short timespan, another thread can acquire the lock and modify the queue or the internal state. This leads to the following issues:
1. A `notify_waiters` call can notify multiple futures which are created and awaited sequentially. For example, if a task creates and awaits one `Notified` future and then creates and awaits a second one, a single call can complete both.
2. Inconsistency in results from `poll`. One could expect that sequentially polled futures will yield results consistent with each other. If one pending future returns `Ready`, then other futures which were pending should also be ready. As an example, let's say that we have two pending `Notified` futures: `fut1` and `fut2`. If we call `notify_waiters` in parallel with code that polls both futures and asserts that their results agree, the assertion can fail.
3. `notify_one` calls. It is possible that calling `notify_one` after awaiting a future notified by `notify_waiters` will result in the notification being lost. As an example, let's say that we have a pending `notify.notified()` future `fut`. If we call `notify_waiters` in parallel with code that awaits `fut`, then calls `notify_one` and waits for that notification, it can hang forever.
For more context, see the discussion in #5404 or in the related Discord thread.
Closes: #5396
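The first issue can be illustrated with a deterministic, single-threaded sketch. It is a model under stated assumptions, not tokio's implementation: waiters are plain `u32` ids, the batch size is 2 instead of 32, and the hypothetical closure `between_batches` stands in for another thread that takes the lock while it is released. For contrast, `notify_atomically` takes the whole queue up front, which is the approach the Solution section describes.

```rust
use std::collections::VecDeque;
use std::mem;

/// Hypothetical batch size for the model (the real code uses 32).
const BATCH: usize = 2;

/// Old scheme (model): drain the shared queue batch by batch.
fn notify_in_batches(
    queue: &mut VecDeque<u32>,
    mut between_batches: impl FnMut(&mut VecDeque<u32>),
) -> Vec<u32> {
    let mut notified = Vec::new();
    while !queue.is_empty() {
        for _ in 0..BATCH {
            match queue.pop_front() {
                Some(id) => notified.push(id),
                None => break,
            }
        }
        // The lock would be released here to wake the batch, so other
        // code can modify the queue before the next batch is taken.
        between_batches(queue);
    }
    notified
}

/// New scheme (model): move every waiter to a private list first.
fn notify_atomically(
    queue: &mut VecDeque<u32>,
    mut between_batches: impl FnMut(&mut VecDeque<u32>),
) -> Vec<u32> {
    let mut decoupled = mem::take(queue);
    let mut notified = Vec::new();
    while !decoupled.is_empty() {
        for _ in 0..BATCH {
            match decoupled.pop_front() {
                Some(id) => notified.push(id),
                None => break,
            }
        }
        // Late arrivals land in the shared queue, not in `decoupled`.
        between_batches(queue);
    }
    notified
}
```

With a queue of `[1, 2, 3]` and a closure that enqueues waiter `99` once, after the first batch, the batched version also notifies `99`, while the atomic version notifies only the three original waiters and leaves `99` queued for a later call.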
Solution
The idea is to move all waiters to a secondary list, which is created in each call to `notify_waiters`. This list will be unavailable to modify from `notify_one` and other parts of the code, so the waiters contained in it will not consume other notifications, and it will be impossible to insert new waiters into it. This approach eliminates all presented issues.
The tricky part is how to allow waiters contained by a secondary list to remove themselves from it. It is possible to drop a waiter still contained by a secondary list, and in such a case it should be able to unlink itself from its neighboring nodes in the list. To solve this, this PR adds a new variant of a linked list, which allows removing a node from it without knowing the list head's memory address. This is done by adding a special “guard” node, which makes the list circular.
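A minimal sketch of the guarded circular list idea in safe Rust. It assumes a hypothetical arena in which indices play the role of the raw pointers an intrusive list would use; `Arena`, `Node`, and the method names are illustrative and are not the types added by this PR. The point is that `remove` touches only the node's two neighbours, so it works without any reference to the list head.

```rust
/// Node of a circular doubly linked list; links are arena indices.
#[derive(Clone, Copy)]
struct Node {
    prev: usize,
    next: usize,
}

/// Hypothetical arena that owns all nodes, including guard nodes.
struct Arena {
    nodes: Vec<Node>,
}

impl Arena {
    /// Creates a node linked only to itself. A guard in this state
    /// represents an empty list.
    fn new_node(&mut self) -> usize {
        let idx = self.nodes.len();
        self.nodes.push(Node { prev: idx, next: idx });
        idx
    }

    /// Inserts `node` right after `guard`, i.e. at the front of the list.
    fn push_front(&mut self, guard: usize, node: usize) {
        let first = self.nodes[guard].next;
        self.nodes[node] = Node { prev: guard, next: first };
        self.nodes[first].prev = node;
        self.nodes[guard].next = node;
    }

    /// Unlinks `node` using only its neighbours. Because the list is
    /// circular, both neighbours always exist (possibly the guard).
    fn remove(&mut self, node: usize) {
        let Node { prev, next } = self.nodes[node];
        self.nodes[prev].next = next;
        self.nodes[next].prev = prev;
        self.nodes[node] = Node { prev: node, next: node };
    }

    /// Pops the last node, or returns `None` once only the guard is left.
    fn pop_back(&mut self, guard: usize) -> Option<usize> {
        let last = self.nodes[guard].prev;
        if last == guard {
            return None;
        }
        self.remove(last);
        Some(last)
    }
}
```

In this model a waiter that is dropped while still on a secondary list would call `remove` with its own index, and the `notify_waiters` side would keep calling `pop_back` on the guard until it returns `None`.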
To solve the issue with inconsistency in results from `poll`, this PR also adds some additional logic to `poll_notified`. A polled future has to know whether it is already included in a secondary list in `notify_waiters` and is staged to be notified. In such a case it also has to remove itself from the list. To determine if it is contained by a secondary list, a future uses the counter of `notify_waiters` calls. If the current value of this counter is different from the value recorded by the future at the time it was added to the queue, then there is an ongoing `notify_waiters` call. In such a case the future removes itself from the secondary list.
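The counter check can be sketched as follows. `set_state` and `inc_num_notify_waiters_calls` are named in the diff quoted in the review thread, but their bodies here are assumptions: the sketch supposes that the state lives in the low two bits of a single `usize` and the call counter in the remaining bits. `Waiter`, `enqueue`, `staged_for_notification`, and `begin_notify_waiters` are hypothetical names for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

// Assumed layout: low 2 bits hold the state, the rest counts calls.
const STATE_MASK: usize = 0b11;
const SHIFT: usize = 2;
const EMPTY: usize = 0;
const WAITING: usize = 1;

fn get_state(data: usize) -> usize {
    data & STATE_MASK
}

fn set_state(data: usize, state: usize) -> usize {
    (data & !STATE_MASK) | state
}

fn get_num_notify_waiters_calls(data: usize) -> usize {
    data >> SHIFT
}

fn inc_num_notify_waiters_calls(data: usize) -> usize {
    data.wrapping_add(1 << SHIFT)
}

/// Hypothetical waiter that remembers the counter value at enqueue time.
struct Waiter {
    calls_at_enqueue: usize,
}

impl Waiter {
    fn enqueue(state: &AtomicUsize) -> Waiter {
        Waiter {
            calls_at_enqueue: get_num_notify_waiters_calls(state.load(SeqCst)),
        }
    }

    /// True if a `notify_waiters` call has started since this waiter was
    /// enqueued, i.e. the waiter has been moved to a secondary list.
    fn staged_for_notification(&self, state: &AtomicUsize) -> bool {
        get_num_notify_waiters_calls(state.load(SeqCst)) != self.calls_at_enqueue
    }
}

/// Model of the first step of `notify_waiters`: bump the counter and
/// transition to `EMPTY` (in the real code, while the lock is held).
fn begin_notify_waiters(state: &AtomicUsize) {
    let curr = state.load(SeqCst);
    state.store(set_state(inc_num_notify_waiters_calls(curr), EMPTY), SeqCst);
}
```

This also shows why the review thread cares about when the store happens: a waiter polled between batches only sees itself as staged if the counter was already incremented before the first batch was released.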