Reduce contention in watch channel #5464

Merged: 10 commits, Feb 17, 2023
2 changes: 1 addition & 1 deletion tokio/src/runtime/context.rs
Expand Up @@ -79,7 +79,7 @@ tokio_thread_local! {
}
}

-#[cfg(feature = "macros")]
+#[cfg(any(feature = "macros", all(feature = "sync", feature = "rt")))]
 pub(crate) fn thread_rng_n(n: u32) -> u32 {
     CONTEXT.with(|ctx| ctx.rng.fastrand_n(n))
 }
Expand Down
74 changes: 71 additions & 3 deletions tokio/src/sync/watch.rs
Expand Up @@ -58,6 +58,7 @@ use crate::sync::notify::Notify;
 use crate::loom::sync::atomic::AtomicUsize;
 use crate::loom::sync::atomic::Ordering::Relaxed;
 use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
+use std::fmt;
 use std::mem;
 use std::ops;
 use std::panic;
Expand Down Expand Up @@ -166,7 +167,6 @@ impl<'a, T> Ref<'a, T> {
}
}

-#[derive(Debug)]
 struct Shared<T> {
     /// The most recent value.
     value: RwLock<T>,
Expand All @@ -181,12 +181,24 @@ struct Shared<T> {
     ref_count_rx: AtomicUsize,

     /// Notifies waiting receivers that the value changed.
-    notify_rx: Notify,
+    notify_rx: big_notify::BigNotify,

     /// Notifies any task listening for `Receiver` dropped events.
     notify_tx: Notify,
 }

+impl<T: fmt::Debug> fmt::Debug for Shared<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let state = self.state.load();
+        f.debug_struct("Shared")
+            .field("value", &self.value)
+            .field("version", &state.version())
+            .field("is_closed", &state.is_closed())
+            .field("ref_count_rx", &self.ref_count_rx)
+            .finish()
+    }
+}
+
 pub mod error {
     //! Watch error types.

Expand Down Expand Up @@ -221,6 +233,62 @@ pub mod error {
     impl std::error::Error for RecvError {}
 }

+mod big_notify {
+    use super::*;
+    use crate::sync::notify::Notified;
+
+    // To avoid contention on the lock inside the `Notify`, we store multiple
+    // copies of it. Then, we use either circular access or randomness to spread
+    // out threads over different `Notify` objects.
+    //
+    // Some simple benchmarks show that randomness performs slightly better than
+    // circular access (probably due to contention on `next`), so we prefer to
+    // use randomness when Tokio is compiled with a random number generator.
+    //
+    // When the random number generator is not available, we fall back to
+    // circular access.
+
+    pub(super) struct BigNotify {
+        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
+        next: AtomicUsize,
+        inner: [Notify; 8],
+    }
+
+    impl BigNotify {
+        pub(super) fn new() -> Self {
+            Self {
+                #[cfg(not(all(
+                    not(loom),
+                    feature = "sync",
+                    any(feature = "rt", feature = "macros")
+                )))]
+                next: AtomicUsize::new(0),
+                inner: Default::default(),
+            }
+        }
+
+        pub(super) fn notify_waiters(&self) {
+            for notify in &self.inner {
+                notify.notify_waiters();
+            }
+        }
+
+        /// This function implements the case where randomness is not available.
+        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
+        pub(super) fn notified(&self) -> Notified<'_> {
+            let i = self.next.fetch_add(1, Relaxed) % 8;
Review comment (Member):

Could you use TaskID here or something similar? Not a big deal either way.

Reply (Contributor Author):

We can't use task or thread ids here, because they require the context thread-local to be available, which it isn't when we use the fallback.

+            self.inner[i].notified()
+        }
+
+        /// This function implements the case where randomness is available.
+        #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
+        pub(super) fn notified(&self) -> Notified<'_> {
+            let i = crate::runtime::context::thread_rng_n(8) as usize;
+            self.inner[i].notified()
+        }
+    }
+}
+
 use self::state::{AtomicState, Version};
 mod state {
     use crate::loom::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -320,7 +388,7 @@ pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
         value: RwLock::new(init),
         state: AtomicState::new(),
         ref_count_rx: AtomicUsize::new(1),
-        notify_rx: Notify::new(),
+        notify_rx: big_notify::BigNotify::new(),
         notify_tx: Notify::new(),
     });

Expand Down
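
The circular-access fallback in `BigNotify::notified` above can be illustrated outside of Tokio. The following is only a rough sketch using the standard library: `ShardedWaiters`, `register`, and `drain_all` are hypothetical names, and a `Mutex<Vec<u64>>` stands in for each `Notify`. It shows how a shared counter spreads registrations over 8 independently locked slots, and how a wake-up has to visit every slot.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::Mutex;

/// Number of shards; 8 mirrors the array length used in the diff.
const SHARDS: usize = 8;

/// Hypothetical stand-in for `BigNotify`. Each shard has its own lock,
/// so registrations that land on different shards do not contend.
pub struct ShardedWaiters {
    /// Shared counter that selects the next shard in circular order.
    next: AtomicUsize,
    /// One independently locked waiter list per shard.
    inner: [Mutex<Vec<u64>>; SHARDS],
}

impl ShardedWaiters {
    pub fn new() -> Self {
        Self {
            next: AtomicUsize::new(0),
            inner: Default::default(),
        }
    }

    /// Registers a waiter id on the next shard in circular order and
    /// returns the index of the shard that was chosen.
    pub fn register(&self, waiter: u64) -> usize {
        let i = self.next.fetch_add(1, Relaxed) % SHARDS;
        self.inner[i].lock().unwrap().push(waiter);
        i
    }

    /// Drains every shard, analogous to `notify_waiters` calling
    /// `notify_waiters` on each inner `Notify`.
    pub fn drain_all(&self) -> Vec<u64> {
        let mut woken = Vec::new();
        for shard in &self.inner {
            woken.append(&mut shard.lock().unwrap());
        }
        woken
    }
}
```

Every registration still touches the shared `next` counter, which is the contention the code comment in the diff suspects makes circular access slightly slower than the random variant.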
6 changes: 5 additions & 1 deletion tokio/src/util/rand.rs
Expand Up @@ -135,7 +135,11 @@ impl FastRand {
         old_seed
     }

-    #[cfg(any(feature = "macros", feature = "rt-multi-thread"))]
+    #[cfg(any(
+        feature = "macros",
+        feature = "rt-multi-thread",
+        all(feature = "sync", feature = "rt")
+    ))]
     pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
         // This is similar to fastrand() % n, but faster.
         // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
Expand Down
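
The body of `fastrand_n` is collapsed in this view, but its comment points to the multiply-and-shift reduction described in the linked post. The following is a minimal sketch of that general technique, not Tokio's actual code; the name `reduce` is hypothetical. It maps a 32-bit value into `0..n` with one widening multiplication and a shift instead of a division.

```rust
/// Maps a 32-bit value `x` into the range `0..n` without using `%`.
///
/// The product `x * n` is computed in 64 bits, so it cannot overflow.
/// Its upper 32 bits are `floor(x * n / 2^32)`, which is always below `n`.
pub fn reduce(x: u32, n: u32) -> u32 {
    (((x as u64) * (n as u64)) >> 32) as u32
}
```

With `n = 8`, as in the `thread_rng_n(8)` call in `watch.rs`, the result is effectively the top three bits of `x`. When `n` is 0 the function returns 0 rather than panicking, which differs from `%`.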