Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time: lazy init TimerShared in TimerEntry #6512

Merged
merged 25 commits into from May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bde0742
This commit is part of reducing timeout performance overhead.
wathenjiang Apr 23, 2024
0eb1d3e
feat: panic if the time driver is not enabled
wathenjiang Apr 23, 2024
a4e1231
rustfmt
wathenjiang Apr 23, 2024
cb54a1e
add use crate::runtime::scheduler
wathenjiang Apr 24, 2024
f8874b2
add #[track_caller] for new_with_delay
wathenjiang Apr 24, 2024
9dd1e33
update comments
wathenjiang Apr 24, 2024
2d3efb6
make TimerEntry lazy init
wathenjiang Apr 24, 2024
1ae3de2
feat: add single_thread_sleep and multi_thread_sleep-8
wathenjiang Apr 24, 2024
ef3630b
feat: use new_current_thread
wathenjiang Apr 24, 2024
d4a95a3
Merge branch 'master' into timeout-lazy-init-sleep
wathenjiang Apr 24, 2024
f6fa09b
fix: new me variable
wathenjiang Apr 24, 2024
b1fa990
fix: create me variable in the block
wathenjiang Apr 24, 2024
8c0c2e8
fix: ci
wathenjiang Apr 24, 2024
a9a11e9
fix: ownership
wathenjiang Apr 24, 2024
ef657d2
fix: fmt
wathenjiang Apr 24, 2024
a446e44
fix: lifetime issue
wathenjiang Apr 24, 2024
3a89c45
rebase
wathenjiang Apr 25, 2024
c47b739
rm unnecessary code
wathenjiang Apr 25, 2024
8e40ec4
add is_inner_init
wathenjiang Apr 25, 2024
d328298
Merge branch 'master' into timeout-lazy-init-sleep
wathenjiang Apr 26, 2024
07953b5
adopt code review suggestions from mox692
wathenjiang May 2, 2024
373044d
get mutable ref only if the inner is none
wathenjiang May 2, 2024
0129c92
fix ci
wathenjiang May 2, 2024
b492723
fix typo
wathenjiang May 2, 2024
8af2a97
Merge branch 'master' into timeout-lazy-init-sleep
Darksonn May 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions benches/Cargo.toml
Expand Up @@ -90,3 +90,8 @@ harness = false
name = "time_now"
path = "time_now.rs"
harness = false

[[bench]]
name = "time_timeout"
path = "time_timeout.rs"
harness = false
114 changes: 114 additions & 0 deletions benches/time_timeout.rs
@@ -0,0 +1,114 @@
//! Benchmark spawning a task onto the basic and threaded Tokio executors.
//! This essentially measure the time to enqueue a task in the local and remote
//! case.
wathenjiang marked this conversation as resolved.
Show resolved Hide resolved

use std::time::{Duration, Instant};

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tokio::{
runtime::Runtime,
time::{sleep, timeout},
};

// a vevry quick async task, but might timeout
wathenjiang marked this conversation as resolved.
Show resolved Hide resolved
async fn quick_job() -> usize {
1
}

fn build_run_time(workers: usize) -> Runtime {
if workers == 1 {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.worker_threads(workers)
wathenjiang marked this conversation as resolved.
Show resolved Hide resolved
.build()
.unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(workers)
.build()
.unwrap()
}
}

fn single_thread_scheduler_timeout(c: &mut Criterion) {
do_timeout_test(c, 1, "single_thread_timeout");
}

fn multi_thread_scheduler_timeout(c: &mut Criterion) {
do_timeout_test(c, 8, "multi_thread_timeout-8");
}

fn do_timeout_test(c: &mut Criterion, workers: usize, name: &str) {
let runtime = build_run_time(workers);
c.bench_function(name, |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(spawn_timeout_job(iters as usize, workers).await);
});
start.elapsed()
})
});
}

async fn spawn_timeout_job(iters: usize, procs: usize) {
let mut handles = Vec::with_capacity(procs);
for _ in 0..procs {
handles.push(tokio::spawn(async move {
for _ in 0..iters / procs {
let h = timeout(Duration::from_secs(1), quick_job());
assert_eq!(black_box(h.await.unwrap()), 1);
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

fn single_thread_scheduler_sleep(c: &mut Criterion) {
do_sleep_test(c, 1, "single_thread_sleep");
}

fn multi_thread_scheduler_sleep(c: &mut Criterion) {
do_sleep_test(c, 8, "multi_thread_sleep-8");
}

fn do_sleep_test(c: &mut Criterion, workers: usize, name: &str) {
let runtime = build_run_time(workers);

c.bench_function(name, |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(spawn_sleep_job(iters as usize, workers).await);
});
start.elapsed()
})
});
}

async fn spawn_sleep_job(iters: usize, procs: usize) {
let mut handles = Vec::with_capacity(procs);
for _ in 0..procs {
handles.push(tokio::spawn(async move {
for _ in 0..iters / procs {
let _h = black_box(sleep(Duration::from_secs(1)));
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

criterion_group!(
timeout_benchmark,
single_thread_scheduler_timeout,
multi_thread_scheduler_timeout,
single_thread_scheduler_sleep,
multi_thread_scheduler_sleep
);

criterion_main!(timeout_benchmark);
4 changes: 0 additions & 4 deletions tokio/src/runtime/time/entry.rs
Expand Up @@ -496,10 +496,6 @@ impl TimerEntry {
unsafe { &*self.inner.get() }
}

pub(crate) fn deadline(&self) -> Instant {
self.deadline
}

pub(crate) fn is_elapsed(&self) -> bool {
!self.inner().state.might_be_registered() && self.registered
}
Expand Down
104 changes: 78 additions & 26 deletions tokio/src/time/sleep.rs
@@ -1,3 +1,4 @@
use crate::runtime::scheduler;
use crate::runtime::time::TimerEntry;
use crate::time::{error::Error, Duration, Instant};
use crate::util::trace;
Expand Down Expand Up @@ -226,10 +227,11 @@ pin_project! {
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
inner: Inner,

deadline : Instant,
handle: scheduler::Handle,
// The link between the `Sleep` instance and the timer that drives it.
#[pin]
entry: TimerEntry,
entry: Option<TimerEntry>,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this change the size of Sleep? Could TimerEntry be changed to reduce the change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be possible. Your idea looks better, I'll give it a try.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put deadline and handle in Sleep here for do not init the TimerEntry, but I have to put them to a place to save them temporarily.

Saving handle here is to ensure the timeout will panic if there is no runtime handle in TLS or we do not enable_time or enable_all.

The size of Sleep is an issue worth considering. Is there any way to solve this problem?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if you just don't call clear_entry in this method when it hasn't yet been registered?

Based on @conradludgate's comment here, it sounds like this triggers a loom failure, but that's most likely due to some path where the timer is dropped concurrently with firing or something like that. If we've never been registered with the driver, then not calling clear_entry should be okay.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my test, it can improve the performance by reduicng lock contention when we always let timeout register into the driver. But, on the other hand, if the timeout never register into the driver, this can not bring significant performance improvement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, maybe we can use inner: StdUnsafeCell<Option<TimerShared>> for this purpose, the size of Sleep should be barely changed.

Avoiding clear_entry for unregistered timeouts is not necessary. In this case, we should avoid constructing the intrusive linked list item.

}

Expand All @@ -256,8 +258,16 @@ impl Sleep {
use crate::runtime::scheduler;

let handle = scheduler::Handle::current();
let entry = TimerEntry::new(&handle, deadline);
Self::new_timeout_with_handle(deadline, location, handle)
}

#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
#[track_caller]
pub(crate) fn new_timeout_with_handle(
deadline: Instant,
location: Option<&'static Location<'static>>,
handle: crate::runtime::scheduler::Handle,
) -> Sleep {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let clock = handle.driver().clock();
Expand Down Expand Up @@ -302,23 +312,27 @@ impl Sleep {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let inner = Inner {};

Sleep { inner, entry }
}

pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
Self::new_timeout(Instant::far_future(), location)
Sleep {
inner,
deadline,
handle,
entry: None,
}
}

/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
self.entry.deadline()
self.deadline
}

/// Returns `true` if `Sleep` has elapsed.
///
/// A `Sleep` instance is elapsed when the requested duration has elapsed.
pub fn is_elapsed(&self) -> bool {
self.entry.is_elapsed()
if self.entry.is_none() {
return false;
}
self.entry.as_ref().unwrap().is_elapsed()
}

/// Resets the `Sleep` instance to a new deadline.
Expand Down Expand Up @@ -362,17 +376,27 @@ impl Sleep {
/// without having it registered. This is required in e.g. the
/// [`crate::time::Interval`] where we want to reset the internal [Sleep]
/// without having it wake up the last task that polled it.
pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
me.entry.as_mut().reset(deadline, false);
pub(crate) fn reset_without_reregister(mut self: Pin<&mut Self>, deadline: Instant) {
self.as_mut().lazy_init_timer_entry(deadline);
self.project()
.entry
.as_pin_mut()
.unwrap()
.reset(deadline, false);
}

fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
me.entry.as_mut().reset(deadline, true);
fn reset_inner(mut self: Pin<&mut Self>, deadline: Instant) {
self.as_mut().lazy_init_timer_entry(deadline);
self.as_mut()
.project()
.entry
.as_pin_mut()
.unwrap()
.reset(deadline, true);

#[cfg(all(tokio_unstable, feature = "tracing"))]
{
let me = self.as_mut().project();
let _resource_enter = me.inner.ctx.resource_span.enter();
me.inner.ctx.async_op_span =
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");
Expand All @@ -382,8 +406,15 @@ impl Sleep {
tracing::trace_span!("runtime.resource.async_op.poll");

let duration = {
let clock = me.entry.clock();
let time_source = me.entry.driver().time_source();
let clock = self.as_mut().project().entry.as_pin_mut().unwrap().clock();
let time_source = self
.as_mut()
.project()
.entry
.as_pin_mut()
.unwrap()
.driver()
.time_source();
let now = time_source.now(clock);
let deadline_tick = time_source.deadline_to_tick(deadline);
deadline_tick.saturating_sub(now)
Expand All @@ -398,10 +429,15 @@ impl Sleep {
}
}

fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();

fn poll_elapsed(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Error>> {
ready!(crate::trace::trace_leaf(cx));
let deadline = self.deadline;
self.as_mut().lazy_init_timer_entry(deadline);

let me = self.project();

// Keep track of task budget
#[cfg(all(tokio_unstable, feature = "tracing"))]
Expand All @@ -412,18 +448,34 @@ impl Sleep {

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let result = me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
});
// Safety: we have just assigned it a value of `Some`.
let result = me
.entry
.as_pin_mut()
.unwrap()
.poll_elapsed(cx)
.map(move |r| {
coop.made_progress();
r
});

#[cfg(all(tokio_unstable, feature = "tracing"))]
return trace_poll_op!("poll_elapsed", result);

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
return result;
}

// This lazy initiation is for performance purposes,
// it can avoid the unnecessary creation and drop of `TimerEntry`.
fn lazy_init_timer_entry(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
*me.deadline = deadline;
if me.entry.is_none() {
let entry = TimerEntry::new(me.handle, deadline);
me.entry.as_mut().set(Some(entry));
}
}
}
wathenjiang marked this conversation as resolved.
Show resolved Hide resolved

impl Future for Sleep {
Expand Down
32 changes: 21 additions & 11 deletions tokio/src/time/timeout.rs
Expand Up @@ -6,7 +6,8 @@

use crate::{
runtime::coop,
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
runtime::scheduler,
time::{error::Elapsed, Duration, Instant, Sleep},
util::trace,
};

Expand Down Expand Up @@ -87,14 +88,7 @@ pub fn timeout<F>(duration: Duration, future: F) -> Timeout<F>
where
F: Future,
{
let location = trace::caller_location();

let deadline = Instant::now().checked_add(duration);
let delay = match deadline {
Some(deadline) => Sleep::new_timeout(deadline, location),
None => Sleep::far_future(location),
};
Timeout::new_with_delay(future, delay)
Timeout::new_with_delay(future, Instant::now().checked_add(duration))
}

/// Requires a `Future` to complete before the specified instant in time.
Expand Down Expand Up @@ -146,7 +140,11 @@ pub fn timeout_at<F>(deadline: Instant, future: F) -> Timeout<F>
where
F: Future,
{
let delay = sleep_until(deadline);
let handle = scheduler::Handle::current();
// Panic if the time driver is not enabled.
let _ = handle.driver().time();

let delay = Sleep::new_timeout_with_handle(deadline, trace::caller_location(), handle);

Timeout {
value: future,
Expand All @@ -167,7 +165,19 @@ pin_project! {
}

impl<T> Timeout<T> {
pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
#[track_caller]
pub(crate) fn new_with_delay(value: T, deadline: Option<Instant>) -> Timeout<T> {
let handle = scheduler::Handle::current();
// Panic if the time driver is not enabled.
let _ = handle.driver().time();

let deadline = match deadline {
Some(deadline) => deadline,
None => Instant::far_future(),
};

let delay = Sleep::new_timeout_with_handle(deadline, trace::caller_location(), handle);

Timeout { value, delay }
}

Expand Down