Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add task counter pairs #6114

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
38 changes: 35 additions & 3 deletions tokio/src/runtime/metrics/runtime.rs
Expand Up @@ -69,7 +69,15 @@ impl RuntimeMetrics {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of active tasks in the runtime.
#[deprecated]
conradludgate marked this conversation as resolved.
Show resolved Hide resolved
/// Renamed to [`RuntimeMetrics::num_active_tasks`]
pub fn active_tasks_count(&self) -> usize {
self.num_active_tasks()
}

/// Returns the current number of active tasks in the runtime.
///
/// This value increases and decreases over time as tasks are spawned and as they are completed or cancelled.
///
/// # Examples
///
Expand All @@ -80,14 +88,38 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.active_tasks_count();
/// let n = metrics.num_active_tasks();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn active_tasks_count(&self) -> usize {
pub fn num_active_tasks(&self) -> usize {
self.handle.inner.active_tasks_count()
}

/// Returns the total number of tasks that have been spawned on this runtime
/// since it was created.
///
/// The counter begins at zero when the runtime is built and is incremented
/// once for every spawned task. It is monotonically increasing: it is never
/// decremented and never reset.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
///     let metrics = Handle::current().metrics();
///
///     let spawned = metrics.spawned_tasks_count();
///     println!("Runtime has had {} tasks spawned", spawned);
/// }
/// ```
pub fn spawned_tasks_count(&self) -> u64 {
    let scheduler = &self.handle.inner;
    scheduler.spawned_tasks_count()
}

/// Returns the number of idle threads, which have spawned by the runtime
/// for `spawn_blocking` calls.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Expand Up @@ -536,6 +536,10 @@ cfg_metrics! {
/// Number of tasks currently in this scheduler's owned-tasks list.
pub(crate) fn active_tasks_count(&self) -> usize {
    self.shared.owned.active_tasks_count()
}

/// Total number of tasks ever added to this scheduler's owned-tasks list.
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    let owned = &self.shared.owned;
    owned.spawned_tasks_count()
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Expand Up @@ -189,6 +189,10 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.active_tasks_count())
}

/// Total number of tasks spawned since runtime creation.
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    // Dispatches to whichever scheduler flavor this handle wraps.
    match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Expand Up @@ -19,6 +19,10 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

/// Total number of tasks ever added to the shared owned-tasks list.
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    let owned_tasks = &self.shared.owned;
    owned_tasks.spawned_tasks_count()
}

/// Returns a reference to this runtime's scheduler-level metrics.
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
    &self.shared.scheduler_metrics
}
Expand Down
Expand Up @@ -19,6 +19,10 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

/// Total number of tasks ever inserted into the shared owned-tasks list.
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    let tasks = &self.shared.owned;
    tasks.spawned_tasks_count()
}

/// Returns a reference to this runtime's scheduler-level metrics.
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
    &self.shared.scheduler_metrics
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/task/list.rs
Expand Up @@ -170,6 +170,10 @@ impl<S: 'static> OwnedTasks<S> {
self.list.len()
}

/// Total number of tasks ever inserted into this list; never decremented.
pub(crate) fn spawned_tasks_count(&self) -> u64 {
    let inner = &self.list;
    inner.added()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
// If the task's owner ID is `None` then it is not part of any list and
// doesn't need removing.
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/util/sharded_list.rs
@@ -1,6 +1,7 @@
use std::ptr::NonNull;
use std::sync::atomic::Ordering;

use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::{Mutex, MutexGuard};
use std::sync::atomic::AtomicUsize;

Expand All @@ -14,6 +15,7 @@ use super::linked_list::{Link, LinkedList};
/// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
pub(crate) struct ShardedList<L, T> {
    /// One mutex-protected linked list per shard.
    lists: Box<[Mutex<LinkedList<L, T>>]>,
    /// Total number of elements ever pushed onto the list; monotonically
    /// increasing, never decremented (see `ShardGuard::push`).
    added: AtomicU64,
    /// Current number of elements across all shards.
    count: AtomicUsize,
    /// NOTE(review): presumably masks a shard id into an index into
    /// `lists` — confirm against the folded `shard_inner` implementation.
    shard_mask: usize,
}
Expand Down Expand Up @@ -42,6 +44,7 @@ impl<L, T> ShardedList<L, T> {
}
Self {
lists: lists.into_boxed_slice(),
added: AtomicU64::new(0),
count: AtomicUsize::new(0),
shard_mask,
}
Expand All @@ -51,6 +54,7 @@ impl<L, T> ShardedList<L, T> {
/// Used to get the lock of shard.
pub(crate) struct ShardGuard<'a, L, T> {
    /// Exclusive lock over one shard's linked list.
    lock: MutexGuard<'a, LinkedList<L, T>>,
    /// Shared monotonic push counter; incremented by `push`.
    added: &'a AtomicU64,
    /// Shared live-element counter; incremented by `push`.
    count: &'a AtomicUsize,
    /// Shard id this guard holds the lock for; `push` asserts the pushed
    /// value maps to the same shard.
    id: usize,
}
Expand Down Expand Up @@ -92,6 +96,7 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
let id = unsafe { L::get_shard_id(L::as_raw(val)) };
ShardGuard {
lock: self.shard_inner(id),
added: &self.added,
count: &self.count,
id,
}
Expand All @@ -102,6 +107,11 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
self.count.load(Ordering::Relaxed)
}

/// Gets the total number of elements added to this list.
///
/// Uses a relaxed load, so the value may lag pushes that are concurrently
/// in flight; the counter itself only ever grows.
pub(crate) fn added(&self) -> u64 {
    self.added.load(Ordering::Relaxed)
}

/// Returns whether the linked list does not contain any node.
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
Expand All @@ -127,6 +137,7 @@ impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
assert_eq!(id, self.id);
self.lock.push_front(val);
self.added.fetch_add(1, Ordering::Relaxed);
self.count.fetch_add(1, Ordering::Relaxed);
}
}
Expand Down
45 changes: 36 additions & 9 deletions tokio/tests/rt_metrics.rs
Expand Up @@ -82,20 +82,47 @@ fn blocking_queue_depth() {
}

#[test]
fn num_active_tasks() {
    // A fresh current-thread runtime owns no tasks.
    let rt = current_thread();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.num_active_tasks());
    // While the spawned task runs, it is the single active task; block_on
    // the join handle so the assertion inside is guaranteed to execute.
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.num_active_tasks());
    }))
    .unwrap();

    // Same behavior on the multi-thread runtime.
    let rt = threaded();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.num_active_tasks());
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.num_active_tasks());
    }))
    .unwrap();
}

#[test]
fn spawned_tasks_count() {
    // A fresh current-thread runtime has never spawned a task.
    let rt = current_thread();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.spawned_tasks_count());

    // Spawning one task bumps the monotonic counter to one.
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.spawned_tasks_count());
    }))
    .unwrap();

    // The counter stays at one after the task completes — it is never
    // decremented, unlike num_active_tasks.
    assert_eq!(1, rt.metrics().spawned_tasks_count());

    // Same behavior on the multi-thread runtime.
    let rt = threaded();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.spawned_tasks_count());

    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.spawned_tasks_count());
    }))
    .unwrap();

    assert_eq!(1, rt.metrics().spawned_tasks_count());
}

#[test]
Expand Down