Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time: lazy init TimerShared in TimerEntry #6512

Merged
merged 25 commits into from May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bde0742
This commit is part of reducing timeout performance overhead.
wathenjiang Apr 23, 2024
0eb1d3e
feat: panic if the time driver is not enabled
wathenjiang Apr 23, 2024
a4e1231
rustfmt
wathenjiang Apr 23, 2024
cb54a1e
add use crate::runtime::scheduler
wathenjiang Apr 24, 2024
f8874b2
add #[track_caller] for new_with_delay
wathenjiang Apr 24, 2024
9dd1e33
update comments
wathenjiang Apr 24, 2024
2d3efb6
make TimerEntry lazy init
wathenjiang Apr 24, 2024
1ae3de2
feat: add single_thread_sleep and multi_thread_sleep-8
wathenjiang Apr 24, 2024
ef3630b
feat: use new_current_thread
wathenjiang Apr 24, 2024
d4a95a3
Merge branch 'master' into timeout-lazy-init-sleep
wathenjiang Apr 24, 2024
f6fa09b
fix: new me variable
wathenjiang Apr 24, 2024
b1fa990
fix: create me variable in the block
wathenjiang Apr 24, 2024
8c0c2e8
fix: ci
wathenjiang Apr 24, 2024
a9a11e9
fix: ownership
wathenjiang Apr 24, 2024
ef657d2
fix: fmt
wathenjiang Apr 24, 2024
a446e44
fix: lifetime issue
wathenjiang Apr 24, 2024
3a89c45
rebase
wathenjiang Apr 25, 2024
c47b739
rm unnecessary code
wathenjiang Apr 25, 2024
8e40ec4
add is_inner_init
wathenjiang Apr 25, 2024
d328298
Merge branch 'master' into timeout-lazy-init-sleep
wathenjiang Apr 26, 2024
07953b5
adopt code review suggestions from mox692
wathenjiang May 2, 2024
373044d
get mutable ref only if the inner is none
wathenjiang May 2, 2024
0129c92
fix ci
wathenjiang May 2, 2024
b492723
fix typo
wathenjiang May 2, 2024
8af2a97
Merge branch 'master' into timeout-lazy-init-sleep
Darksonn May 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions benches/Cargo.toml
Expand Up @@ -90,3 +90,8 @@ harness = false
name = "time_now"
path = "time_now.rs"
harness = false

[[bench]]
name = "time_timeout"
path = "time_timeout.rs"
harness = false
wathenjiang marked this conversation as resolved.
Show resolved Hide resolved
62 changes: 62 additions & 0 deletions benches/time_timeout.rs
@@ -0,0 +1,62 @@
//! Benchmark spawning a task onto the basic and threaded Tokio executors.
//! This essentially measure the time to enqueue a task in the local and remote
//! case.
wathenjiang marked this conversation as resolved.
Show resolved Hide resolved

use std::time::{Duration, Instant};

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tokio::time::timeout;

// a vevry quick async task, but might timeout
wathenjiang marked this conversation as resolved.
Show resolved Hide resolved
async fn quick_job() -> usize {
1
}

fn single_thread_scheduler_timeout(c: &mut Criterion) {
do_test(c, 1, "single_thread_timeout");
}
wathenjiang marked this conversation as resolved.
Show resolved Hide resolved

fn multi_thread_scheduler_timeout(c: &mut Criterion) {
do_test(c, 8, "multi_thread_timeout-8");
}

fn do_test(c: &mut Criterion, workers: usize, name: &str) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(workers)
.build()
.unwrap();

c.bench_function(name, |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(spawn_job(iters as usize, workers).await);
});
start.elapsed()
})
});
}

async fn spawn_job(iters: usize, procs: usize) {
let mut handles = Vec::with_capacity(procs);
for _ in 0..procs {
handles.push(tokio::spawn(async move {
for _ in 0..iters / procs {
let h = timeout(Duration::from_secs(1), quick_job());
assert_eq!(black_box(h.await.unwrap()), 1);
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

criterion_group!(
timeout_benchmark,
single_thread_scheduler_timeout,
multi_thread_scheduler_timeout,
);

criterion_main!(timeout_benchmark);
10 changes: 10 additions & 0 deletions tokio/src/time/sleep.rs
Expand Up @@ -256,6 +256,16 @@ impl Sleep {
use crate::runtime::scheduler;

let handle = scheduler::Handle::current();
Self::new_timeout_with_handle(deadline, location, handle)
}

#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
#[track_caller]
pub(crate) fn new_timeout_with_handle(
deadline: Instant,
location: Option<&'static Location<'static>>,
handle: crate::runtime::scheduler::Handle,
) -> Sleep {
let entry = TimerEntry::new(&handle, deadline);

#[cfg(all(tokio_unstable, feature = "tracing"))]
Expand Down
57 changes: 41 additions & 16 deletions tokio/src/time/timeout.rs
Expand Up @@ -6,7 +6,8 @@

use crate::{
runtime::coop,
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
runtime::scheduler,
time::{error::Elapsed, Duration, Instant, Sleep},
util::trace,
};

Expand Down Expand Up @@ -87,14 +88,7 @@ pub fn timeout<F>(duration: Duration, future: F) -> Timeout<F>
where
F: Future,
{
let location = trace::caller_location();

let deadline = Instant::now().checked_add(duration);
let delay = match deadline {
Some(deadline) => Sleep::new_timeout(deadline, location),
None => Sleep::far_future(location),
};
Timeout::new_with_delay(future, delay)
Timeout::new_with_delay(future, Instant::now().checked_add(duration))
}

/// Requires a `Future` to complete before the specified instant in time.
Expand Down Expand Up @@ -146,11 +140,15 @@ pub fn timeout_at<F>(deadline: Instant, future: F) -> Timeout<F>
where
F: Future,
{
let delay = sleep_until(deadline);
let handle = scheduler::Handle::current();
// Panic if the time driver is not enabled.
let _ = handle.driver().time();

Timeout {
value: future,
delay,
deadline: Some(deadline),
delay: None,
handle,
}
}

Expand All @@ -162,13 +160,25 @@ pin_project! {
#[pin]
value: T,
#[pin]
delay: Sleep,
delay: Option<Sleep>,
deadline : Option<Instant>,
handle: scheduler::Handle,
}
}

impl<T> Timeout<T> {
pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
Timeout { value, delay }
#[track_caller]
pub(crate) fn new_with_delay(value: T, deadline: Option<Instant>) -> Timeout<T> {
let handle = scheduler::Handle::current();
// Panic if the time driver is not enabled.
let _ = handle.driver().time();

Timeout {
value,
deadline,
delay: None,
handle,
}
}

/// Gets a reference to the underlying value in this timeout.
Expand All @@ -194,7 +204,7 @@ where
type Output = Result<T::Output, Elapsed>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let mut me = self.project();

let had_budget_before = coop::has_budget_remaining();

Expand All @@ -205,10 +215,25 @@ where

let has_budget_now = coop::has_budget_remaining();

// If the above inner future is ready, the below code will not be executed.
// This lazy initiation is for performance purposes,
// it can avoid the unnecessary creation and drop of `Sleep`.
if me.delay.is_none() {
let location = trace::caller_location();
let delay = match me.deadline {
Some(deadline) => {
Sleep::new_timeout_with_handle(*deadline, location, me.handle.clone())
}
None => Sleep::far_future(location),
};
me.delay.as_mut().set(Some(delay));
}

let delay = me.delay;

let poll_delay = || -> Poll<Self::Output> {
match delay.poll(cx) {
// Safety: we have just assigned it a value of `Some`.
match delay.as_pin_mut().unwrap().poll(cx) {
Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
Poll::Pending => Poll::Pending,
}
Expand Down