From c98be229ff3a4e1e230711844b7e38a30cb64f2c Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 16 Jun 2022 12:54:43 -0700 Subject: [PATCH] rt: unhandled panic config for current thread rt (#4770) Allows the user to configure the runtime's behavior when a spawned task panics. Currently, the panic is propagated to the JoinHandle and the runtime resumes. This patch lets the user set the runtime to shutdown on unhandled panic. So far, this is only implemented for the current-thread runtime. Refs: #4516 --- tokio/src/runtime/basic_scheduler.rs | 59 ++++++++++- tokio/src/runtime/builder.rs | 150 +++++++++++++++++++++++++++ tokio/src/runtime/mod.rs | 3 + tokio/src/runtime/task/harness.rs | 25 ++++- tokio/src/runtime/task/mod.rs | 5 + tokio/src/task/local.rs | 110 ++++++++++++++++++++ tokio/tests/rt_basic.rs | 90 ++++++++++++++++ tokio/tests/task_local_set.rs | 23 ++++ 8 files changed, 458 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index c9820b758d5..9792ef57bd5 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -57,6 +57,10 @@ struct Core { /// Metrics batch metrics: MetricsBatch, + + /// True if a task panicked without being handled and the runtime is + /// configured to shutdown on unhandled panic. + unhandled_panic: bool, } #[derive(Clone)] @@ -76,6 +80,10 @@ pub(crate) struct Config { /// Callback for a worker unparking itself pub(crate) after_unpark: Option, + + #[cfg(tokio_unstable)] + /// How to respond to unhandled task panics. + pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, } /// Scheduler state shared between threads. @@ -144,6 +152,7 @@ impl BasicScheduler { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(), + unhandled_panic: false, }))); BasicScheduler { @@ -158,6 +167,7 @@ impl BasicScheduler { &self.spawner } + #[track_caller] pub(crate) fn block_on(&self, future: F) -> F::Output { pin!(future); @@ -465,6 +475,35 @@ impl Schedule for Arc { } }); } + + cfg_unstable! { + fn unhandled_panic(&self) { + use crate::runtime::UnhandledPanic; + + match self.config.unhandled_panic { + UnhandledPanic::Ignore => { + // Do nothing + } + UnhandledPanic::ShutdownRuntime => { + // This hook is only called from within the runtime, so + // `CURRENT` should match with `&self`, i.e. there is no + // opportunity for a nested scheduler to be called. + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => { + let mut core = cx.core.borrow_mut(); + + // If `None`, the runtime is shutting down, so there is no need to signal shutdown + if let Some(core) = core.as_mut() { + core.unhandled_panic = true; + self.owned.close_and_shutdown_all(); + } + } + _ => unreachable!("runtime core not set in CURRENT thread-local"), + }) + } + } + } + } } impl Wake for Shared { @@ -489,8 +528,9 @@ struct CoreGuard<'a> { } impl CoreGuard<'_> { + #[track_caller] fn block_on(self, future: F) -> F::Output { - self.enter(|mut core, context| { + let ret = self.enter(|mut core, context| { let _enter = crate::runtime::enter(false); let waker = context.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); @@ -506,11 +546,16 @@ impl CoreGuard<'_> { core = c; if let Ready(v) = res { - return (core, v); + return (core, Some(v)); } } for _ in 0..core.spawner.shared.config.event_interval { + // Make sure we didn't hit an unhandled_panic + if core.unhandled_panic { + return (core, None); + } + // Get and increment the current tick let tick = core.tick; core.tick = core.tick.wrapping_add(1); @@ -544,7 +589,15 @@ impl CoreGuard<'_> { // pending I/O events. core = context.park_yield(core); } - }) + }); + + match ret { + Some(ret) => ret, + None => { + // `block_on` panicked. + panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic"); + } + } } /// Enters the scheduler context. This sets the queue and other necessary diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index f6d78f736a0..924aac98ef3 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -84,6 +84,90 @@ pub struct Builder { /// How many ticks before yielding to the driver for timer and I/O events? pub(super) event_interval: u32, + + #[cfg(tokio_unstable)] + pub(super) unhandled_panic: UnhandledPanic, +} + +cfg_unstable! { + /// How the runtime should respond to unhandled panics. + /// + /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic` + /// to configure the runtime behavior when a spawned task panics. + /// + /// See [`Builder::unhandled_panic`] for more details. + #[derive(Debug, Clone)] + #[non_exhaustive] + pub enum UnhandledPanic { + /// The runtime should ignore panics on spawned tasks. + /// + /// The panic is forwarded to the task's [`JoinHandle`] and all spawned + /// tasks continue running normally. + /// + /// This is the default behavior. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, UnhandledPanic}; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_current_thread() + /// .unhandled_panic(UnhandledPanic::Ignore) + /// .build() + /// .unwrap(); + /// + /// let task1 = rt.spawn(async { panic!("boom"); }); + /// let task2 = rt.spawn(async { + /// // This task completes normally + /// "done" + /// }); + /// + /// rt.block_on(async { + /// // The panic on the first task is forwarded to the `JoinHandle` + /// assert!(task1.await.is_err()); + /// + /// // The second task completes normally + /// assert!(task2.await.is_ok()); + /// }) + /// # } + /// ``` + /// + /// [`JoinHandle`]: struct@crate::task::JoinHandle + Ignore, + + /// The runtime should immediately shutdown if a spawned task panics. + /// + /// The runtime will immediately shutdown even if the panicked task's + /// [`JoinHandle`] is still available. All further spawned tasks will be + /// immediately dropped and call to [`Runtime::block_on`] will panic. + /// + /// # Examples + /// + /// ```should_panic + /// use tokio::runtime::{self, UnhandledPanic}; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_current_thread() + /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) + /// .build() + /// .unwrap(); + /// + /// rt.spawn(async { panic!("boom"); }); + /// rt.spawn(async { + /// // This task never completes. + /// }); + /// + /// rt.block_on(async { + /// // Do some work + /// # loop { tokio::task::yield_now().await; } + /// }) + /// # } + /// ``` + /// + /// [`JoinHandle`]: struct@crate::task::JoinHandle + ShutdownRuntime, + } } pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; @@ -163,6 +247,9 @@ impl Builder { // as parameters. global_queue_interval, event_interval, + + #[cfg(tokio_unstable)] + unhandled_panic: UnhandledPanic::Ignore, } } @@ -631,6 +718,67 @@ impl Builder { self } + cfg_unstable! { + /// Configure how the runtime responds to an unhandled panic on a + /// spawned task. + /// + /// By default, an unhandled panic (i.e. a panic not caught by + /// [`std::panic::catch_unwind`]) has no impact on the runtime's + /// execution. The panic is error value is forwarded to the task's + /// [`JoinHandle`] and all other spawned tasks continue running. + /// + /// The `unhandled_panic` option enables configuring this behavior. + /// + /// * `UnhandledPanic::Ignore` is the default behavior. Panics on + /// spawned tasks have no impact on the runtime's execution. + /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to + /// shutdown immediately when a spawned task panics even if that + /// task's `JoinHandle` has not been dropped. All other spawned tasks + /// will immediatetly terminate and further calls to + /// [`Runtime::block_on`] will panic. + /// + /// # Unstable + /// + /// This option is currently unstable and its implementation is + /// incomplete. The API may change or be removed in the future. See + /// tokio-rs/tokio#4516 for more details. + /// + /// # Examples + /// + /// The following demonstrates a runtime configured to shutdown on + /// panic. The first spawned task panics and results in the runtime + /// shutting down. The second spawned task never has a chance to + /// execute. The call to `block_on` will panic due to the runtime being + /// forcibly shutdown. + /// + /// ```should_panic + /// use tokio::runtime::{self, UnhandledPanic}; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_current_thread() + /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) + /// .build() + /// .unwrap(); + /// + /// rt.spawn(async { panic!("boom"); }); + /// rt.spawn(async { + /// // This task never completes. + /// }); + /// + /// rt.block_on(async { + /// // Do some work + /// # loop { tokio::task::yield_now().await; } + /// }) + /// # } + /// ``` + /// + /// [`JoinHandle`]: struct@crate::task::JoinHandle + pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self { + self.unhandled_panic = behavior; + self + } + } + fn build_basic_runtime(&mut self) -> io::Result { use crate::runtime::basic_scheduler::Config; use crate::runtime::{BasicScheduler, HandleInner, Kind}; @@ -661,6 +809,8 @@ impl Builder { after_unpark: self.after_unpark.clone(), global_queue_interval: self.global_queue_interval, event_interval: self.event_interval, + #[cfg(tokio_unstable)] + unhandled_panic: self.unhandled_panic.clone(), }, ); let spawner = Spawner::Basic(scheduler.spawner().clone()); diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index a030ccdf0d2..399e602d2bf 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -216,6 +216,9 @@ cfg_rt! { mod builder; pub use self::builder::Builder; + cfg_unstable! { + pub use self::builder::UnhandledPanic; + } pub(crate) mod context; mod driver; diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 1d3ababfb17..206cdf2695a 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -101,7 +101,12 @@ where let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); let core = self.core(); - let res = poll_future(&core.stage, core.task_id.clone(), cx); + let res = poll_future( + &core.stage, + &self.core().scheduler, + core.task_id.clone(), + cx, + ); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. @@ -453,7 +458,12 @@ fn cancel_task(stage: &CoreStage, id: super::Id) { /// Polls the future. If the future completes, the output is written to the /// stage field. -fn poll_future(core: &CoreStage, id: super::Id, cx: Context<'_>) -> Poll<()> { +fn poll_future( + core: &CoreStage, + scheduler: &S, + id: super::Id, + cx: Context<'_>, +) -> Poll<()> { // Poll the future. let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { struct Guard<'a, T: Future> { @@ -476,13 +486,20 @@ fn poll_future(core: &CoreStage, id: super::Id, cx: Context<'_>) - let output = match output { Ok(Poll::Pending) => return Poll::Pending, Ok(Poll::Ready(output)) => Ok(output), - Err(panic) => Err(JoinError::panic(id, panic)), + Err(panic) => { + scheduler.unhandled_panic(); + Err(JoinError::panic(id, panic)) + } }; // Catch and ignore panics if the future panics on drop. - let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { core.store_output(output); })); + if res.is_err() { + scheduler.unhandled_panic(); + } + Poll::Ready(()) } diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 316b4a49675..e73b3f35a54 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -262,6 +262,11 @@ pub(crate) trait Schedule: Sync + Sized + 'static { fn yield_now(&self, task: Notified) { self.schedule(task); } + + /// Polling the task resulted in a panic. Should the runtime shutdown? + fn unhandled_panic(&self) { + // By default, do nothing. This maintains the 1.0 behavior. + } } cfg_rt! { diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 8c59876d603..e3ba50a79ff 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -232,6 +232,10 @@ struct Context { /// State shared between threads. shared: Arc, + + /// True if a task panicked without being handled and the local set is + /// configured to shutdown on unhandled panic. + unhandled_panic: Cell, } /// LocalSet state shared between threads. @@ -241,6 +245,10 @@ struct Shared { /// Wake the `LocalSet` task. waker: AtomicWaker, + + /// How to respond to unhandled task panics. + #[cfg(tokio_unstable)] + pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, } pin_project! { @@ -330,7 +338,10 @@ impl LocalSet { shared: Arc::new(Shared { queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), waker: AtomicWaker::new(), + #[cfg(tokio_unstable)] + unhandled_panic: crate::runtime::UnhandledPanic::Ignore, }), + unhandled_panic: Cell::new(false), }, _not_send: PhantomData, } @@ -516,6 +527,11 @@ impl LocalSet { /// notified again. fn tick(&self) -> bool { for _ in 0..MAX_TASKS_PER_TICK { + // Make sure we didn't hit an unhandled panic + if self.context.unhandled_panic.get() { + panic!("a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"); + } + match self.next_task() { // Run the task // @@ -567,6 +583,76 @@ impl LocalSet { } } +cfg_unstable! { + impl LocalSet { + /// Configure how the `LocalSet` responds to an unhandled panic on a + /// spawned task. + /// + /// By default, an unhandled panic (i.e. a panic not caught by + /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s + /// execution. The panic is error value is forwarded to the task's + /// [`JoinHandle`] and all other spawned tasks continue running. + /// + /// The `unhandled_panic` option enables configuring this behavior. + /// + /// * `UnhandledPanic::Ignore` is the default behavior. Panics on + /// spawned tasks have no impact on the `LocalSet`'s execution. + /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to + /// shutdown immediately when a spawned task panics even if that + /// task's `JoinHandle` has not been dropped. All other spawned tasks + /// will immediatetly terminate and further calls to + /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic. + /// + /// # Panics + /// + /// This method panics if called after the `LocalSet` has started + /// running. + /// + /// # Unstable + /// + /// This option is currently unstable and its implementation is + /// incomplete. The API may change or be removed in the future. See + /// tokio-rs/tokio#4516 for more details. + /// + /// # Examples + /// + /// The following demonstrates a `LocalSet` configured to shutdown on + /// panic. The first spawned task panics and results in the `LocalSet` + /// shutting down. The second spawned task never has a chance to + /// execute. The call to `run_until` will panic due to the runtime being + /// forcibly shutdown. + /// + /// ```should_panic + /// use tokio::runtime::UnhandledPanic; + /// + /// # #[tokio::main] + /// # async fn main() { + /// tokio::task::LocalSet::new() + /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) + /// .run_until(async { + /// tokio::task::spawn_local(async { panic!("boom"); }); + /// tokio::task::spawn_local(async { + /// // This task never completes + /// }); + /// + /// // Do some work, but `run_until` will panic before it completes + /// # loop { tokio::task::yield_now().await; } + /// }) + /// .await; + /// # } + /// ``` + /// + /// [`JoinHandle`]: struct@crate::task::JoinHandle + pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self { + // TODO: This should be set as a builder + Arc::get_mut(&mut self.context.shared) + .expect("TODO: we shouldn't panic") + .unhandled_panic = behavior; + self + } + } +} + impl fmt::Debug for LocalSet { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("LocalSet").finish() @@ -722,4 +808,28 @@ impl task::Schedule for Arc { fn schedule(&self, task: task::Notified) { Shared::schedule(self, task); } + + cfg_unstable! { + fn unhandled_panic(&self) { + use crate::runtime::UnhandledPanic; + + match self.unhandled_panic { + UnhandledPanic::Ignore => { + // Do nothing + } + UnhandledPanic::ShutdownRuntime => { + // This hook is only called from within the runtime, so + // `CURRENT` should match with `&self`, i.e. there is no + // opportunity for a nested scheduler to be called. + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.shared) => { + cx.unhandled_panic.set(true); + cx.owned.close_and_shutdown_all(); + } + _ => unreachable!("runtime core not set in CURRENT thread-local"), + }) + } + } + } + } } diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index cc6ac677280..50b7e9732a5 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -288,6 +288,96 @@ fn timeout_panics_when_no_time_handle() { }); } +#[cfg(tokio_unstable)] +mod unstable { + use tokio::runtime::{Builder, UnhandledPanic}; + + #[test] + #[should_panic( + expected = "a spawned task panicked and the runtime is configured to shut down on unhandled panic" + )] + fn shutdown_on_panic() { + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + } + + #[test] + fn spawns_do_nothing() { + use std::sync::Arc; + + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + let rt1 = Arc::new(rt); + let rt2 = rt1.clone(); + + let _ = std::thread::spawn(move || { + rt2.block_on(async { + tokio::spawn(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + }) + .join(); + + let task = rt1.spawn(async {}); + let res = futures::executor::block_on(task); + assert!(res.is_err()); + } + + #[test] + fn shutdown_all_concurrent_block_on() { + const N: usize = 2; + use std::sync::{mpsc, Arc}; + + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + let rt = Arc::new(rt); + let mut ths = vec![]; + let (tx, rx) = mpsc::channel(); + + for _ in 0..N { + let rt = rt.clone(); + let tx = tx.clone(); + ths.push(std::thread::spawn(move || { + rt.block_on(async { + tx.send(()).unwrap(); + futures::future::pending::<()>().await; + }); + })); + } + + for _ in 0..N { + rx.recv().unwrap(); + } + + rt.spawn(async { + panic!("boom"); + }); + + for th in ths { + assert!(th.join().is_err()); + } + } +} + fn rt() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index f8a35d0ede8..2bd5b17b84d 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -517,6 +517,29 @@ async fn spawn_wakes_localset() { } } +#[cfg(tokio_unstable)] +mod unstable { + use tokio::runtime::UnhandledPanic; + use tokio::task::LocalSet; + + #[tokio::test] + #[should_panic( + expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic" + )] + async fn shutdown_on_panic() { + LocalSet::new() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .run_until(async { + tokio::task::spawn_local(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + .await; + } +} + fn rt() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all()