Skip to content

Commit

Permalink
This commit is part of reducing timeout performance overhead.
Browse files Browse the repository at this point in the history
See tokio-rs#6504

Below are relevant benchmark results for various GOMAXPROCS values
on m1/:

Below are relevant benchmark results of the current version:
single_thread_timeout   time:   [21.869 ns 21.987 ns 22.135 ns]
                        change: [-3.4429% -2.0709% -0.8759%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe

multi_thread_timeout-8  time:   [4.4835 ns 4.6138 ns 4.7614 ns]
                        change: [-4.3554% +0.1643% +4.5114%] (p = 0.95 > 0.05)
                        No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
  8 (8.00%) high mild
  1 (1.00%) high severe

Below are relevant benchmark results of this version:

single_thread_timeout   time:   [40.227 ns 40.416 ns 40.691 ns]
                        change: [+81.321% +82.817% +84.121%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 14 outliers among 100 measurements (14.00%)
  3 (3.00%) high mild
  11 (11.00%) high severe

multi_thread_timeout-8  time:   [183.16 ns 186.02 ns 188.21 ns]
                        change: [+3765.0% +3880.4% +3987.4%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 10 outliers among 100 measurements (10.00%)
  4 (4.00%) low severe
  6 (6.00%) low mild
  • Loading branch information
wathenjiang committed Apr 23, 2024
1 parent a73d6bf commit f938c34
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 17 deletions.
5 changes: 5 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,8 @@ harness = false
name = "time_now"
path = "time_now.rs"
harness = false

[[bench]]
name = "time_timeout"
path = "time_timeout.rs"
harness = false
62 changes: 62 additions & 0 deletions benches/time_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//! Benchmark spawning a task onto the basic and threaded Tokio executors.
//! This essentially measure the time to enqueue a task in the local and remote
//! case.

use std::time::{Duration, Instant};

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tokio::time::timeout;

// a vevry quick async task, but might timeout
async fn quick_job() -> usize {
1
}

fn single_thread_scheduler_timeout(c: &mut Criterion) {
do_test(c, 1, "single_thread_timeout");
}

fn multi_thread_scheduler_timeout(c: &mut Criterion) {
do_test(c, 8, "multi_thread_timeout-8");
}

fn do_test(c: &mut Criterion, workers: usize, name: &str) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(workers)
.build()
.unwrap();

c.bench_function(name, |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(spawn_job(iters as usize, workers).await);
});
start.elapsed()
})
});
}

async fn spawn_job(iters: usize, procs: usize) {
let mut handles = Vec::with_capacity(procs);
for _ in 0..procs {
handles.push(tokio::spawn(async move {
for _ in 0..iters / procs {
let h = timeout(Duration::from_secs(1), quick_job());
assert_eq!(black_box(h.await.unwrap()), 1);
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

criterion_group!(
timeout_benchmark,
single_thread_scheduler_timeout,
multi_thread_scheduler_timeout,
);

criterion_main!(timeout_benchmark);
44 changes: 27 additions & 17 deletions tokio/src/time/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use crate::{
runtime::coop,
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
time::{error::Elapsed, Duration, Instant, Sleep},
util::trace,
};

Expand Down Expand Up @@ -87,14 +87,7 @@ pub fn timeout<F>(duration: Duration, future: F) -> Timeout<F>
where
F: Future,
{
let location = trace::caller_location();

let deadline = Instant::now().checked_add(duration);
let delay = match deadline {
Some(deadline) => Sleep::new_timeout(deadline, location),
None => Sleep::far_future(location),
};
Timeout::new_with_delay(future, delay)
Timeout::new_with_delay(future, Instant::now().checked_add(duration))
}

/// Requires a `Future` to complete before the specified instant in time.
Expand Down Expand Up @@ -146,11 +139,10 @@ pub fn timeout_at<F>(deadline: Instant, future: F) -> Timeout<F>
where
F: Future,
{
let delay = sleep_until(deadline);

Timeout {
value: future,
delay,
deadline: Some(deadline),
delay: None,
}
}

Expand All @@ -161,14 +153,19 @@ pin_project! {
pub struct Timeout<T> {
#[pin]
value: T,
deadline : Option<Instant>,
#[pin]
delay: Sleep,
delay: Option<Sleep>,
}
}

impl<T> Timeout<T> {
pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout<T> {
Timeout { value, delay }
pub(crate) fn new_with_delay(value: T, deadline: Option<Instant>) -> Timeout<T> {
Timeout {
value,
deadline,
delay: None,
}
}

/// Gets a reference to the underlying value in this timeout.
Expand All @@ -194,7 +191,7 @@ where
type Output = Result<T::Output, Elapsed>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let mut me = self.project();

let had_budget_before = coop::has_budget_remaining();

Expand All @@ -205,10 +202,23 @@ where

let has_budget_now = coop::has_budget_remaining();

// If the above inner future is ready, the below code will not be executed.
// This lazy initiation is for performance purposes,
// it can avoid unnecessary of `Sleep` creation and drop.
if me.delay.is_none() {
let location = trace::caller_location();
let delay = match me.deadline {
Some(deadline) => Sleep::new_timeout(*deadline, location),
None => Sleep::far_future(location),
};
me.delay.as_mut().set(Some(delay));
}

let delay = me.delay;

let poll_delay = || -> Poll<Self::Output> {
match delay.poll(cx) {
// Safety: we have just assigned it a value of `Some`.
match delay.as_pin_mut().unwrap().poll(cx) {
Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
Poll::Pending => Poll::Pending,
}
Expand Down

0 comments on commit f938c34

Please sign in to comment.