Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unstable on_thread_park_id() to runtime Builder (for stuck task watchdog) #6370

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
96 changes: 91 additions & 5 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::runtime::handle::Handle;
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
use crate::runtime::{blocking, driver, Callback, CallbackWorker, HistogramBuilder, Runtime};
use crate::util::rand::{RngSeed, RngSeedGenerator};

use std::fmt;
Expand Down Expand Up @@ -73,10 +73,10 @@ pub struct Builder {
pub(super) before_stop: Option<Callback>,

/// To run before each worker thread is parked.
pub(super) before_park: Option<Callback>,
pub(super) before_park: Option<CallbackWorker>,

/// To run after each thread is unparked.
pub(super) after_unpark: Option<Callback>,
pub(super) after_unpark: Option<CallbackWorker>,

/// Customizable keep alive timeout for `BlockingPool`
pub(super) keep_alive: Option<Duration>,
Expand Down Expand Up @@ -640,7 +640,7 @@ impl Builder {
where
F: Fn() + Send + Sync + 'static,
{
self.before_park = Some(std::sync::Arc::new(f));
self.before_park = Some(std::sync::Arc::new(move |_id| f()));
self
}

Expand Down Expand Up @@ -675,7 +675,7 @@ impl Builder {
where
F: Fn() + Send + Sync + 'static,
{
self.after_unpark = Some(std::sync::Arc::new(f));
self.after_unpark = Some(std::sync::Arc::new(move |_id| f()));
self
}

Expand Down Expand Up @@ -938,6 +938,92 @@ impl Builder {
self.seed_generator = RngSeedGenerator::new(seed);
self
}

/// Has the same behavior as `on_thread_park` except that the id of the parked
/// thread is passed to the callback function `f`. The id corresponds to the same
/// `usize` that is used in calls to `RuntimeMetrics`.
///
/// Note: if both `on_thread_park` and `on_thread_park_id` are called, only the last one
/// will be saved.
///
/// # Examples
///
/// ## Stuck task detector
///
/// ```
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::{thread, time};
///
/// fn main() {
///     const WORKERS: usize = 4;
///     const UNPARKED: AtomicBool = AtomicBool::new(false);
///     static IS_PARKED: [AtomicBool; WORKERS] = [UNPARKED; WORKERS];
///
///     let runtime = tokio::runtime::Builder::new_multi_thread()
///         .worker_threads(WORKERS)
///         .on_thread_park_id(|id| IS_PARKED[id].store(true, Ordering::Release))
///         .on_thread_unpark_id(|id| IS_PARKED[id].store(false, Ordering::Release))
///         .build()
///         .unwrap();
///
///     let metrics = runtime.handle().metrics();
///     let (done_tx, done_rx) = tokio::sync::oneshot::channel();
///     thread::spawn(move || {
///         let mut stuck_since = [time::Instant::now(); WORKERS];
///         let mut prev_poll_counts = [None; WORKERS];
///         loop {
///             thread::sleep(time::Duration::from_millis(250));
///             let now = time::Instant::now();
///             for ii in 0..WORKERS {
///                 if IS_PARKED[ii].load(Ordering::Acquire) {
///                     prev_poll_counts[ii] = None;
///                 } else {
///                     let poll_count = metrics.worker_poll_count(ii);
///                     if Some(poll_count) == prev_poll_counts[ii] {
///                         let duration = now.duration_since(stuck_since[ii]);
///                         println!("*** worker {} is stuck for {:?} ***", ii, duration);
///                         if duration > time::Duration::from_secs(1) {
///                             let _ = done_tx.send(());
///                             return;
///                         }
///                     } else {
///                         prev_poll_counts[ii] = Some(poll_count);
///                         stuck_since[ii] = now;
///                     }
///                 }
///             }
///         }
///     });
///
///     // Spawn a "stuck" task that doesn't yield properly (should be detected).
///     runtime.spawn(async { thread::sleep(time::Duration::from_secs(3)) });
///     runtime.block_on(async {
///         let _ = done_rx.await;
///     });
/// }
/// ```
#[cfg(not(loom))]
pub fn on_thread_park_id<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(usize) + Send + Sync + 'static,
{
    // Overwrites any callback previously registered via `on_thread_park`
    // or an earlier call to this method; the old callback is dropped here.
    let _prev = self.before_park.replace(std::sync::Arc::new(move |id| f(id)));
    self
}

/// Has the same behavior as `on_thread_unpark` except that the id of the
/// unparked thread is passed to the callback function `f`. The id corresponds
/// to the same `usize` that is used in calls to `RuntimeMetrics`.
///
/// Note: if both `on_thread_unpark` and `on_thread_unpark_id` are called, only
/// the last one will be saved, since both store into the same callback slot.
///
/// See `on_thread_park_id` for an example stuck-thread detector.
#[cfg(not(loom))]
pub fn on_thread_unpark_id<F>(&mut self, f: F) -> &mut Self
where
    F: Fn(usize) + Send + Sync + 'static,
{
    self.after_unpark = Some(std::sync::Arc::new(f));
    self
}
}

cfg_metrics! {
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]
use crate::runtime::Callback;
use crate::runtime::CallbackWorker;
use crate::util::RngSeedGenerator;

pub(crate) struct Config {
Expand All @@ -16,10 +16,10 @@ pub(crate) struct Config {
pub(crate) local_queue_capacity: usize,

/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,
pub(crate) before_park: Option<CallbackWorker>,

/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,
pub(crate) after_unpark: Option<CallbackWorker>,

/// The multi-threaded scheduler includes a per-worker LIFO slot used to
/// store the last scheduled task. This can improve certain usage patterns,
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,4 +406,7 @@ cfg_rt! {

/// Callback invoked after a runtime thread starts / before it stops.
type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;

/// Callback invoked before a worker thread parks / after it unparks. The
/// `usize` argument is the worker's index — the same id used by
/// `RuntimeMetrics` (the current-thread scheduler always passes 0).
type CallbackWorker = std::sync::Arc<dyn Fn(usize) + Send + Sync>;
}
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ impl Context {
let mut driver = core.driver.take().expect("driver missing");

if let Some(f) = &handle.shared.config.before_park {
let (c, ()) = self.enter(core, || f());
let (c, ()) = self.enter(core, || f(0));
core = c;
}

Expand All @@ -371,7 +371,7 @@ impl Context {
}

if let Some(f) = &handle.shared.config.after_unpark {
let (c, ()) = self.enter(core, || f());
let (c, ()) = self.enter(core, || f(0));
core = c;
}

Expand Down
14 changes: 9 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ pub(super) struct Worker {

/// Core data
struct Core {
/// Index of this core
index: usize,

/// Used to schedule bookkeeping tasks every so often.
tick: u32,

Expand Down Expand Up @@ -252,7 +255,7 @@ pub(super) fn create(
let mut worker_metrics = Vec::with_capacity(size);

// Create the local queues
for _ in 0..size {
for i in 0..size {
let (steal, run_queue) = queue::local();

let park = park.clone();
Expand All @@ -261,6 +264,7 @@ pub(super) fn create(
let stats = Stats::new(&metrics);

cores.push(Box::new(Core {
index: i,
tick: 0,
lifo_slot: None,
lifo_enabled: !config.disable_lifo_slot,
Expand Down Expand Up @@ -306,10 +310,10 @@ pub(super) fn create(

let mut launch = Launch(vec![]);

for (index, core) in cores.drain(..).enumerate() {
for core in cores.drain(..) {
launch.0.push(Arc::new(Worker {
handle: handle.clone(),
index,
index: core.index,
core: AtomicCell::new(Some(core)),
}));
}
Expand Down Expand Up @@ -684,7 +688,7 @@ impl Context {
/// after all the IOs get dispatched
fn park(&self, mut core: Box<Core>) -> Box<Core> {
if let Some(f) = &self.worker.handle.shared.config.before_park {
f();
f(core.index);
}

if core.transition_to_parked(&self.worker) {
Expand All @@ -702,7 +706,7 @@ impl Context {
}

if let Some(f) = &self.worker.handle.shared.config.after_unpark {
f();
f(core.index);
}
core
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ impl Worker {

fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
if let Some(f) = &cx.shared().config.before_park {
f();
f(core.index);
}

if self.can_transition_to_parked(&mut core) {
Expand All @@ -1140,7 +1140,7 @@ impl Worker {
}

if let Some(f) = &cx.shared().config.after_unpark {
f();
f(core.index);
}

Ok((None, core))
Expand Down
86 changes: 85 additions & 1 deletion tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))]

use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::{atomic, Arc, Mutex};
use std::task::Poll;
use tokio::macros::support::poll_fn;

Expand Down Expand Up @@ -680,6 +680,90 @@ fn budget_exhaustion_yield_with_joins() {
assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

#[test]
fn on_thread_park_unpark_id() {
    const THREADS: usize = 8;

    // One flag per worker: set to `true` by the park callback and back to
    // `false` by the unpark callback, so at any moment the flag tells us
    // whether that worker is currently parked.
    let bools: Arc<Vec<atomic::AtomicBool>> =
        Arc::new((0..THREADS).map(|_| atomic::AtomicBool::new(false)).collect());
    let bools_park = bools.clone();
    let bools_unpark = bools.clone();

    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(THREADS)
        .enable_all()
        .on_thread_park_id(move |worker| {
            // `worker` is the same id used by `RuntimeMetrics`.
            bools_park[worker].store(true, atomic::Ordering::Release);
        })
        .on_thread_unpark_id(move |worker| {
            bools_unpark[worker].store(false, atomic::Ordering::Release);
        })
        .build()
        .unwrap();
    let metrics = rt.metrics();

    rt.block_on(async {
        // Spawn some tasks to do things, but less than the number of workers. Some
        // workers won't have any work to do and will stay parked the duration of the
        // test. We rely on bools to distinguish between a busy (unparked) worker that
        // isn't polling, vs. ones that are merely parked the entire time.
        for _ in 0..(THREADS - 1) {
            tokio::spawn(async {
                loop {
                    tokio::time::sleep(Duration::from_millis(4)).await;
                }
            });
        }

        // Give the spawned tasks a chance to both poll and park. Not really necessary.
        tokio::time::sleep(Duration::from_millis(30)).await;

        // NOTE: the `.unwrap()` on the join result below is essential. The
        // assertion runs inside the spawned task; if the JoinHandle's result
        // were discarded (`let _ = ...`), a failing `assert_eq!` would panic
        // the task, surface only as a swallowed `JoinError`, and the test
        // would pass despite the failure.
        tokio::spawn(async move {
            // Snapshot each worker's poll count before going "stuck".
            let mut counts = Vec::new();
            for ii in 0..THREADS {
                counts.push(metrics.worker_poll_count(ii));
            }

            // Busy-wait without yielding: this task deliberately hogs its
            // worker so exactly one worker is unparked but not polling.
            let start_time = std::time::Instant::now();
            while start_time.elapsed() < Duration::from_millis(100) {
                // Uncomment the line below and the test fails (current worker is no
                // longer "stuck" and not yielding back to tokio).

                // tokio::task::yield_now().await;
            }

            let mut stuck = 0;
            for ii in 0..THREADS {
                let parked = bools[ii].load(atomic::Ordering::Acquire);
                // Uncomment below to verify that some workers are not doing any polls,
                // yet only one of them is not parked.

                // if !parked {
                //     println!("task {} is not parked", ii);
                // }
                // if metrics.worker_poll_count(ii) == counts[ii] {
                //     println!("task {} has same poll count", ii);
                // }

                // "Stuck" = unparked (claims to be running) yet made no polls.
                if !parked && metrics.worker_poll_count(ii) == counts[ii] {
                    stuck += 1;
                }
            }

            assert_eq!(
                stuck, 1,
                "should be exactly one non-polling, non-parked thread"
            );
        })
        .await
        .unwrap();
    });
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
Expand Down