Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add task counter pairs #6114

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
38 changes: 35 additions & 3 deletions tokio/src/runtime/metrics/runtime.rs
Expand Up @@ -69,7 +69,15 @@ impl RuntimeMetrics {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of active tasks in the runtime.
///
/// Renamed to [`RuntimeMetrics::num_active_tasks`]; this method is a
/// deprecated alias that simply forwards to it.
#[deprecated = "Renamed to num_active_tasks"]
pub fn active_tasks_count(&self) -> usize {
    self.num_active_tasks()
}

/// Returns the current number of active tasks in the runtime.
///
/// This value increases and decreases over time as tasks are spawned and as they are completed or cancelled.
///
/// # Examples
///
Expand All @@ -80,14 +88,38 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.active_tasks_count();
/// let n = metrics.num_active_tasks();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn active_tasks_count(&self) -> usize {
pub fn num_active_tasks(&self) -> usize {
self.handle.inner.active_tasks_count()
}

/// Returns the total number of tasks spawned on this runtime since it was
/// created.
///
/// The counter begins at zero when the runtime is created and is incremented
/// once for every task that is spawned. It is monotonically increasing: it
/// is never decremented or reset to zero.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
///     let metrics = Handle::current().metrics();
///
///     let spawned = metrics.spawned_tasks_count();
///     println!("Runtime has had {} tasks spawned", spawned);
/// }
/// ```
pub fn spawned_tasks_count(&self) -> u64 {
    let inner = &self.handle.inner;
    inner.spawned_tasks_count()
}

/// Returns the number of idle threads which have been spawned by the runtime
/// for `spawn_blocking` calls.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Expand Up @@ -536,6 +536,10 @@ cfg_metrics! {
/// Current number of tasks owned by this current-thread scheduler.
pub(crate) fn active_tasks_count(&self) -> usize {
    let owned = &self.shared.owned;
    owned.active_tasks_count()
}

/// Total number of tasks ever spawned on this scheduler; monotonically
/// increasing (backed by the owned-task list's `added` counter).
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    let owned = &self.shared.owned;
    owned.spawned_tasks_count()
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Expand Up @@ -189,6 +189,10 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.active_tasks_count())
}

/// Total number of tasks spawned since the runtime was created.
///
/// Dispatches to the handle of whichever scheduler flavor is in use.
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
}

/// Borrows the scheduler-level metrics of the active scheduler flavor.
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
    match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Expand Up @@ -19,6 +19,10 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

/// Total number of tasks ever spawned on this runtime; a monotonically
/// increasing counter read from the owned-task list.
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    let owned = &self.shared.owned;
    owned.spawned_tasks_count()
}

/// Borrows the scheduler-level metrics stored on the shared handle state.
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
    let shared = &self.shared;
    &shared.scheduler_metrics
}
Expand Down
Expand Up @@ -19,6 +19,10 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

/// Total number of tasks ever spawned on this runtime; a monotonically
/// increasing counter read from the owned-task list.
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    let owned = &self.shared.owned;
    owned.spawned_tasks_count()
}

/// Borrows the scheduler-level metrics stored on the shared handle state.
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
    let shared = &self.shared;
    &shared.scheduler_metrics
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/task/list.rs
Expand Up @@ -170,6 +170,10 @@ impl<S: 'static> OwnedTasks<S> {
self.list.len()
}

/// Returns the total number of tasks ever added to this `OwnedTasks` list.
///
/// Backed by `ShardedList::added`, which is only ever incremented, so the
/// returned value is monotonically increasing.
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    self.list.added()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
// If the task's owner ID is `None` then it is not part of any list and
// doesn't need removing.
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/util/sharded_list.rs
@@ -1,6 +1,7 @@
use std::ptr::NonNull;
use std::sync::atomic::Ordering;

use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::{Mutex, MutexGuard};
use std::sync::atomic::AtomicUsize;

Expand All @@ -14,6 +15,7 @@ use super::linked_list::{Link, LinkedList};
/// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
pub(crate) struct ShardedList<L, T> {
    // One mutex-protected linked list per shard.
    lists: Box<[Mutex<LinkedList<L, T>>]>,
    // Total number of elements ever pushed; incremented on push, never
    // decremented.
    added: AtomicU64,
    // Current number of elements across all shards.
    count: AtomicUsize,
    // Mask applied to a shard id to pick an index into `lists` — presumably
    // `id & shard_mask`; confirm in `shard_inner` (not visible here).
    shard_mask: usize,
}
Expand Down Expand Up @@ -42,6 +44,7 @@ impl<L, T> ShardedList<L, T> {
}
Self {
lists: lists.into_boxed_slice(),
added: AtomicU64::new(0),
count: AtomicUsize::new(0),
shard_mask,
}
Expand All @@ -51,6 +54,7 @@ impl<L, T> ShardedList<L, T> {
/// Used to get the lock of shard.
pub(crate) struct ShardGuard<'a, L, T> {
    // Exclusive lock over a single shard's linked list.
    lock: MutexGuard<'a, LinkedList<L, T>>,
    // Shared reference to the parent list's monotonic `added` counter.
    added: &'a AtomicU64,
    // Shared reference to the parent list's live-element count.
    count: &'a AtomicUsize,
    // Id of the shard this guard locks; pushed values must map to this id
    // (asserted in `push`).
    id: usize,
}
Expand Down Expand Up @@ -92,6 +96,7 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
let id = unsafe { L::get_shard_id(L::as_raw(val)) };
ShardGuard {
lock: self.shard_inner(id),
added: &self.added,
count: &self.count,
id,
}
Expand All @@ -102,6 +107,11 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
self.count.load(Ordering::Relaxed)
}

/// Gets the total number of elements added to this list.
///
/// The counter is monotonically increasing (it is only ever `fetch_add`ed on
/// push). The load is `Relaxed`, so the value may lag concurrent pushes.
pub(crate) fn added(&self) -> u64 {
    self.added.load(Ordering::Relaxed)
}

/// Returns whether the linked list does not contain any node.
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
Expand All @@ -127,6 +137,7 @@ impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
assert_eq!(id, self.id);
self.lock.push_front(val);
self.added.fetch_add(1, Ordering::Relaxed);
self.count.fetch_add(1, Ordering::Relaxed);
}
}
Expand Down
50 changes: 41 additions & 9 deletions tokio/tests/rt_metrics.rs
Expand Up @@ -82,20 +82,52 @@ fn blocking_queue_depth() {
}

#[test]
fn num_active_tasks() {
    let rt = current_thread();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.num_active_tasks());
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.num_active_tasks());
    }))
    .unwrap();

    assert_eq!(0, rt.metrics().num_active_tasks());

    let rt = threaded();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.num_active_tasks());
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.num_active_tasks());
    }))
    .unwrap();

    // Cannot reliably assert the counter is back to zero here: task
    // completion notifies the join waker (waking `block_on`) *before* the
    // task is released from the owned-task list, so on the multi-threaded
    // runtime the decrement may not have happened yet when `block_on`
    // returns.
    // assert_eq!(0, metrics.num_active_tasks());
}

#[test]
fn spawned_tasks_count() {
    // Current-thread flavor: the counter starts at zero and records each
    // spawn, remaining at one after the task completes (monotonic).
    let rt = current_thread();
    let rt_metrics = rt.metrics();
    assert_eq!(0, rt_metrics.spawned_tasks_count());

    rt.block_on(rt.spawn(async move {
        assert_eq!(1, rt_metrics.spawned_tasks_count());
    }))
    .unwrap();

    assert_eq!(1, rt.metrics().spawned_tasks_count());

    // The multi-threaded flavor behaves identically.
    let rt = threaded();
    let rt_metrics = rt.metrics();
    assert_eq!(0, rt_metrics.spawned_tasks_count());

    rt.block_on(rt.spawn(async move {
        assert_eq!(1, rt_metrics.spawned_tasks_count());
    }))
    .unwrap();

    assert_eq!(1, rt.metrics().spawned_tasks_count());
}

#[test]
Expand Down