Skip to content

Commit

Permalink
spawn_blocking: Re-enable auto-advance if the task panics.
Browse files Browse the repository at this point in the history
This uses a destructor, so it will also work if tokio machinery panics while
trying to e.g. spawn a thread.
  • Loading branch information
jorendorff committed Oct 26, 2022
1 parent 13b829f commit ea25baf
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 30 deletions.
6 changes: 3 additions & 3 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,10 @@ impl Spawner {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;

let (task, handle) = task::unowned(fut, NoopSchedule, id);

#[cfg(feature = "test-util")]
crate::time::inhibit_auto_advance();
let fut = crate::time::inhibit_auto_advance(fut);

let (task, handle) = task::unowned(fut, NoopSchedule, id);

let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
Expand Down
7 changes: 1 addition & 6 deletions tokio/src/runtime/blocking/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ where
// we want it to start without any budgeting.
crate::coop::stop();

let r = func();

#[cfg(feature = "test-util")]
crate::time::allow_auto_advance();

Poll::Ready(r)
Poll::Ready(func())
}
}
42 changes: 23 additions & 19 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ cfg_not_test_util! {
}

cfg_test_util! {
use std::future::Future;

use crate::time::{Duration, Instant};
use crate::loom::sync::{Arc, Mutex};

Expand Down Expand Up @@ -126,28 +128,30 @@ cfg_test_util! {
inner.unfrozen = Some(std::time::Instant::now());
}

/// Stop auto-advancing the clock (see `tokio::time::pause`) until
/// `allow_auto_advance` is called.
///
/// # Panics
///
/// Panics if called from outsie of a `current_thread` Tokio runtime.
#[track_caller]
pub(crate) fn inhibit_auto_advance() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.inhibit_auto_advance();
struct AutoAdvanceInhibit(Clock);

impl Drop for AutoAdvanceInhibit {
fn drop(&mut self) {
self.0.allow_auto_advance();
}
}

/// Resume auto-advance. This should only be called to balance out a previous
/// call to `inhibit_auto_advance`.
///
/// # Panics
/// Temporarily stop auto-advancing the clock (see `tokio::time::pause`)
/// and decorate the given future with code to re-enable auto-advance when
/// it returns `Ready` or is dropped.
///
/// Panics if called from outsie of a `current_thread` Tokio runtime.
#[track_caller]
pub(crate) fn allow_auto_advance() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.allow_auto_advance();
/// This is a no-op when called from outside the Tokio runtime.
pub(crate) fn inhibit_auto_advance<F: Future>(fut: F) -> impl Future<Output = F::Output> {
// Bump the inhibit count immediately, not inside the async block, to
// avoid a race condition when used by spawn_blocking.
let guard = clock().map(|clock| {
clock.inhibit_auto_advance();
AutoAdvanceInhibit(clock)
});
async move {
let _guard = guard;
fut.await
}
}

/// Advances time.
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@
mod clock;
pub(crate) use self::clock::Clock;
#[cfg(feature = "test-util")]
pub use clock::{advance, pause, resume};
pub(crate) use clock::inhibit_auto_advance;
#[cfg(feature = "test-util")]
pub(crate) use clock::{allow_auto_advance, inhibit_auto_advance};
pub use clock::{advance, pause, resume};

pub mod error;

Expand Down
20 changes: 20 additions & 0 deletions tokio/tests/task_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,23 @@ async fn blocking_task_wakes_paused_runtime() {
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn panicking_blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();
let result = time::timeout(
Duration::from_secs(15),
task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(250));
panic!("blocking task panicked");
}),
)
.await
.expect("timeout should not trigger");
assert!(result.is_err(), "blocking task should have panicked");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}

0 comments on commit ea25baf

Please sign in to comment.