Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt(threaded): cap LIFO slot polls #5712

Merged
merged 11 commits into from May 23, 2023
26 changes: 26 additions & 0 deletions .github/workflows/ci.yml
Expand Up @@ -60,6 +60,7 @@ jobs:
- wasm32-wasi
- check-external-types
- check-fuzzing
- check-unstable-mt-counters
steps:
- run: exit 0

Expand Down Expand Up @@ -233,6 +234,31 @@ jobs:
# the unstable cfg to RustDoc
RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_taskdump

check-unstable-mt-counters:
name: check tokio full --internal-mt-counters
needs: basics
runs-on: ${{ matrix.os }}
strategy:
matrix:
include:
- os: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
# Run `tokio` with "unstable" and "taskdump" cfg flags.
- name: check tokio full --cfg unstable --cfg internal-mt-counters
run: cargo test --all-features
working-directory: tokio
env:
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters -Dwarnings
# in order to run doctests for unstable features, we must also pass
# the unstable cfg to RustDoc
RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters

miri:
name: miri
needs: basics
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/loom.yml
Expand Up @@ -8,7 +8,7 @@ on:
name: Loom

env:
RUSTFLAGS: -Dwarnings
RUSTFLAGS: -Dwarnings -C debug-assertions
RUST_BACKTRACE: 1
# Change to specific Rust release to pin
rust_stable: stable
Expand Down
11 changes: 0 additions & 11 deletions tokio/src/runtime/coop.rs
Expand Up @@ -119,17 +119,6 @@ cfg_rt_multi_thread! {
pub(crate) fn set(budget: Budget) {
let _ = context::budget(|cell| cell.set(budget));
}

/// Consume one unit of progress from the current task's budget.
pub(crate) fn consume_one() {
let _ = context::budget(|cell| {
let mut budget = cell.get();
if let Some(ref mut counter) = budget.0 {
*counter = counter.saturating_sub(1);
}
cell.set(budget);
});
}
}

cfg_rt! {
Expand Down
16 changes: 16 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/counters.rs
Expand Up @@ -6,17 +6,23 @@ mod imp {
static NUM_MAINTENANCE: AtomicUsize = AtomicUsize::new(0);
static NUM_NOTIFY_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_UNPARKS_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_SCHEDULES: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_CAPPED: AtomicUsize = AtomicUsize::new(0);

impl Drop for super::Counters {
fn drop(&mut self) {
let notifies_local = NUM_NOTIFY_LOCAL.load(Relaxed);
let unparks_local = NUM_UNPARKS_LOCAL.load(Relaxed);
let maintenance = NUM_MAINTENANCE.load(Relaxed);
let lifo_scheds = NUM_LIFO_SCHEDULES.load(Relaxed);
let lifo_capped = NUM_LIFO_CAPPED.load(Relaxed);

println!("---");
println!("notifies (local): {}", notifies_local);
println!(" unparks (local): {}", unparks_local);
println!(" maintenance: {}", maintenance);
println!(" LIFO schedules: {}", lifo_scheds);
println!(" LIFO capped: {}", lifo_capped);
}
}

Expand All @@ -31,13 +37,23 @@ mod imp {
pub(crate) fn inc_num_maintenance() {
NUM_MAINTENANCE.fetch_add(1, Relaxed);
}

pub(crate) fn inc_lifo_schedules() {
NUM_LIFO_SCHEDULES.fetch_add(1, Relaxed);
}

pub(crate) fn inc_lifo_capped() {
NUM_LIFO_CAPPED.fetch_add(1, Relaxed);
}
}

#[cfg(not(tokio_internal_mt_counters))]
mod imp {
pub(crate) fn inc_num_inc_notify_local() {}
pub(crate) fn inc_num_unparks_local() {}
pub(crate) fn inc_num_maintenance() {}
pub(crate) fn inc_lifo_schedules() {}
pub(crate) fn inc_lifo_capped() {}
}

#[derive(Debug)]
Expand Down
90 changes: 72 additions & 18 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
carllerche marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -90,10 +90,14 @@ struct Core {
/// When a task is scheduled from a worker, it is stored in this slot. The
/// worker will check this slot for a task **before** checking the run
/// queue. This effectively results in the **last** scheduled task to be run
/// next (LIFO). This is an optimization for message passing patterns and
/// helps to reduce latency.
/// next (LIFO). This is an optimization for improving locality which
/// benefits message passing patterns and helps to reduce latency.
lifo_slot: Option<Notified>,

/// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
/// they go to the back of the `run_queue`.
lifo_enabled: bool,

/// The worker-local run queue.
run_queue: queue::Local<Arc<Handle>>,

Expand Down Expand Up @@ -191,6 +195,12 @@ type Notified = task::Notified<Arc<Handle>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

/// Value picked out of thin-air. Running the LIFO slot a handful of times
/// seemms sufficient to benefit from locality. More than 3 times probably is
/// overweighing. The value can be tuned in the future with data that shows
/// improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;
carllerche marked this conversation as resolved.
Show resolved Hide resolved

pub(super) fn create(
size: usize,
park: Parker,
Expand All @@ -214,6 +224,7 @@ pub(super) fn create(
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
lifo_enabled: !config.disable_lifo_slot,
run_queue,
is_searching: false,
is_shutdown: false,
Expand Down Expand Up @@ -422,7 +433,13 @@ fn run(worker: Arc<Worker>) {

impl Context {
fn run(&self, mut core: Box<Core>) -> RunResult {
// Reset `lifo_enabled` here in case the core was previously stolen from
// a task that had the LIFO slot disabled.
self.reset_lifo_enabled(&mut core);

while !core.is_shutdown {
self.assert_lifo_enabled_is_correct(&core);

// Increment the tick
core.tick();

Expand Down Expand Up @@ -463,13 +480,16 @@ impl Context {
// another idle worker to try to steal work.
core.transition_from_searching(&self.worker);

self.assert_lifo_enabled_is_correct(&core);

// Make the core available to the runtime context
core.metrics.start_poll();
*self.core.borrow_mut() = Some(core);

// Run the task
coop::budget(|| {
task.run();
let mut lifo_polls = 0;

// As long as there is budget remaining and a task exists in the
// `lifo_slot`, then keep running.
Expand All @@ -478,7 +498,12 @@ impl Context {
// by another worker.
let mut core = match self.core.borrow_mut().take() {
Some(core) => core,
None => return Err(()),
None => {
// In this case, we cannot call `reset_lifo_enabled()`
// because the core was stolen. The stealer will handle
// that at the top of `Context::run`
return Err(());
}
};

// If task poll times is enabled, measure the poll time. Note
Expand All @@ -491,35 +516,62 @@ impl Context {
// Check for a task in the LIFO slot
let task = match core.lifo_slot.take() {
Some(task) => task,
None => return Ok(core),
None => {
self.reset_lifo_enabled(&mut core);
return Ok(core);
}
};

// Polling a task doesn't necessarily consume any budget, if it
// doesn't use any Tokio leaf futures. To prevent such tasks
// from using the lifo slot in an infinite loop, we consume an
// extra unit of budget between each iteration of the loop.
coop::consume_one();

if coop::has_budget_remaining() {
// Run the LIFO task, then loop
core.metrics.start_poll();
*self.core.borrow_mut() = Some(core);
let task = self.worker.handle.shared.owned.assert_owner(task);
task.run();
} else {
if !coop::has_budget_remaining() {
// Not enough budget left to run the LIFO task, push it to
// the back of the queue and return.
core.run_queue.push_back_or_overflow(
task,
self.worker.inject(),
&mut core.metrics,
);
// If we hit this point, the LIFO slot should be enabled.
// There is no need to reset it.
debug_assert!(core.lifo_enabled);
return Ok(core);
}

// Track that we are about to run a task from the LIFO slot.
lifo_polls += 1;
super::counters::inc_lifo_schedules();

// Disable the LIFO slot if we reach our limit
//
// In ping-ping style workloads where task A notifies task B,
// which notifies task A again, continuously prioritizing the
// LIFO slot can cause starvation as these two tasks will
// repeatedly schedule the other. To mitigate this, we limit the
// number of times the LIFO slot is prioritized.
if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
core.lifo_enabled = false;
super::counters::inc_lifo_capped();
}

// Run the LIFO task, then loop
core.metrics.start_poll();
*self.core.borrow_mut() = Some(core);
let task = self.worker.handle.shared.owned.assert_owner(task);
task.run();
Comment on lines +555 to +559
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it not be less error-prone to reset lifo_enabled after calling task.run()? For example, right now you don't reset it in the Err(()) branch. That's probably fine because the worker thread shuts down there, but it is non-obvious.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, in the error case, the core is stolen. We cannot reset it either way. That said, it brings up a point that we need to reset it if it is stolen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just reset before polling a task ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reset before polling a task isn't robust either because we can schedule tasks outside of the context of running a task. e.g. from polling the I/O driver.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead, what I suggest is we add debug_asserts around to ensure the value is correct.

}
})
}

fn reset_lifo_enabled(&self, core: &mut Core) {
core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
}

fn assert_lifo_enabled_is_correct(&self, core: &Core) {
debug_assert_eq!(
core.lifo_enabled,
!self.worker.handle.shared.config.disable_lifo_slot
);
}

fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % self.worker.handle.shared.config.event_interval == 0 {
super::counters::inc_num_maintenance();
Expand Down Expand Up @@ -573,6 +625,8 @@ impl Context {
}

fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
self.assert_lifo_enabled_is_correct(&core);

// Take the parker out of core
let mut park = core.park.take().expect("park missing");

Expand Down Expand Up @@ -840,7 +894,7 @@ impl Handle {
// task must always be pushed to the back of the queue, enabling other
// tasks to be executed. If **not** a yield, then there is more
// flexibility and the task may go to the front of the queue.
let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
let should_notify = if is_yield || !core.lifo_enabled {
core.run_queue
.push_back_or_overflow(task, &self.shared.inject, &mut core.metrics);
true
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/runtime/tests/mod.rs
Expand Up @@ -60,6 +60,12 @@ cfg_loom! {
mod loom_shutdown_join;
mod loom_join_set;
mod loom_yield;

// Make sure debug assertions are enabled
#[test]
fn ensure_debug_assertions_are_enabled() {
assert!(cfg!(debug_assertions));
}
carllerche marked this conversation as resolved.
Show resolved Hide resolved
}

cfg_not_loom! {
Expand Down