Skip to content

Commit

Permalink
runtime: fix double-increment of num_idle_threads when shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
liuq19 committed Mar 29, 2024
1 parent 9c337ca commit 1867598
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,17 +524,28 @@ impl Inner {
shared = lock_result.0;
let timeout_result = lock_result.1;

// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if shared.shutdown {
// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
self.metrics.inc_num_idle_threads();

// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
}

if shared.num_notify != 0 {
// We have received a legitimate wakeup,
// acknowledge it by decrementing the counter
// and transition to the BUSY state.
shared.num_notify -= 1;
break;
continue 'main;
}

// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
if timeout_result.timed_out() {
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
Expand All @@ -547,25 +558,17 @@ impl Inner {
// Spurious wakeup detected, go back to sleep.
}

if shared.shutdown {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);

task.shutdown_or_run_if_mandatory();
// SHUTED
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);

shared = self.shared.lock();
}
task.shutdown_or_run_if_mandatory();

// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
self.metrics.inc_num_idle_threads();
// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
shared = self.shared.lock();
}
break;
}

// Thread exit
Expand Down

0 comments on commit 1867598

Please sign in to comment.