diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 88bdcfd6421..c42924be77d 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -17,8 +17,6 @@ cfg_trace! { mod schedule; mod shutdown; mod task; -#[cfg(all(test, not(tokio_wasm)))] -pub(crate) use schedule::NoopSchedule; pub(crate) use task::BlockingTask; use crate::runtime::Builder; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 9c536141996..e9f6b66e0fc 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -2,7 +2,7 @@ use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; -use crate::runtime::blocking::schedule::NoopSchedule; +use crate::runtime::blocking::schedule::BlockingSchedule; use crate::runtime::blocking::{shutdown, BlockingTask}; use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; @@ -120,7 +120,7 @@ struct Shared { } pub(crate) struct Task { - task: task::UnownedTask, + task: task::UnownedTask, mandatory: Mandatory, } @@ -151,7 +151,7 @@ impl From for io::Error { } impl Task { - pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task { + pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task { Task { task, mandatory } } @@ -379,7 +379,8 @@ impl Spawner { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, NoopSchedule, id); + let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); + let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); (handle, spawned) } diff --git a/tokio/src/runtime/blocking/schedule.rs b/tokio/src/runtime/blocking/schedule.rs index 54252241d94..edf775be8be 100644 --- a/tokio/src/runtime/blocking/schedule.rs +++ b/tokio/src/runtime/blocking/schedule.rs @@ -1,15 +1,52 @@ +#[cfg(feature = "test-util")] +use crate::runtime::scheduler; use crate::runtime::task::{self, Task}; +use crate::runtime::Handle; -/// `task::Schedule` implementation that does nothing. This is unique to the -/// blocking scheduler as tasks scheduled are not really futures but blocking -/// operations. +/// `task::Schedule` implementation that does nothing (except some bookkeeping +/// in test-util builds). This is unique to the blocking scheduler as tasks +/// scheduled are not really futures but blocking operations. /// /// We avoid storing the task by forgetting it in `bind` and re-materializing it -/// in `release. -pub(crate) struct NoopSchedule; +/// in `release`. +pub(crate) struct BlockingSchedule { + #[cfg(feature = "test-util")] + handle: Handle, +} + +impl BlockingSchedule { + #[cfg_attr(not(feature = "test-util"), allow(unused_variables))] + pub(crate) fn new(handle: &Handle) -> Self { + #[cfg(feature = "test-util")] + { + match &handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.inhibit_auto_advance(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} + } + } + BlockingSchedule { + #[cfg(feature = "test-util")] + handle: handle.clone(), + } + } +} -impl task::Schedule for NoopSchedule { +impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task) -> Option> { + #[cfg(feature = "test-util")] + { + match &self.handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.allow_auto_advance(); + handle.driver.unpark(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} + } + } None } diff --git a/tokio/src/runtime/tests/loom_blocking.rs b/tokio/src/runtime/tests/loom_blocking.rs index 89de85e4362..5c4aeae39c5 100644 --- a/tokio/src/runtime/tests/loom_blocking.rs +++ b/tokio/src/runtime/tests/loom_blocking.rs @@ -73,6 +73,27 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread }); } +#[test] +fn spawn_blocking_when_paused() { + use std::time::Duration; + loom::model(|| { + let rt = crate::runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + let handle = rt.handle(); + let _enter = handle.enter(); + let a = crate::task::spawn_blocking(|| {}); + let b = crate::task::spawn_blocking(|| {}); + rt.block_on(crate::time::timeout(Duration::from_millis(1), async move { + a.await.expect("blocking task should finish"); + b.await.expect("blocking task should finish"); + })) + .expect("timeout should not trigger"); + }); +} + fn mk_runtime(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index 8d4e1d384e2..fc93bf3e4a2 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,6 +1,6 @@ -use crate::runtime::blocking::NoopSchedule; use crate::runtime::scheduler::multi_thread::queue; use crate::runtime::task::Inject; +use crate::runtime::tests::NoopSchedule; use crate::runtime::MetricsBatch; use loom::thread; diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 1c67dfefb32..4e7c2453f25 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -2,11 +2,29 @@ // other code when running loom tests. #![cfg_attr(loom, warn(dead_code, unreachable_pub))] +use self::noop_scheduler::NoopSchedule; use self::unowned_wrapper::unowned; +mod noop_scheduler { + use crate::runtime::task::{self, Task}; + + /// `task::Schedule` implementation that does nothing, for testing. + pub(crate) struct NoopSchedule; + + impl task::Schedule for NoopSchedule { + fn release(&self, _task: &Task) -> Option> { + None + } + + fn schedule(&self, _task: task::Notified) { + unreachable!(); + } + } +} + mod unowned_wrapper { - use crate::runtime::blocking::NoopSchedule; use crate::runtime::task::{Id, JoinHandle, Notified}; + use crate::runtime::tests::NoopSchedule; #[cfg(all(tokio_unstable, feature = "tracing"))] pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index 173e5b0b23f..a79c0f50d15 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -1,5 +1,5 @@ -use crate::runtime::blocking::NoopSchedule; use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::tests::NoopSchedule; use crate::util::TryLock; use std::collections::VecDeque; diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 240f8f16e6d..f81cab8cc35 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -222,7 +222,7 @@ impl Driver { let handle = rt_handle.time(); let clock = &handle.time_source.clock; - if clock.is_paused() { + if clock.can_auto_advance() { self.park.park_timeout(rt_handle, Duration::from_secs(0)); // If the time driver was woken, then the park completed diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 0343c4f4cf0..cd11a67527f 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -65,6 +65,9 @@ cfg_test_util! { /// Instant at which the clock was last unfrozen. unfrozen: Option, + + /// Number of `inhibit_auto_advance` calls still in effect. + auto_advance_inhibit_count: usize, } /// Pauses time. @@ -187,6 +190,7 @@ cfg_test_util! { enable_pausing, base: now, unfrozen: Some(now), + auto_advance_inhibit_count: 0, })), }; @@ -212,9 +216,20 @@ cfg_test_util! { inner.unfrozen = None; } - pub(crate) fn is_paused(&self) -> bool { + /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`). + pub(crate) fn inhibit_auto_advance(&self) { + let mut inner = self.inner.lock(); + inner.auto_advance_inhibit_count += 1; + } + + pub(crate) fn allow_auto_advance(&self) { + let mut inner = self.inner.lock(); + inner.auto_advance_inhibit_count -= 1; + } + + pub(crate) fn can_auto_advance(&self) -> bool { let inner = self.inner.lock(); - inner.unfrozen.is_none() + inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0 } #[track_caller] diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index e5879332d0e..2999758ff36 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads -use tokio::{runtime, task}; +use tokio::{runtime, task, time}; use tokio_test::assert_ok; use std::thread; @@ -226,3 +226,84 @@ fn coop_disabled_in_block_in_place_in_block_on() { done_rx.recv().unwrap().unwrap(); } + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn blocking_when_paused() { + // Do not auto-advance time when we have started a blocking task that has + // not yet finished. + time::timeout( + Duration::from_secs(3), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); + + // Really: Do not auto-advance time, even if the timeout is short and the + // blocking task runs for longer than that. It doesn't matter: Tokio time + // is paused; system time is not. + time::timeout( + Duration::from_millis(1), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + time::timeout( + Duration::from_secs(15), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn unawaited_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + + // When this task finishes, time should auto-advance, even though the + // JoinHandle has not been awaited yet. + let a = task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(1)); + }); + + crate::time::sleep(Duration::from_secs(15)).await; + a.await.expect("blocking task should finish"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn panicking_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + let result = time::timeout( + Duration::from_secs(15), + task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(1)); + panic!("blocking task panicked"); + }), + ) + .await + .expect("timeout should not trigger"); + assert!(result.is_err(), "blocking task should have panicked"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +}