From 7eaf5adc118556e9571edc9c9a3355695731b9da Mon Sep 17 00:00:00 2001
From: Duarte Nunes
Date: Thu, 3 Nov 2022 02:43:33 -0300
Subject: [PATCH 1/2] Export metrics about the blocking thread pool

---
 tokio/src/runtime/blocking/pool.rs            | 69 +++++++++++++++++--
 tokio/src/runtime/metrics/runtime.rs          | 50 ++++++++++++++
 tokio/src/runtime/scheduler/current_thread.rs |  8 +++
 tokio/src/runtime/scheduler/mod.rs            | 16 +++++
 .../runtime/scheduler/multi_thread/handle.rs  |  8 +++
 tokio/tests/rt_metrics.rs                     | 40 +++++++++++
 6 files changed, 184 insertions(+), 7 deletions(-)

diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index dd166109469..f3f206ce1f2 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, Handle};
 use std::collections::{HashMap, VecDeque};
 use std::fmt;
 use std::io;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::Duration;
 
 pub(crate) struct BlockingPool {
@@ -23,6 +24,40 @@ pub(crate) struct Spawner {
     inner: Arc<Inner>,
 }
 
+#[derive(Default)]
+pub(crate) struct SpawnerMetrics {
+    num_threads: AtomicUsize,
+    queue_depth: AtomicUsize,
+}
+
+impl SpawnerMetrics {
+    fn num_threads(&self) -> usize {
+        self.num_threads.load(Ordering::Relaxed)
+    }
+
+    cfg_metrics! {
+        fn queue_depth(&self) -> usize {
+            self.queue_depth.load(Ordering::Relaxed)
+        }
+    }
+
+    fn inc_num_threads(&self) {
+        self.num_threads.fetch_add(1, Ordering::Relaxed);
+    }
+
+    fn dec_num_threads(&self) {
+        self.num_threads.fetch_sub(1, Ordering::Relaxed);
+    }
+
+    fn inc_queue_depth(&self) {
+        self.queue_depth.fetch_add(1, Ordering::Relaxed);
+    }
+
+    fn dec_queue_depth(&self) {
+        self.queue_depth.fetch_sub(1, Ordering::Relaxed);
+    }
+}
+
 struct Inner {
     /// State shared between worker threads.
     shared: Mutex<Shared>,
@@ -47,11 +82,13 @@ struct Inner {
 
     // Customizable wait timeout.
     keep_alive: Duration,
+
+    // Metrics about the pool.
+    metrics: SpawnerMetrics,
 }
 
 struct Shared {
     queue: VecDeque<Task>,
-    num_th: usize,
     num_idle: u32,
     num_notify: u32,
     shutdown: bool,
@@ -165,7 +202,6 @@ impl BlockingPool {
             inner: Arc::new(Inner {
                 shared: Mutex::new(Shared {
                     queue: VecDeque::new(),
-                    num_th: 0,
                     num_idle: 0,
                     num_notify: 0,
                     shutdown: false,
@@ -181,6 +217,7 @@ impl BlockingPool {
                 before_stop: builder.before_stop.clone(),
                 thread_cap,
                 keep_alive,
+                metrics: Default::default(),
             }),
         },
         shutdown_rx,
@@ -350,11 +387,12 @@ impl Spawner {
         }
 
         shared.queue.push_back(task);
+        self.inner.metrics.inc_queue_depth();
 
         if shared.num_idle == 0 {
             // No threads are able to process the task.
 
-            if shared.num_th == self.inner.thread_cap {
+            if self.inner.metrics.num_threads() == self.inner.thread_cap {
                 // At max number of threads
             } else {
                 assert!(shared.shutdown_tx.is_some());
@@ -365,11 +403,14 @@ impl Spawner {
 
                 match self.spawn_thread(shutdown_tx, rt, id) {
                     Ok(handle) => {
-                        shared.num_th += 1;
+                        self.inner.metrics.inc_num_threads();
                         shared.worker_thread_index += 1;
                         shared.worker_threads.insert(id, handle);
                     }
-                    Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => {
+                    Err(ref e)
+                        if is_temporary_os_thread_error(e)
+                            && self.inner.metrics.num_threads() > 0 =>
+                    {
                         // OS temporarily failed to spawn a new thread.
                         // The task will be picked up eventually by a currently
                         // busy thread.
@@ -419,6 +460,18 @@ impl Spawner {
     }
 }
 
+cfg_metrics! {
+    impl Spawner {
+        pub(crate) fn num_threads(&self) -> usize {
+            self.inner.metrics.num_threads()
+        }
+
+        pub(crate) fn queue_depth(&self) -> usize {
+            self.inner.metrics.queue_depth()
+        }
+    }
+}
+
 // Tells whether the error when spawning a thread is temporary.
 #[inline]
 fn is_temporary_os_thread_error(error: &std::io::Error) -> bool {
@@ -437,6 +490,7 @@ impl Inner {
         'main: loop {
             // BUSY
             while let Some(task) = shared.queue.pop_front() {
+                self.metrics.dec_queue_depth();
                 drop(shared);
                 task.run();
 
@@ -478,6 +532,7 @@ impl Inner {
             if shared.shutdown {
                 // Drain the queue
                 while let Some(task) = shared.queue.pop_front() {
+                    self.metrics.dec_queue_depth();
                     drop(shared);
 
                     task.shutdown_or_run_if_mandatory();
@@ -496,7 +551,7 @@ impl Inner {
             }
 
             // Thread exit
-            shared.num_th -= 1;
+            self.metrics.dec_num_threads();
 
             // num_idle should now be tracked exactly, panic
             // with a descriptive message if it is not the
@@ -506,7 +561,7 @@ impl Inner {
                 .checked_sub(1)
                 .expect("num_idle underflowed on thread exit");
 
-            if shared.shutdown && shared.num_th == 0 {
+            if shared.shutdown && self.metrics.num_threads() == 0 {
                 self.condvar.notify_one();
             }
 
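Note on the counters introduced above: `SpawnerMetrics` is a set of relaxed atomic gauges, so readers get values that may be momentarily stale but never torn, and the hot path pays only a cheap read-modify-write. A minimal stand-alone sketch of the same pattern (the names here are illustrative, not part of the patch):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Gauge-style counter in the style of SpawnerMetrics: writers use
// read-modify-write ops, readers take a Relaxed snapshot. Relaxed is
// sufficient because the value is purely informational; no other memory
// accesses need to be ordered against it.
#[derive(Default)]
struct GaugeCounter(AtomicUsize);

impl GaugeCounter {
    fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }

    fn dec(&self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }

    fn get(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let threads = GaugeCounter::default();
    threads.inc();
    assert_eq!(1, threads.get());
    threads.dec();
    assert_eq!(0, threads.get());
}
```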
diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs
index 49c926302f5..cda162bb0c0 100644
--- a/tokio/src/runtime/metrics/runtime.rs
+++ b/tokio/src/runtime/metrics/runtime.rs
@@ -42,6 +42,32 @@ impl RuntimeMetrics {
         self.handle.inner.num_workers()
     }
 
+    /// Returns the number of additional threads spawned by the runtime.
+    ///
+    /// The maximum number of additional threads is set by configuring
+    /// `max_blocking_threads` on `runtime::Builder`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let _ = tokio::task::spawn_blocking(move || {
+    ///         // Stand-in for compute-heavy work or using synchronous APIs
+    ///         1 + 1
+    ///     }).await;
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.num_blocking_threads();
+    ///     println!("Runtime has created {} threads", n);
+    /// }
+    /// ```
+    pub fn num_blocking_threads(&self) -> usize {
+        self.handle.inner.num_blocking_threads()
+    }
+
     /// Returns the number of tasks scheduled from **outside** of the runtime.
     ///
     /// The remote schedule count starts at zero when the runtime is created and
@@ -446,6 +472,30 @@ impl RuntimeMetrics {
     pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
         self.handle.inner.worker_local_queue_depth(worker)
     }
+
+    /// Returns the number of tasks currently scheduled in the blocking
+    /// thread pool, spawned using `spawn_blocking`.
+    ///
+    /// This metric returns the **current** number of tasks pending in the
+    /// blocking thread pool. As such, the returned value may increase
+    /// or decrease as new tasks are scheduled and processed.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.blocking_queue_depth();
+    ///     println!("{} tasks currently pending in the blocking thread pool", n);
+    /// }
+    /// ```
+    pub fn blocking_queue_depth(&self) -> usize {
+        self.handle.inner.blocking_queue_depth()
+    }
 }
 
 cfg_net! {
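Taken together, the two new accessors make it possible to spot a saturated pool. A hedged sketch (the helper name is hypothetical, and the metrics API requires an unstable build):

```rust
use tokio::runtime::Handle;

// Hypothetical helper, not part of this patch: if more tasks are queued
// than there are pool threads, the pool is falling behind, which suggests
// `max_blocking_threads` (or the volume of blocking work) needs tuning.
// Requires RUSTFLAGS="--cfg tokio_unstable" since RuntimeMetrics is unstable.
fn blocking_pool_backlogged(handle: &Handle) -> bool {
    let metrics = handle.metrics();
    metrics.blocking_queue_depth() > metrics.num_blocking_threads()
}
```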
diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs
index 778f93a84c5..f466a83e9e4 100644
--- a/tokio/src/runtime/scheduler/current_thread.rs
+++ b/tokio/src/runtime/scheduler/current_thread.rs
@@ -424,6 +424,14 @@ cfg_metrics! {
             assert_eq!(0, worker);
             &self.shared.worker_metrics
         }
+
+        pub(crate) fn num_blocking_threads(&self) -> usize {
+            self.blocking_spawner.num_threads()
+        }
+
+        pub(crate) fn blocking_queue_depth(&self) -> usize {
+            self.blocking_spawner.queue_depth()
+        }
     }
 }
 
diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs
index 18ac474fa75..eabf5a77248 100644
--- a/tokio/src/runtime/scheduler/mod.rs
+++ b/tokio/src/runtime/scheduler/mod.rs
@@ -111,6 +111,14 @@ cfg_rt! {
             }
         }
 
+        pub(crate) fn num_blocking_threads(&self) -> usize {
+            match self {
+                Handle::CurrentThread(handle) => handle.num_blocking_threads(),
+                #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+                Handle::MultiThread(handle) => handle.num_blocking_threads(),
+            }
+        }
+
         pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
             match self {
                 Handle::CurrentThread(handle) => handle.scheduler_metrics(),
@@ -142,6 +150,14 @@ cfg_rt! {
                 Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker),
             }
         }
+
+        pub(crate) fn blocking_queue_depth(&self) -> usize {
+            match self {
+                Handle::CurrentThread(handle) => handle.blocking_queue_depth(),
+                #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+                Handle::MultiThread(handle) => handle.blocking_queue_depth(),
+            }
+        }
     }
 }
 
diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs
index 884f400bf00..c0815231bb2 100644
--- a/tokio/src/runtime/scheduler/multi_thread/handle.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs
@@ -61,6 +61,10 @@ cfg_metrics! {
             self.shared.worker_metrics.len()
         }
 
+        pub(crate) fn num_blocking_threads(&self) -> usize {
+            self.blocking_spawner.num_threads()
+        }
+
         pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
             &self.shared.scheduler_metrics
         }
@@ -76,6 +80,10 @@ cfg_metrics! {
         pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
             self.shared.worker_local_queue_depth(worker)
         }
+
+        pub(crate) fn blocking_queue_depth(&self) -> usize {
+            self.blocking_spawner.queue_depth()
+        }
     }
 }
 
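The plumbing above only forwards calls through the scheduler handles; the observable behavior is that core workers and blocking-pool threads are counted separately. A small sketch of what that looks like from the outside (assumes a `tokio_unstable` build with the `rt-multi-thread` feature):

```rust
use tokio::runtime::Builder;

fn main() {
    let rt = Builder::new_multi_thread()
        .worker_threads(2)
        .build()
        .unwrap();

    let metrics = rt.handle().metrics();
    // Core workers exist up front; the blocking pool starts empty.
    assert_eq!(2, metrics.num_workers());
    assert_eq!(0, metrics.num_blocking_threads());

    rt.block_on(async {
        tokio::task::spawn_blocking(|| ()).await.unwrap();
    });

    // The first spawn_blocking call forced one pool thread into existence,
    // and it lingers (idle) for the keep-alive period afterwards.
    assert_eq!(1, metrics.num_blocking_threads());
}
```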
diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs
index cffc117bce2..47dea08ea4c 100644
--- a/tokio/tests/rt_metrics.rs
+++ b/tokio/tests/rt_metrics.rs
@@ -1,6 +1,8 @@
 #![warn(rust_2018_idioms)]
 #![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))]
 
+use std::sync::{Arc, Mutex};
+
 use tokio::runtime::Runtime;
 use tokio::time::{self, Duration};
 
@@ -13,6 +15,44 @@ fn num_workers() {
     assert_eq!(2, rt.metrics().num_workers());
 }
 
+#[test]
+fn num_blocking_threads() {
+    let rt = current_thread();
+    assert_eq!(0, rt.metrics().num_blocking_threads());
+    let _ = rt.block_on(rt.spawn_blocking(move || {}));
+    assert_eq!(1, rt.metrics().num_blocking_threads());
+}
+
+#[test]
+fn blocking_queue_depth() {
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .max_blocking_threads(1)
+        .build()
+        .unwrap();
+
+    assert_eq!(0, rt.metrics().blocking_queue_depth());
+
+    let ready = Arc::new(Mutex::new(()));
+    let guard = ready.lock().unwrap();
+
+    let ready_cloned = ready.clone();
+    let wait_until_ready = move || {
+        let _unused = ready_cloned.lock().unwrap();
+    };
+
+    let h1 = rt.spawn_blocking(wait_until_ready.clone());
+    let h2 = rt.spawn_blocking(wait_until_ready);
+    assert!(rt.metrics().blocking_queue_depth() > 0);
+
+    drop(guard);
+
+    let _ = rt.block_on(h1);
+    let _ = rt.block_on(h2);
+
+    assert_eq!(0, rt.metrics().blocking_queue_depth());
+}
+
 #[test]
 fn remote_schedule_count() {
     use std::thread;
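The `blocking_queue_depth` test above gates the pool on a held mutex so the reported depth is deterministic: with the pool capped at one thread, the second task has nowhere to run and must sit in the queue. The same scenario in a condensed form (a sketch, not part of the patch; assumes a `tokio_unstable` build and uses an mpsc channel as the gate):

```rust
use std::sync::mpsc;

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .build()
        .unwrap();

    let (release, gate) = mpsc::channel::<()>();

    // h1 occupies the single pool thread until `release` fires;
    // h2 can only wait in the queue behind it.
    let h1 = rt.spawn_blocking(move || {
        let _ = gate.recv();
    });
    let h2 = rt.spawn_blocking(|| ());
    assert!(rt.metrics().blocking_queue_depth() >= 1);

    release.send(()).unwrap();
    rt.block_on(async {
        h1.await.unwrap();
        h2.await.unwrap();
    });
    assert_eq!(0, rt.metrics().blocking_queue_depth());
}
```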
From 7de50a6a83a00155972c8b8ff5765f6e3d22afd3 Mon Sep 17 00:00:00 2001
From: Duarte Nunes
Date: Thu, 3 Nov 2022 13:20:55 -0300
Subject: [PATCH 2/2] Export metrics about idle blocking threads

---
 tokio/src/runtime/blocking/pool.rs            | 35 +++++++++++++------
 tokio/src/runtime/metrics/runtime.rs          | 24 +++++++++++++
 tokio/src/runtime/scheduler/current_thread.rs |  4 +++
 tokio/src/runtime/scheduler/mod.rs            |  8 +++++
 .../runtime/scheduler/multi_thread/handle.rs  |  4 +++
 tokio/tests/rt_metrics.rs                     | 11 ++++++
 6 files changed, 76 insertions(+), 10 deletions(-)

diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index f3f206ce1f2..9c536141996 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -27,6 +27,7 @@ pub(crate) struct Spawner {
 #[derive(Default)]
 pub(crate) struct SpawnerMetrics {
     num_threads: AtomicUsize,
+    num_idle_threads: AtomicUsize,
     queue_depth: AtomicUsize,
 }
 
@@ -35,6 +36,10 @@ impl SpawnerMetrics {
         self.num_threads.load(Ordering::Relaxed)
     }
 
+    fn num_idle_threads(&self) -> usize {
+        self.num_idle_threads.load(Ordering::Relaxed)
+    }
+
     cfg_metrics! {
         fn queue_depth(&self) -> usize {
             self.queue_depth.load(Ordering::Relaxed)
@@ -49,6 +54,14 @@ impl SpawnerMetrics {
         self.num_threads.fetch_sub(1, Ordering::Relaxed);
     }
 
+    fn inc_num_idle_threads(&self) {
+        self.num_idle_threads.fetch_add(1, Ordering::Relaxed);
+    }
+
+    fn dec_num_idle_threads(&self) -> usize {
+        self.num_idle_threads.fetch_sub(1, Ordering::Relaxed)
+    }
+
     fn inc_queue_depth(&self) {
         self.queue_depth.fetch_add(1, Ordering::Relaxed);
     }
@@ -89,7 +102,6 @@ struct Inner {
 
 struct Shared {
     queue: VecDeque<Task>,
-    num_idle: u32,
     num_notify: u32,
     shutdown: bool,
     shutdown_tx: Option<shutdown::Sender>,
@@ -202,7 +214,6 @@ impl BlockingPool {
             inner: Arc::new(Inner {
                 shared: Mutex::new(Shared {
                     queue: VecDeque::new(),
-                    num_idle: 0,
                     num_notify: 0,
                     shutdown: false,
                     shutdown_tx: Some(shutdown_tx),
@@ -389,7 +400,7 @@ impl Spawner {
         shared.queue.push_back(task);
         self.inner.metrics.inc_queue_depth();
 
-        if shared.num_idle == 0 {
+        if self.inner.metrics.num_idle_threads() == 0 {
             // No threads are able to process the task.
 
             if self.inner.metrics.num_threads() == self.inner.thread_cap {
@@ -429,7 +440,7 @@ impl Spawner {
             // exactly. Thread libraries may generate spurious
             // wakeups, this counter is used to keep us in a
             // consistent state.
-            shared.num_idle -= 1;
+            self.inner.metrics.dec_num_idle_threads();
             shared.num_notify += 1;
             self.inner.condvar.notify_one();
         }
@@ -466,6 +477,10 @@ cfg_metrics! {
             self.inner.metrics.num_threads()
         }
 
+        pub(crate) fn num_idle_threads(&self) -> usize {
+            self.inner.metrics.num_idle_threads()
+        }
+
         pub(crate) fn queue_depth(&self) -> usize {
             self.inner.metrics.queue_depth()
         }
@@ -498,7 +513,7 @@ impl Inner {
             }
 
             // IDLE
-            shared.num_idle += 1;
+            self.metrics.inc_num_idle_threads();
 
             while !shared.shutdown {
                 let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
@@ -543,7 +558,7 @@ impl Inner {
                     // Work was produced, and we "took" it (by decrementing num_notify).
                     // This means that num_idle was decremented once for our wakeup.
                     // But, since we are exiting, we need to "undo" that, as we'll stay idle.
-                    shared.num_idle += 1;
+                    self.metrics.inc_num_idle_threads();
                     // NOTE: Technically we should also do num_notify++ and notify again,
                     // but since we're shutting down anyway, that won't be necessary.
                     break;
@@ -556,10 +571,10 @@ impl Inner {
             // num_idle should now be tracked exactly, panic
             // with a descriptive message if it is not the
             // case.
-            shared.num_idle = shared
-                .num_idle
-                .checked_sub(1)
-                .expect("num_idle underflowed on thread exit");
+            let prev_idle = self.metrics.dec_num_idle_threads();
+            if prev_idle < self.metrics.num_idle_threads() {
+                panic!("num_idle_threads underflowed on thread exit")
+            }
 
             if shared.shutdown && self.metrics.num_threads() == 0 {
                 self.condvar.notify_one();
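With idle threads tracked as well, a consumer can derive how many pool threads are actually running work at a given instant. A sketch (the helper name is hypothetical; `saturating_sub` guards against the two relaxed counters being read at slightly different moments):

```rust
use tokio::runtime::Handle;

// Hypothetical derived metric, not part of the patch. Requires a
// tokio_unstable build. The two gauges are updated independently with
// Relaxed ordering, so subtract defensively rather than assuming the
// snapshot is perfectly consistent.
fn busy_blocking_threads(handle: &Handle) -> usize {
    let metrics = handle.metrics();
    metrics
        .num_blocking_threads()
        .saturating_sub(metrics.num_idle_blocking_threads())
}
```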
diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs
index cda162bb0c0..dee14a45729 100644
--- a/tokio/src/runtime/metrics/runtime.rs
+++ b/tokio/src/runtime/metrics/runtime.rs
@@ -68,6 +68,30 @@ impl RuntimeMetrics {
         self.handle.inner.num_blocking_threads()
     }
 
+    /// Returns the number of idle threads, which have been spawned by the
+    /// runtime for `spawn_blocking` calls.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let _ = tokio::task::spawn_blocking(move || {
+    ///         // Stand-in for compute-heavy work or using synchronous APIs
+    ///         1 + 1
+    ///     }).await;
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.num_idle_blocking_threads();
+    ///     println!("Runtime has {} idle blocking threads", n);
+    /// }
+    /// ```
+    pub fn num_idle_blocking_threads(&self) -> usize {
+        self.handle.inner.num_idle_blocking_threads()
+    }
+
     /// Returns the number of tasks scheduled from **outside** of the runtime.
     ///
     /// The remote schedule count starts at zero when the runtime is created and
diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs
index f466a83e9e4..3c739bfa430 100644
--- a/tokio/src/runtime/scheduler/current_thread.rs
+++ b/tokio/src/runtime/scheduler/current_thread.rs
@@ -429,6 +429,10 @@ cfg_metrics! {
             self.blocking_spawner.num_threads()
         }
 
+        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
+            self.blocking_spawner.num_idle_threads()
+        }
+
         pub(crate) fn blocking_queue_depth(&self) -> usize {
             self.blocking_spawner.queue_depth()
         }
diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs
index eabf5a77248..b991c89f00c 100644
--- a/tokio/src/runtime/scheduler/mod.rs
+++ b/tokio/src/runtime/scheduler/mod.rs
@@ -119,6 +119,14 @@ cfg_rt! {
             }
         }
 
+        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
+            match self {
+                Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(),
+                #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+                Handle::MultiThread(handle) => handle.num_idle_blocking_threads(),
+            }
+        }
+
         pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
             match self {
                 Handle::CurrentThread(handle) => handle.scheduler_metrics(),
diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs
index c0815231bb2..69a4620c127 100644
--- a/tokio/src/runtime/scheduler/multi_thread/handle.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs
@@ -65,6 +65,10 @@ cfg_metrics! {
             self.blocking_spawner.num_threads()
         }
 
+        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
+            self.blocking_spawner.num_idle_threads()
+        }
+
         pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
             &self.shared.scheduler_metrics
         }
diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs
index 47dea08ea4c..71a60e4dfde 100644
--- a/tokio/tests/rt_metrics.rs
+++ b/tokio/tests/rt_metrics.rs
@@ -23,6 +23,17 @@ fn num_blocking_threads() {
     assert_eq!(1, rt.metrics().num_blocking_threads());
 }
 
+#[test]
+fn num_idle_blocking_threads() {
+    let rt = current_thread();
+    assert_eq!(0, rt.metrics().num_idle_blocking_threads());
+    let _ = rt.block_on(rt.spawn_blocking(move || {}));
+    rt.block_on(async {
+        time::sleep(Duration::from_millis(5)).await;
+    });
+    assert_eq!(1, rt.metrics().num_idle_blocking_threads());
+}
+
 #[test]
 fn blocking_queue_depth() {
     let rt = tokio::runtime::Builder::new_current_thread()
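Taken together, the two patches expose three gauges: total blocking threads, idle blocking threads, and queued blocking tasks. A sketch of one way a consumer might export them (not part of the patch; assumes RUSTFLAGS="--cfg tokio_unstable" and tokio's `rt` and `time` features so that `Handle::metrics()` and `tokio::time::sleep` are available):

```rust
use std::time::Duration;
use tokio::runtime::Handle;

// Periodic sampler: logs a one-line summary of the blocking pool once a
// second. In a real service the println! would feed a metrics backend.
async fn report_blocking_pool() {
    let metrics = Handle::current().metrics();
    loop {
        println!(
            "blocking pool: {} threads ({} idle), {} queued tasks",
            metrics.num_blocking_threads(),
            metrics.num_idle_blocking_threads(),
            metrics.blocking_queue_depth(),
        );
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```

A caller would typically `tokio::spawn(report_blocking_pool())` once at startup; because the counters are relaxed atomics, sampling them is cheap enough to run at a much higher frequency if needed.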