Skip to content

Commit

Permalink
taskdump: implement task dumps for multi-thread runtime
Browse files Browse the repository at this point in the history
This PR implements task dumps on the multi-thread runtime. It
complements tokio-rs#5608, which implemented task dumps on the current-thread
runtime.
  • Loading branch information
jswrenn committed May 24, 2023
1 parent 52bc6b6 commit 85553e6
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 21 deletions.
34 changes: 22 additions & 12 deletions examples/dump.rs
Expand Up @@ -6,8 +6,10 @@
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
#[tokio::main(flavor = "current_thread")]
#[tokio::main(worker_threads = 72)]
async fn main() {
println!("pid={}", std::process::id());

use std::hint::black_box;

#[inline(never)]
Expand All @@ -22,21 +24,29 @@ async fn main() {

#[inline(never)]
async fn c() {
black_box(tokio::task::yield_now()).await
loop {
tokio::task::yield_now().await;
}
}

tokio::spawn(a());
tokio::spawn(b());
tokio::spawn(c());

let handle = tokio::runtime::Handle::current();
let dump = handle.dump();
async fn dump() {
let handle = tokio::runtime::Handle::current();
let dump = handle.dump().await;

for (i, task) in dump.tasks().iter().enumerate() {
let trace = task.trace();
println!("task {i} trace:");
println!("{trace}");
for (i, task) in dump.tasks().iter().enumerate() {
let trace = task.trace();
println!("task {i} trace:");
println!("{trace}");
}
}

tokio::select!(
biased;
_ = tokio::spawn(a()) => {},
_ = dump() => {},
);

println!("END OF MAIN");
}

#[cfg(not(all(
Expand Down
14 changes: 11 additions & 3 deletions tokio/src/runtime/handle.rs
Expand Up @@ -341,12 +341,20 @@ cfg_metrics! {
cfg_taskdump! {
impl Handle {
/// Capture a snapshot of this runtime's state.
pub fn dump(&self) -> crate::runtime::Dump {
pub async fn dump(&self) -> crate::runtime::Dump {
match &self.inner {
scheduler::Handle::CurrentThread(handle) => handle.dump(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) =>
unimplemented!("taskdumps are unsupported on the multi-thread runtime"),
scheduler::Handle::MultiThread(handle) => {
// perform the trace in a separate thread
let handle = handle.clone();
let (tx, rx) = crate::sync::oneshot::channel();
crate::loom::thread::spawn(|| {
let handle = handle;
tx.send(handle.dump()).unwrap();
});
rx.await.unwrap()
},
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Expand Up @@ -95,6 +95,37 @@ cfg_metrics! {
}
}

cfg_taskdump! {
impl Handle {
pub(crate) fn dump(&self) -> crate::runtime::Dump {
let trace_status = &self.shared.trace_status;

println!("{:?} Handle::dump: trace requested; waiting for pending dump requests to complete", std::thread::current().id());

// If a dump is in progress, block.
trace_status.start_trace_request(&self);

println!("{:?} Handle::dump: no pending dump requests; waiting for result for this request", std::thread::current().id());

let result = loop {
if let Some(result) = trace_status.take_result() {
break result;
} else {
self.notify_all();
crate::loom::thread::yield_now();
}
};

println!("{:?} Handle::dump: result received; allowing other dump requests to proceed", std::thread::current().id());

// Allow other queued dumps to proceed.
trace_status.end_trace_request(&self);

result
}
}
}

impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("multi_thread::Handle { ... }").finish()
Expand Down

0 comments on commit 85553e6

Please sign in to comment.