Skip to content

Commit

Permalink
rt: unhandled panic config for current thread rt
Browse files Browse the repository at this point in the history
Allows the user to configure the runtime's behavior when a spawned task
panics. Currently, the panic is propagated to the JoinHandle and the
runtime resumes. This patch lets the user set the runtime to shutdown on
unhandled panic.

So far, this is only implemented for the current-thread runtime.

Refs: #4516
  • Loading branch information
carllerche committed Feb 18, 2022
1 parent 43c224f commit 798c971
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 23 deletions.
88 changes: 71 additions & 17 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -57,13 +57,30 @@ struct Core {

/// Metrics batch
metrics: MetricsBatch,

/// True if a task panicked without being handled and the runtime is
/// configured to shutdown on unhandled panic.
unhandled_panic: bool,
}

#[derive(Clone)]
pub(crate) struct Spawner {
shared: Arc<Shared>,
}

/// Configuration settings passed in from the runtime builder.
pub(crate) struct Config {
/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,

/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,

#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}

/// Scheduler state shared between threads.
struct Shared {
/// Remote run queue. None if the `Runtime` has been dropped.
Expand All @@ -78,11 +95,8 @@ struct Shared {
/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

/// Callback for a worker parking itself
before_park: Option<Callback>,

/// Callback for a worker unparking itself
after_unpark: Option<Callback>,
/// Scheduler configuration options
config: Config,

/// Keeps track of various runtime metrics.
scheduler_metrics: SchedulerMetrics,
Expand Down Expand Up @@ -117,11 +131,7 @@ const REMOTE_FIRST_INTERVAL: u8 = 31;
scoped_thread_local!(static CURRENT: Context);

impl BasicScheduler {
pub(crate) fn new(
driver: Driver,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
) -> BasicScheduler {
pub(crate) fn new(driver: Driver, config: Config) -> BasicScheduler {
let unpark = driver.unpark();

let spawner = Spawner {
Expand All @@ -130,8 +140,7 @@ impl BasicScheduler {
owned: OwnedTasks::new(),
unpark,
woken: AtomicBool::new(false),
before_park,
after_unpark,
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: WorkerMetrics::new(),
}),
Expand All @@ -143,6 +152,7 @@ impl BasicScheduler {
tick: 0,
driver: Some(driver),
metrics: MetricsBatch::new(),
unhandled_panic: false,
})));

BasicScheduler {
Expand All @@ -157,6 +167,7 @@ impl BasicScheduler {
&self.spawner
}

#[track_caller]
pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
pin!(future);

Expand Down Expand Up @@ -296,7 +307,7 @@ impl Context {
fn park(&self, mut core: Box<Core>) -> Box<Core> {
let mut driver = core.driver.take().expect("driver missing");

if let Some(f) = &self.spawner.shared.before_park {
if let Some(f) = &self.spawner.shared.config.before_park {
// Incorrect lint, the closures are actually different types so `f`
// cannot be passed as an argument to `enter`.
#[allow(clippy::redundant_closure)]
Expand All @@ -319,7 +330,7 @@ impl Context {
core.metrics.returned_from_park();
}

if let Some(f) = &self.spawner.shared.after_unpark {
if let Some(f) = &self.spawner.shared.config.after_unpark {
// Incorrect lint, the closures are actually different types so `f`
// cannot be passed as an argument to `enter`.
#[allow(clippy::redundant_closure)]
Expand Down Expand Up @@ -460,6 +471,35 @@ impl Schedule for Arc<Shared> {
}
});
}

cfg_unstable! {
fn unhandled_panic(&self) {
use crate::runtime::UnhandledPanic;

match self.config.unhandled_panic {
UnhandledPanic::Ignore => {
// Do nothing
}
UnhandledPanic::ShutdownRuntime => {
// This hook is only called from within the runtime, so
// `CURRENT` should match with `&self`, i.e. there is no
// opportunity for a nested scheduler to be called.
CURRENT.with(|maybe_cx| match maybe_cx {
Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => {
let mut core = cx.core.borrow_mut();

// If `None`, the runtime is shutting down, so there is no need to signal shutdown
if let Some(core) = core.as_mut() {
core.unhandled_panic = true;
self.owned.close_and_shutdown_all();
}
}
_ => panic!("runtime core not set in CURRENT thread-local"),
})
}
}
}
}
}

impl Wake for Shared {
Expand All @@ -484,8 +524,9 @@ struct CoreGuard<'a> {
}

impl CoreGuard<'_> {
#[track_caller]
fn block_on<F: Future>(self, future: F) -> F::Output {
self.enter(|mut core, context| {
let ret = self.enter(|mut core, context| {
let _enter = crate::runtime::enter(false);
let waker = context.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
Expand All @@ -501,11 +542,16 @@ impl CoreGuard<'_> {
core = c;

if let Ready(v) = res {
return (core, v);
return (core, Some(v));
}
}

for _ in 0..MAX_TASKS_PER_TICK {
// Make sure we didn't hit an unhandled_panic
if core.unhandled_panic {
return (core, None);
}

// Get and increment the current tick
let tick = core.tick;
core.tick = core.tick.wrapping_add(1);
Expand Down Expand Up @@ -539,7 +585,15 @@ impl CoreGuard<'_> {
// pending I/O events.
core = context.park_yield(core);
}
})
});

match ret {
Some(ret) => ret,
None => {
// `block_on` panicked.
panic!("a spawned task panicked and the runtime is configured to shutdown on unhandled panic");
}
}
}

/// Enters the scheduler context. This sets the queue and other necessary
Expand Down
33 changes: 31 additions & 2 deletions tokio/src/runtime/builder.rs
Expand Up @@ -78,6 +78,17 @@ pub struct Builder {

/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,

#[cfg(tokio_unstable)]
pub(super) unhandled_panic: UnhandledPanic,
}

cfg_unstable! {
#[derive(Debug, Clone)]
pub enum UnhandledPanic {
Ignore,
ShutdownRuntime,
}
}

pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
Expand Down Expand Up @@ -145,6 +156,9 @@ impl Builder {
after_unpark: None,

keep_alive: None,

#[cfg(tokio_unstable)]
unhandled_panic: UnhandledPanic::Ignore,
}
}

Expand Down Expand Up @@ -554,7 +568,15 @@ impl Builder {
self
}

cfg_unstable! {
pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
self.unhandled_panic = behavior;
self
}
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::basic_scheduler::Config;
use crate::runtime::{BasicScheduler, Kind};

let (driver, resources) = driver::Driver::new(self.get_cfg())?;
Expand All @@ -563,8 +585,15 @@ impl Builder {
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler =
BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone());
let scheduler = BasicScheduler::new(
driver,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
},
);
let spawner = Spawner::Basic(scheduler.spawner().clone());

// Blocking pool
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -212,6 +212,9 @@ cfg_rt! {

mod builder;
pub use self::builder::Builder;
cfg_unstable! {
pub use self::builder::UnhandledPanic;
}

pub(crate) mod context;
pub(crate) mod driver;
Expand Down
19 changes: 15 additions & 4 deletions tokio/src/runtime/task/harness.rs
Expand Up @@ -100,7 +100,7 @@ where
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&*waker_ref);
let res = poll_future(&self.core().stage, cx);
let res = poll_future(&self.core().stage, &self.core().scheduler, cx);

if res == Poll::Ready(()) {
// The future completed. Move on to complete the task.
Expand Down Expand Up @@ -450,7 +450,11 @@ fn cancel_task<T: Future>(stage: &CoreStage<T>) {

/// Polls the future. If the future completes, the output is written to the
/// stage field.
fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
fn poll_future<T: Future, S: Schedule>(
core: &CoreStage<T>,
scheduler: &S,
cx: Context<'_>,
) -> Poll<()> {
// Poll the future.
let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
struct Guard<'a, T: Future> {
Expand All @@ -473,13 +477,20 @@ fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
let output = match output {
Ok(Poll::Pending) => return Poll::Pending,
Ok(Poll::Ready(output)) => Ok(output),
Err(panic) => Err(JoinError::panic(panic)),
Err(panic) => {
scheduler.unhandled_panic();
Err(JoinError::panic(panic))
}
};

// Catch and ignore panics if the future panics on drop.
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
core.store_output(output);
}));

if res.is_err() {
scheduler.unhandled_panic();
}

Poll::Ready(())
}
5 changes: 5 additions & 0 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -234,6 +234,11 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
fn yield_now(&self, task: Notified<Self>) {
self.schedule(task);
}

/// Polling the task resulted in a panic. Should the runtime shutdown?
fn unhandled_panic(&self) {
// By default, do nothing. This maintains the 1.0 behavior.
}
}

cfg_rt! {
Expand Down

0 comments on commit 798c971

Please sign in to comment.