runtime: only mitigate false sharing for multi-threaded runtimes #6240

Open
wants to merge 5 commits into master
Changes from 1 commit
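In outline, the change removes the unconditional repr(align(...)) cache padding from the task Cell and instead asks the scheduler for a minimum alignment through a new Schedule::min_align hook: the default implementation (and a multi-threaded runtime with a single worker) reports 1, multi-threaded runtimes report the cache-padded alignment, and the task allocation's layout is raised to at least that value. A minimal sketch of the layout computation, with names simplified from the diff (task_layout is not part of the PR):

use std::alloc::Layout;

// Sketch only: raise the cell's natural alignment to the scheduler-reported
// minimum, keeping the size unchanged.
fn task_layout<T>(min_align: usize) -> Layout {
    let size = std::mem::size_of::<T>();
    let align = std::mem::align_of::<T>().max(min_align);
    Layout::from_size_align(size, align).expect("invalid task layout")
}

fn main() {
    // e.g. a 40-byte cell allocated on a 128-byte boundary
    let layout = task_layout::<[u8; 40]>(128);
    assert_eq!(layout.align(), 128);
    assert_eq!(layout.size(), 40);
}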
11 changes: 11 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -1014,6 +1014,17 @@ impl task::Schedule for Arc<Handle> {
fn yield_now(&self, task: Notified) {
self.schedule_task(task, true);
}

fn min_align(&self) -> usize {
use crate::util::cacheline::CachePadded;

// Use an alignment of 1 for a single-threaded runtime; otherwise use a
// high value to avoid false sharing.
match self.shared.remotes.len() {
1 => 1,
_ => std::mem::align_of::<CachePadded<()>>(),
}
}
}

impl Handle {
11 changes: 11 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -1553,6 +1553,17 @@ impl task::Schedule for Arc<Handle> {
fn yield_now(&self, task: Notified) {
self.shared.schedule_task(task, true);
}

fn min_align(&self) -> usize {
use crate::util::cacheline::CachePadded;

// Use an alignment of 1 for a single-threaded runtime; otherwise use a
// high value to avoid false sharing.
match self.shared.remotes.len() {
1 => 1,
_ => std::mem::align_of::<CachePadded<()>>(),
}
}
}

impl AsMut<Synced> for Synced {
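Both worker implementations report align_of::<CachePadded<()>>() when there is more than one worker. As a rough standalone illustration of what that value is, using crossbeam_utils::CachePadded as a stand-in for Tokio's internal crate::util::cacheline::CachePadded (whose padding rules are copied from crossbeam, per the comments removed in core.rs below):

use crossbeam_utils::CachePadded;

fn main() {
    // 128 on x86_64, aarch64, and powerpc64; 64 on most other targets.
    println!("min_align would be {}", std::mem::align_of::<CachePadded<()>>());
}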
96 changes: 7 additions & 89 deletions tokio/src/runtime/task/core.rs
@@ -14,7 +14,7 @@ use crate::loom::cell::UnsafeCell;
use crate::runtime::context;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Id, Schedule};
use crate::runtime::task::{Id, Schedule, TaskBox};
use crate::util::linked_list;

use std::num::NonZeroU64;
@@ -30,87 +30,6 @@ use std::task::{Context, Poll, Waker};
/// Any changes to the layout of this struct _must_ also be reflected in the
/// const fns in raw.rs.
///
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
),
repr(align(128))
)]
// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
any(
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "sparc",
target_arch = "hexagon",
),
repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
not(any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "sparc",
target_arch = "hexagon",
target_arch = "m68k",
target_arch = "s390x",
)),
repr(align(64))
)]
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
/// Hot task state data
@@ -205,7 +124,7 @@ pub(super) enum Stage<T: Future> {
impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> TaskBox<T, S> {
// Separated into a non-generic function to reduce LLVM codegen
fn new_header(
state: State,
@@ -225,22 +144,21 @@ impl<T: Future, S: Schedule> Cell<T, S> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let tracing_id = future.id();
let vtable = raw::vtable::<T, S>();
let result = Box::new(Cell {
header: new_header(
let result = TaskBox::new(
new_header(
state,
vtable,
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing_id,
),
core: Core {
Core {
scheduler,
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
task_id,
},
trailer: Trailer::new(),
});
);

#[cfg(debug_assertions)]
{
@@ -459,7 +377,7 @@ impl Header {
}

impl Trailer {
fn new() -> Self {
pub(super) fn new() -> Self {
Trailer {
waker: UnsafeCell::new(None),
owned: linked_list::Pointers::new(),
4 changes: 2 additions & 2 deletions tokio/src/runtime/task/harness.rs
@@ -2,7 +2,7 @@ use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};
use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task, TaskBox};

use std::any::Any;
use std::mem;
@@ -269,7 +269,7 @@ where
// are allowed to be dangling after their last use, even if the
// reference has not yet gone out of scope.
unsafe {
drop(Box::from_raw(self.cell.as_ptr()));
drop(TaskBox::from_raw(self.cell));
}
}

12 changes: 12 additions & 0 deletions tokio/src/runtime/task/mod.rs
@@ -200,6 +200,9 @@ pub(crate) use self::raw::RawTask;
mod state;
use self::state::State;

mod task_box;
use self::task_box::TaskBox;

mod waker;

cfg_taskdump! {
@@ -272,6 +275,15 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
self.schedule(task);
}

/// The minimum alignment for tasks spawned on this runtime.
///
/// This is used by the multi-threaded runtime to avoid false sharing.
///
/// The same scheduler must always return the same value.
fn min_align(&self) -> usize {
1
}

/// Polling the task resulted in a panic. Should the runtime shutdown?
fn unhandled_panic(&self) {
// By default, do nothing. This maintains the 1.0 behavior.
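The note that the same scheduler must always return the same value matters because the layout is derived from min_align() twice: once when TaskBox::new allocates and once when TaskBox::drop deallocates (see task_box.rs below). A minimal standalone sketch of that pairing, not taken from the PR:

use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};

fn main() {
    // Allocate with a raised alignment, much like TaskBox::new does.
    let layout = Layout::from_size_align(64, 128).unwrap();
    let ptr = unsafe { alloc(layout) };
    if ptr.is_null() {
        handle_alloc_error(layout);
    }
    // ... the task cell would be written and used here ...
    // Deallocation must pass the exact same layout; this is why min_align()
    // must not change between allocation and drop.
    unsafe { dealloc(ptr, layout) };
}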
7 changes: 3 additions & 4 deletions tokio/src/runtime/task/raw.rs
@@ -1,6 +1,6 @@
use crate::future::Future;
use crate::runtime::task::core::{Core, Trailer};
use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State};
use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State, TaskBox};

use std::ptr::NonNull;
use std::task::{Poll, Waker};
@@ -162,10 +162,9 @@ impl RawTask {
T: Future,
S: Schedule,
{
let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
let ptr = unsafe { NonNull::new_unchecked(ptr.cast()) };
let ptr = TaskBox::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));

RawTask { ptr }
RawTask { ptr: ptr.cast() }
}

pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask {
124 changes: 124 additions & 0 deletions tokio/src/runtime/task/task_box.rs
@@ -0,0 +1,124 @@
//! Helper module for allocating and deallocating tasks.

use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::Schedule;

use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{align_of, size_of, ManuallyDrop};
use std::ptr::{drop_in_place, NonNull};

fn layout_of<T: Future, S: Schedule>(scheduler: &S) -> Layout {
let size = std::mem::size_of::<Cell<T, S>>();
let mut align = std::mem::align_of::<Cell<T, S>>();
let min_align = scheduler.min_align();
if align < min_align {
align = min_align;
}
match Layout::from_size_align(size, align) {
Ok(layout) => layout,
Err(_) => panic!("Failed to build layout of type."),
}
}

/// A `Box<Cell<T, S>>` with an alignment of at least `s.min_align()`.
pub(super) struct TaskBox<T: Future, S: Schedule> {
ptr: NonNull<Cell<T, S>>,
_phantom: PhantomData<Cell<T, S>>,
}

impl<T: Future, S: Schedule> TaskBox<T, S> {
/// Creates a new task allocation.
pub(super) fn new(header: Header, core: Core<T, S>) -> Self {
let layout = layout_of::<T, S>(&core.scheduler);

assert_eq!(size_of::<Cell<T, S>>(), layout.size());
assert_ne!(size_of::<Cell<T, S>>(), 0);
assert!(align_of::<Cell<T, S>>() <= layout.align());

// SAFETY: The size of `layout` is non-zero as checked above.
let ptr = unsafe { alloc(layout) } as *mut Cell<T, S>;

let ptr = match NonNull::new(ptr) {
Some(ptr) => ptr,
None => handle_alloc_error(layout),
};

// SAFETY: We just allocated memory with the same size and a compatible
// alignment for `Cell<T, S>`.
unsafe {
ptr.as_ptr().write(Cell {
header,
core,
trailer: Trailer::new(),
});
};

Self {
ptr,
_phantom: PhantomData,
}
}

/// Convert this allocation into a raw pointer.
pub(super) fn into_raw(self) -> NonNull<Cell<T, S>> {
let me = ManuallyDrop::new(self);
me.ptr
}

/// Convert this allocation back into a `TaskBox`.
///
/// # Safety
///
/// The provided pointer must originate from a previous call to `into_raw`,
/// and the raw pointer must not be used again after this call.
pub(super) unsafe fn from_raw(ptr: NonNull<Cell<T, S>>) -> Self {
Self {
ptr,
_phantom: PhantomData,
}
}
}

impl<T: Future, S: Schedule> std::ops::Deref for TaskBox<T, S> {
type Target = Cell<T, S>;

fn deref(&self) -> &Cell<T, S> {
// SAFETY: This box always points at a valid cell.
unsafe { &*self.ptr.as_ptr() }
}
}

impl<T: Future, S: Schedule> Drop for TaskBox<T, S> {
fn drop(&mut self) {
let ptr = self.ptr.as_ptr();

// SAFETY: The task is still valid, so we can dereference the pointer.
let layout = layout_of::<T, S>(unsafe { &(*ptr).core.scheduler });

// SAFETY: The pointer was allocated with this layout. (The return value
// of `min_align` doesn't change.)
let _drop_helper = DropHelper {
layout,
ptr: ptr as *mut u8,
};

// SAFETY: A task box contains a pointer to a valid cell, and we have
// not dropped the allocation yet.
unsafe { drop_in_place(self.ptr.as_ptr()) };
}
}

struct DropHelper {
ptr: *mut u8,
layout: Layout,
}

impl Drop for DropHelper {
#[inline]
fn drop(&mut self) {
// SAFETY: See `TaskBox::drop`.
unsafe { dealloc(self.ptr, self.layout) };
}
}
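One detail the asserts in TaskBox::new rely on: Layout::from_size_align keeps the requested size unchanged and only records the larger alignment, so size_of::<Cell<T, S>>() still equals layout.size() after the alignment bump. A quick standalone check (not part of the PR):

use std::alloc::Layout;

fn main() {
    // from_size_align records the size as given; only the alignment is raised.
    let layout = Layout::from_size_align(40, 128).unwrap();
    assert_eq!(layout.size(), 40);
    assert_eq!(layout.align(), 128);
}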