Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: do not trace tasks while locking OwnedTasks #6036

Merged
merged 9 commits into from Oct 6, 2023
8 changes: 8 additions & 0 deletions tokio/src/runtime/handle.rs
Expand Up @@ -543,6 +543,14 @@ cfg_taskdump! {
scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"),
}
}

/// Produces `true` if the current task is being traced for a dump;
/// otherwise false. This function is only public for integration
/// testing purposes. Do not rely on it.
#[doc(hidden)]
pub fn is_tracing() -> bool {
super::task::trace::Context::is_tracing()
}
}

cfg_rt_multi_thread! {
Expand Down
35 changes: 25 additions & 10 deletions tokio/src/runtime/task/trace/mod.rs
Expand Up @@ -100,6 +100,16 @@ impl Context {
Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL)
}
}

/// Produces `true` if the current task is being traced; otherwise false.
pub(crate) fn is_tracing() -> bool {
Self::with_current_collector(|maybe_collector| {
let collector = maybe_collector.take();
let result = collector.is_some();
maybe_collector.set(collector);
result
})
}
}

impl Trace {
Expand Down Expand Up @@ -275,6 +285,8 @@ pub(in crate::runtime) fn trace_current_thread(
task.as_raw().state().transition_to_notified_for_tracing();
// store the raw tasks into a vec
tasks.push(task.as_raw());
// do NOT poll `task` here, since we hold a lock on `owned` and the task
// may complete and need to remove itself from `owned`.
});

tasks
Expand Down Expand Up @@ -317,20 +329,23 @@ cfg_rt_multi_thread! {
drop(synced);

// notify each task
let mut traces = vec![];
let mut tasks = vec![];
owned.for_each(|task| {
// set the notified bit
task.as_raw().state().transition_to_notified_for_tracing();

// trace the task
let ((), trace) = Trace::capture(|| task.as_raw().poll());
traces.push(trace);

// reschedule the task
let _ = task.as_raw().state().transition_to_notified_by_ref();
task.as_raw().schedule();
// store the raw tasks into a vec
tasks.push(task.as_raw());
// do NOT poll `task` here, since we hold a lock on `owned` and the task
// may complete and need to remove itself from `owned`.
});

traces
tasks
.into_iter()
.map(|task| {
// trace the task
let ((), trace) = Trace::capture(|| task.poll());
trace
})
.collect()
}
}
57 changes: 57 additions & 0 deletions tokio/tests/dump.rs
Expand Up @@ -97,3 +97,60 @@ fn multi_thread() {
);
});
}

/// Regression tests for #6035.
///
/// These tests ensure that dumping will not deadlock if a future completes
/// during a trace.
mod future_completes_during_trace {
use super::*;

use core::future::{poll_fn, Future};

/// A future that completes only during a trace.
fn complete_during_trace() -> impl Future<Output = ()> + Send {
use std::task::Poll;
poll_fn(|cx| {
if Handle::is_tracing() {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
})
}

#[test]
fn current_thread() {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

async fn dump() {
let handle = Handle::current();
let _dump = handle.dump().await;
}

rt.block_on(async {
let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump());
});
}

#[test]
fn multi_thread() {
let rt = runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

async fn dump() {
let handle = Handle::current();
let _dump = handle.dump().await;
}

rt.block_on(async {
let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump());
});
}
}