Skip to content

Commit

Permalink
switch to CounterPair struct, improve reliability of flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Feb 12, 2024
1 parent 328ddbe commit 176d74b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 43 deletions.
51 changes: 25 additions & 26 deletions tokio/src/runtime/metrics/runtime.rs
@@ -1,4 +1,4 @@
use crate::runtime::Handle;
use crate::runtime::{CounterPair, Handle};

use std::ops::Range;
use std::sync::atomic::Ordering::Relaxed;
Expand All @@ -15,6 +15,18 @@ pub struct RuntimeMetrics {
handle: Handle,
}

impl CounterPair {
    /// Determines the current length of the pair, i.e. the number of
    /// increments that have not yet been matched by a decrement.
    ///
    /// The subtraction saturates at zero: both fields are public, so a pair
    /// with `dec > inc` can be constructed, and such a pair reports a length
    /// of `0` instead of overflowing.
    pub fn len(&self) -> usize {
        // `saturating_sub` avoids the overflow panic (debug) / wrap-around
        // (release) that a plain `inc - dec` would hit when `dec > inc`.
        self.inc.saturating_sub(self.dec) as usize
    }

    /// Determines if the counter pair represents an empty collection.
    ///
    /// Always agrees with `self.len() == 0`, including for pairs where
    /// `dec` exceeds `inc`.
    pub fn is_empty(&self) -> bool {
        self.inc <= self.dec
    }
}

impl RuntimeMetrics {
pub(crate) fn new(handle: Handle) -> RuntimeMetrics {
RuntimeMetrics { handle }
Expand Down Expand Up @@ -88,26 +100,7 @@ impl RuntimeMetrics {
self.handle.inner.active_tasks_count()
}

/// Returns the number of started tasks in the runtime.
///
/// The value is obtained by delegating to `start_tasks_count` on the
/// runtime handle held by these metrics.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.start_tasks_count();
/// println!("Runtime has {} started tasks", n);
/// }
/// ```
pub fn start_tasks_count(&self) -> u64 {
self.handle.inner.start_tasks_count()
}

/// Returns the number of finished tasks in the runtime.
/// Returns a counter pair representing the number of started and completed tasks in the runtime.
///
/// # Examples
///
Expand All @@ -118,13 +111,19 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.stop_tasks_count();
/// println!("Runtime has {} active tasks", n);
/// let n = metrics.task_counts();
/// println!("Runtime has {} started tasks", n.inc);
/// println!("Runtime has {} completed tasks", n.dec);
/// println!("Runtime has {} active tasks", n.len());
/// }
/// ```
pub fn stop_tasks_count(&self) -> u64 {
self.start_tasks_count()
.saturating_sub(self.active_tasks_count() as u64)
pub fn task_counts(&self) -> CounterPair {
    // Sample the started-task counter first, then the active-task count.
    let started = self.handle.inner.start_tasks_count();
    let running = self.active_tasks_count() as u64;

    // Completed tasks are derived as started minus still-active tasks; the
    // subtraction saturates so the result never drops below zero.
    let completed = started.saturating_sub(running);

    CounterPair {
        inc: started,
        dec: completed,
    }
}

/// Returns the number of idle threads, which have been spawned by the runtime
Expand Down
17 changes: 17 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -391,6 +391,7 @@ cfg_rt! {
cfg_metrics! {
mod metrics;
pub use metrics::{RuntimeMetrics, HistogramScale};
pub use crate::runtime::counter_pair::CounterPair;

pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder};

Expand All @@ -407,3 +408,19 @@ cfg_rt! {
/// After thread starts / before thread stops
type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
}

pub(crate) mod counter_pair {
/// A gauge represented as two counters.
///
/// Instead of decrementing a gauge, we increment a separate decrement counter.
/// This is beneficial as it allows you to observe activity spikes that occur
/// in between scrape intervals.
///
/// The current value of the gauge is the difference between `inc` and `dec`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[allow(unreachable_pub)] // rust-lang/rust#57411
pub struct CounterPair {
/// Tracks how many times this gauge was incremented
pub inc: u64,
/// Tracks how many times this gauge was decremented
pub dec: u64,
}
}
31 changes: 14 additions & 17 deletions tokio/tests/rt_metrics.rs
Expand Up @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};
use std::task::Poll;
use tokio::macros::support::poll_fn;

use tokio::runtime::CounterPair;
use tokio::runtime::Runtime;
use tokio::task::consume_budget;
use tokio::time::{self, Duration};
Expand Down Expand Up @@ -104,36 +105,32 @@ fn active_tasks_count() {
fn active_tasks_count_pairs() {
// Checks the started/stopped task counters before, during and after a single
// spawned task, first on the runtime returned by `current_thread()` and then
// on the one returned by `threaded()`.
let rt = current_thread();
let metrics = rt.metrics();
// Nothing has been spawned yet, so both counters start at zero.
assert_eq!(0, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(CounterPair { inc: 0, dec: 0 }, metrics.task_counts());

// Observed from inside the spawned task: it has started but not yet stopped.
rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(CounterPair { inc: 1, dec: 0 }, metrics.task_counts());
}))
.unwrap();

// After the task has been awaited, it is expected to be counted as stopped.
assert_eq!(1, rt.metrics().start_tasks_count());
assert_eq!(1, rt.metrics().stop_tasks_count());
assert_eq!(CounterPair { inc: 1, dec: 1 }, rt.metrics().task_counts());

// Repeat the same sequence on the runtime returned by `threaded()`.
let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(CounterPair { inc: 0, dec: 0 }, metrics.task_counts());

rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(CounterPair { inc: 1, dec: 0 }, metrics.task_counts());
}))
.unwrap();

// For some reason, the stop count sometimes doesn't get a chance to increment before we get here.
// Only observed on single-CPU systems. Most likely the worker thread hasn't had a chance to clean up
// the spawned task yet. We yield to give it an opportunity.
std::thread::yield_now();

assert_eq!(1, rt.metrics().start_tasks_count());
assert_eq!(1, rt.metrics().stop_tasks_count());
// Poll up to 100 times, sleeping 1ms between attempts, until the stop is
// reflected in the counters; the test passes as soon as it is observed.
for _ in 0..100 {
if rt.metrics().task_counts() == (CounterPair { inc: 1, dec: 1 }) {
return;
}
// On single-threaded machines (like in CI), we need to force the OS to run the runtime threads.
std::thread::sleep(std::time::Duration::from_millis(1));
}
// The expected counts were never observed within the retry budget.
panic!("runtime didn't decrement active task gauge")
}

#[test]
Expand Down

0 comments on commit 176d74b

Please sign in to comment.