Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broadcast: Stop notifying after we've woken all wakers #5925

Merged
merged 1 commit into from Aug 14, 2023
Merged

broadcast: Stop notifying after we've woken all wakers #5925

merged 1 commit into from Aug 14, 2023

Conversation

glittershark
Copy link
Contributor

Motivation

Fix #5923

Solution

Unboundedly looping within notify_rx as long as there are still available wakers causes a quadratic slowdown as receivers which are looping receiving from the channel are added. Instead of continually waiting for new wakers, this commit modifies notify_rx to stop trying to wake wakers once we've notified a number of wakers greater than or equal to whatever the number of active wakers was when we started notifying.

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Aug 10, 2023
Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR!

tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Aug 10, 2023
@Darksonn
Copy link
Contributor

Darksonn commented Aug 11, 2023

Hmm, there's an alternate approach: Move everything in the list into a separate list immediately, then empty the separate list. I think that I would prefer that approach because it is simpler to verify the correctness (no need to reason about the maximum length). Additionally, we already do this elsewhere, so it makes us more consistent across the codebase.

You can find an existing implementation of this here:

pub fn notify_waiters(&self) {
let mut waiters = self.waiters.lock();
// The state must be loaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
let curr = self.state.load(SeqCst);
if matches!(get_state(curr), EMPTY | NOTIFIED) {
// There are no waiting tasks. All we need to do is increment the
// number of times this method was called.
atomic_inc_num_notify_waiters_calls(&self.state);
return;
}
// Increment the number of times this method was called
// and transition to empty.
let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
self.state.store(new_state, SeqCst);
// It is critical for `GuardedLinkedList` safety that the guard node is
// pinned in memory and is not dropped until the guarded list is dropped.
let guard = Waiter::new();
pin!(guard);
// We move all waiters to a secondary list. It uses a `GuardedLinkedList`
// underneath to allow every waiter to safely remove itself from it.
//
// * This list will be still guarded by the `waiters` lock.
// `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
// * This wrapper will empty the list on drop. It is critical for safety
// that we will not leave any list entry with a pointer to the local
// guard node after this function returns / panics.
let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);
let mut wakers = WakeList::new();
'outer: loop {
while wakers.can_push() {
match list.pop_back_locked(&mut waiters) {
Some(waiter) => {
// Safety: we never make mutable references to waiters.
let waiter = unsafe { waiter.as_ref() };
// Safety: we hold the lock, so we can access the waker.
if let Some(waker) =
unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
{
wakers.push(waker);
}
// This waiter is unlinked and will not be shared ever again, release it.
waiter.notification.store_release(Notification::All);
}
None => {
break 'outer;
}
}
}
// Release the lock before notifying.
drop(waiters);
// One of the wakers may panic, but the remaining waiters will still
// be unlinked from the list in `NotifyWaitersList` destructor.
wakers.wake_all();
// Acquire the lock again.
waiters = self.waiters.lock();
}
// Release the lock before notifying
drop(waiters);
wakers.wake_all();
}

Would you be up for doing that?

@glittershark
Copy link
Contributor Author

@Darksonn done, I think

Within `notify_rx`, looping while re-locking and re-reading from
`Shared.tail` as long as there are still available wakers causes a
quadratic slowdown as receivers which are looping receiving from the
channel are added. Instead of continually re-reading from the original
list, this commit modifies `notify_rx` to move the waiters into a
separate list immediately similar to how `Notify::notify_waiters` works,
using a new `WaitersList` struct modified after NotifyWaitersList.

Fixes #5923
@Darksonn
Copy link
Contributor

Looks reasonable to me. Does this version also fix the performance issue?

@wathenjiang
Copy link
Contributor

wathenjiang commented Aug 14, 2023

I believe the scope of lock condition can be reduced. The method Shared.notify_rx could be written in the following way:

impl<T> Shared<T> {
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `waiters` lock.
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

        let mut wakers = WakeList::new();
        'outer: loop {
            // Because all waiter in tail.waiters has been moved to the secondary list, so we can drop the lock here.
            drop(tail);
            while wakers.can_push() {
                match list.pop_back() {
                    Some(mut waiter) => {
                        // Safety: `tail` lock is still held.
                        let waiter = unsafe { waiter.as_mut() };

                        assert!(waiter.queued);
                        waiter.queued = false;

                        if let Some(waker) = waiter.waker.take() {
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            // Before we acquire the lock again all sorts of things can happen:
            // some waiters may remove themselves from the list and new waiters
            // may be added. This is fine since at worst we will unnecessarily
            // wake up waiters which will then queue themselves again.

            wakers.wake_all();

            // Acquire the lock again.
            tail = self.tail.lock();
        }

        wakers.wake_all();
    }
}
diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index 42cde81d..f46e5a20 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -865,7 +865,7 @@ impl<'a, T> WaitersList<'a, T> {
 
     /// Removes the last element from the guarded list. Modifying this list
     /// requires an exclusive access to the main list in `Notify`.
-    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
+    fn pop_back(&mut self) -> Option<NonNull<Waiter>> {
         let result = self.list.pop_back();
         if result.is_none() {
             // Save information about emptiness to avoid waiting for lock
@@ -895,8 +895,12 @@ impl<T> Shared<T> {
 
         let mut wakers = WakeList::new();
         'outer: loop {
+
+            // Release the lock before waking.
+            drop(tail);
+
             while wakers.can_push() {
-                match list.pop_back_locked(&mut tail) {
+                match list.pop_back() {
                     Some(mut waiter) => {
                         // Safety: `tail` lock is still held.
                         let waiter = unsafe { waiter.as_mut() };
@@ -914,10 +918,7 @@ impl<T> Shared<T> {
                 }
             }
 
-            // Release the lock before waking.
-            drop(tail);
-
-            // Before we acquire the lock again all sorts of things can happen:
+                     // Before we acquire the lock again all sorts of things can happen:
             // some waiters may remove themselves from the list and new waiters
             // may be added. This is fine since at worst we will unnecessarily
             // wake up waiters which will then queue themselves again.
@@ -928,9 +929,6 @@ impl<T> Shared<T> {
             tail = self.tail.lock();
         }
 
-        // Release the lock before waking.
-        drop(tail);
-
         wakers.wake_all();
     }
 }
@@ -1512,3 +1510,4 @@ mod tests {
         assert_eq!(sender.receiver_count(), 2);
     }
 }
+

@Darksonn
Copy link
Contributor

No, we can't do that. it is important that we hold the lock when calling pop_back. Otherwise, the destructor of the waiter could remove it from the linked list in parallel with the call to pop_back, which would be a data race. Since this is unsafe code, it would not be caught by the compiler, but it is still wrong.

@wathenjiang
Copy link
Contributor

wathenjiang commented Aug 14, 2023

No, we can't do that. it is important that we hold the lock when calling pop_back. Otherwise, the destructor of the waiter could remove it from the linked list in parallel with the call to pop_back, which would be a data race. Since this is unsafe code, it would not be caught by the compiler, but it is still wrong.

You are right, the lock should be held while doing the loop. The above code may cause data race.

@glittershark
Copy link
Contributor Author

Looks reasonable to me. Does this version also fix the performance issue?

yep!

debug

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@Darksonn Darksonn merged commit 3dd5f7a into tokio-rs:master Aug 14, 2023
72 checks passed
@glittershark glittershark deleted the broadcast-stop-notifying branch August 14, 2023 21:14
readysetbot pushed a commit to readysettech/readyset that referenced this pull request Aug 24, 2023
Upgrade the version of tokio we depend on to version 1.32, to get the
version with tokio-rs/tokio#5925, my fix for a
performance issue in `tokio::sync::broadcast`. We use this to notify
workers when channels are removed from the channel coordinator, so we
want this fix to improve the performance of that process.

Change-Id: Id45707e0f95ca0a76ea69952ea23e8c65f846983
readysetbot pushed a commit to readysettech/readyset that referenced this pull request Aug 24, 2023
Upgrade the version of tokio we depend on to version 1.32, to get the
version with tokio-rs/tokio#5925, my fix for a
performance issue in `tokio::sync::broadcast`. We use this to notify
workers when channels are removed from the channel coordinator, so we
want this fix to improve the performance of that process.

Change-Id: Id45707e0f95ca0a76ea69952ea23e8c65f846983
readysetbot pushed a commit to readysettech/readyset that referenced this pull request Aug 24, 2023
Upgrade the version of tokio we depend on to version 1.32, to get the
version with tokio-rs/tokio#5925, my fix for a
performance issue in `tokio::sync::broadcast`. We use this to notify
workers when channels are removed from the channel coordinator, so we
want this fix to improve the performance of that process.

Change-Id: Id45707e0f95ca0a76ea69952ea23e8c65f846983
readysetbot pushed a commit to readysettech/readyset that referenced this pull request Aug 24, 2023
Upgrade the version of tokio we depend on to version 1.32, to get the
version with tokio-rs/tokio#5925, my fix for a
performance issue in `tokio::sync::broadcast`. We use this to notify
workers when channels are removed from the channel coordinator, so we
want this fix to improve the performance of that process.

Change-Id: Id45707e0f95ca0a76ea69952ea23e8c65f846983
readysetbot pushed a commit to readysettech/proptest-stateful that referenced this pull request Aug 24, 2023
Upgrade the version of tokio we depend on to version 1.32, to get the
version with tokio-rs/tokio#5925, my fix for a
performance issue in `tokio::sync::broadcast`. We use this to notify
workers when channels are removed from the channel coordinator, so we
want this fix to improve the performance of that process.

Change-Id: Id45707e0f95ca0a76ea69952ea23e8c65f846983
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5883
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
readysetbot pushed a commit to readysettech/readyset that referenced this pull request Aug 24, 2023
Upgrade the version of tokio we depend on to version 1.32, to get the
version with tokio-rs/tokio#5925, my fix for a
performance issue in `tokio::sync::broadcast`. We use this to notify
workers when channels are removed from the channel coordinator, so we
want this fix to improve the performance of that process.

Change-Id: Id45707e0f95ca0a76ea69952ea23e8c65f846983
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5883
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom-sync Run loom sync tests on this PR
Projects
None yet
3 participants