diff --git a/tokio/src/loom/std/atomic_u32.rs b/tokio/src/loom/std/atomic_u32.rs
index 61f95fb30ce..3f7cfefd3e0 100644
--- a/tokio/src/loom/std/atomic_u32.rs
+++ b/tokio/src/loom/std/atomic_u32.rs
@@ -15,6 +15,16 @@ impl AtomicU32 {
         let inner = UnsafeCell::new(std::sync::atomic::AtomicU32::new(val));
         AtomicU32 { inner }
     }
+
+    /// Performs an unsynchronized load.
+    ///
+    /// # Safety
+    ///
+    /// All mutations must have happened before the unsynchronized load.
+    /// Additionally, there must be no concurrent mutations.
+    pub(crate) unsafe fn unsync_load(&self) -> u32 {
+        *(*self.inner.get()).get_mut()
+    }
 }
 
 impl Deref for AtomicU32 {
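Reviewer note: the safety contract above is easy to misread, so here is a minimal, standalone sketch of the pattern, not the loom shim itself. The `Counter` type and its methods are hypothetical names used only for illustration; the point is that a caller which performed every prior mutation, with nothing mutating concurrently, may read the value without any atomic ordering.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU32, Ordering::Release};

// Illustrative stand-in for the loom `AtomicU32` wrapper (hypothetical name).
struct Counter {
    inner: UnsafeCell<AtomicU32>,
}

impl Counter {
    fn new(val: u32) -> Self {
        Counter {
            inner: UnsafeCell::new(AtomicU32::new(val)),
        }
    }

    /// # Safety
    ///
    /// All mutations must have happened before this call, and no mutation may
    /// run concurrently with it.
    unsafe fn unsync_load(&self) -> u32 {
        // Same trick as the patch: reach the plain integer behind the atomic.
        *(*self.inner.get()).get_mut()
    }

    fn store(&self, val: u32) {
        // Normal atomic store for the synchronized path.
        unsafe { (*self.inner.get()).store(val, Release) }
    }
}

fn main() {
    let tail = Counter::new(0);
    tail.store(5);
    // Sound here: this thread performed every mutation and nothing else is
    // running, so the unsynchronized read observes the latest value.
    assert_eq!(unsafe { tail.unsync_load() }, 5);
}
```

In the scheduler this is exactly the producer's position: only the owning worker ever writes `tail`, so its own reads of `tail` need no synchronization.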
diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index 2d53257428e..59b448d26b8 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -1,15 +1,30 @@
 //! Run-queue structures to support a work-stealing scheduler
 
 use crate::loom::cell::UnsafeCell;
-use crate::loom::sync::atomic::{AtomicU16, AtomicU32};
 use crate::loom::sync::Arc;
 use crate::runtime::task::{self, Inject};
 use crate::runtime::MetricsBatch;
 
-use std::mem::MaybeUninit;
+use std::mem::{self, MaybeUninit};
 use std::ptr;
 use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
 
+// Use wider integers when possible to increase ABA resilience.
+//
+// See issue #5041: <https://github.com/tokio-rs/tokio/issues/5041>.
+cfg_has_atomic_u64! {
+    type UnsignedShort = u32;
+    type UnsignedLong = u64;
+    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
+    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
+}
+cfg_not_has_atomic_u64! {
+    type UnsignedShort = u16;
+    type UnsignedLong = u32;
+    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
+    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
+}
+
 /// Producer handle. May only be used from a single thread.
 pub(crate) struct Local<T: 'static> {
     inner: Arc<Inner<T>>,
@@ -21,19 +36,21 @@ pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);
 pub(crate) struct Inner<T: 'static> {
     /// Concurrently updated by many threads.
     ///
-    /// Contains two `u16` values. The LSB byte is the "real" head of the queue.
-    /// The `u16` in the MSB is set by a stealer in process of stealing values.
-    /// It represents the first value being stolen in the batch. `u16` is used
-    /// in order to distinguish between `head == tail` and `head == tail -
-    /// capacity`.
+    /// Contains two `UnsignedShort` values. The lower half is the "real" head
+    /// of the queue. The `UnsignedShort` in the upper half is set by a stealer
+    /// in the process of stealing values. It represents the first value being
+    /// stolen in the batch. The `UnsignedShort` indices are intentionally
+    /// wider than strictly required for buffer indexing, which provides ABA
+    /// mitigation and makes it possible to distinguish full from empty buffers.
     ///
-    /// When both `u16` values are the same, there is no active stealer.
+    /// When both `UnsignedShort` values are the same, there is no active
+    /// stealer.
     ///
     /// Tracking an in-progress stealer prevents a wrapping scenario.
-    head: AtomicU32,
+    head: AtomicUnsignedLong,
 
     /// Only updated by producer thread but read by many threads.
-    tail: AtomicU16,
+    tail: AtomicUnsignedShort,
 
     /// Elements
     buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
@@ -73,8 +90,8 @@ pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
     }
 
     let inner = Arc::new(Inner {
-        head: AtomicU32::new(0),
-        tail: AtomicU16::new(0),
+        head: AtomicUnsignedLong::new(0),
+        tail: AtomicUnsignedShort::new(0),
         buffer: make_fixed_size(buffer.into_boxed_slice()),
     });
 
@@ -115,7 +132,7 @@ impl<T> Local<T> {
             // safety: this is the **only** thread that updates this cell.
             let tail = unsafe { self.inner.tail.unsync_load() };
 
-            if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as u16 {
+            if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
                 // There is capacity for the task
                 break tail;
             } else if steal != real {
@@ -165,8 +182,8 @@ impl<T> Local<T> {
     fn push_overflow(
         &mut self,
         task: task::Notified<T>,
-        head: u16,
-        tail: u16,
+        head: UnsignedShort,
+        tail: UnsignedShort,
         inject: &Inject<T>,
         metrics: &mut MetricsBatch,
     ) -> Result<(), task::Notified<T>> {
@@ -174,7 +191,7 @@ impl<T> Local<T> {
         ///
         /// This is one less than the number of tasks pushed to the inject
         /// queue as we are also inserting the `task` argument.
-        const NUM_TASKS_TAKEN: u16 = (LOCAL_QUEUE_CAPACITY / 2) as u16;
+        const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort;
 
         assert_eq!(
             tail.wrapping_sub(head) as usize,
@@ -219,15 +236,15 @@ impl<T> Local<T> {
         /// An iterator that takes elements out of the run queue.
         struct BatchTaskIter<'a, T: 'static> {
             buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
-            head: u32,
-            i: u32,
+            head: UnsignedLong,
+            i: UnsignedLong,
         }
         impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
             type Item = task::Notified<T>;
 
             #[inline]
             fn next(&mut self) -> Option<task::Notified<T>> {
-                if self.i == u32::from(NUM_TASKS_TAKEN) {
+                if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) {
                     None
                 } else {
                     let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
@@ -247,7 +264,7 @@ impl<T> Local<T> {
         // values again, and we are the only producer.
         let batch_iter = BatchTaskIter {
             buffer: &*self.inner.buffer,
-            head: head as u32,
+            head: head as UnsignedLong,
             i: 0,
         };
         inject.push_batch(batch_iter.chain(std::iter::once(task)));
@@ -320,7 +337,7 @@ impl<T> Steal<T> {
         // from `dst` there may not be enough capacity to steal.
         let (steal, _) = unpack(dst.inner.head.load(Acquire));
 
-        if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as u16 / 2 {
+        if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 {
             // we *could* try to steal less here, but for simplicity, we're just
             // going to abort.
             return None;
@@ -335,7 +352,7 @@ impl<T> Steal<T> {
             return None;
         }
 
-        dst_metrics.incr_steal_count(n);
+        dst_metrics.incr_steal_count(n as u16);
 
         // We are returning a task here
         n -= 1;
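Reviewer note: a hedged sketch of the index arithmetic around these hunks: the half-of-the-source budget that `steal_into2` computes (its body is not shown in this patch) and the `dst_tail.wrapping_sub(steal)` room check from the hunk above. Plain `u32` stands in for `UnsignedShort`; the helper names and the capacity constant are assumptions for illustration, not tokio internals.

```rust
type UnsignedShort = u32;

// Assumed power-of-two capacity; tokio's local queue uses a small fixed size.
const LOCAL_QUEUE_CAPACITY: usize = 256;

/// A stealer tries to take half of what is currently queued (rounded up).
fn steal_budget(src_head: UnsignedShort, src_tail: UnsignedShort) -> UnsignedShort {
    let len = src_tail.wrapping_sub(src_head);
    len - len / 2
}

/// The destination accepts a batch only if it has room for half its capacity,
/// measured against the in-progress steal head, not the real head.
fn dst_has_room(dst_steal: UnsignedShort, dst_tail: UnsignedShort) -> bool {
    dst_tail.wrapping_sub(dst_steal) <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2
}

fn main() {
    // A source queue holding 10 tasks yields a budget of 5.
    assert_eq!(steal_budget(100, 110), 5);

    // Wrapped indices still produce the correct length and budget.
    assert_eq!(steal_budget(UnsignedShort::MAX - 3, 6), 5);

    // A destination that is already more than half full refuses the batch.
    assert!(dst_has_room(0, 128));
    assert!(!dst_has_room(0, 129));

    // The metrics counter keeps its `u16` argument, hence the `n as u16`
    // narrowing above; a budget never exceeds half the small queue capacity.
    let n = steal_budget(0, 200);
    assert!(u16::try_from(n).is_ok());
}
```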
@@ -360,7 +377,7 @@ impl<T> Steal<T> {
 
     // Steal tasks from `self`, placing them into `dst`. Returns the number of
     // tasks that were stolen.
-    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u16) -> u16 {
+    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
         let mut prev_packed = self.0.head.load(Acquire);
         let mut next_packed;
 
@@ -402,7 +419,11 @@ impl<T> Steal<T> {
             }
         };
 
-        assert!(n <= LOCAL_QUEUE_CAPACITY as u16 / 2, "actual = {}", n);
+        assert!(
+            n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2,
+            "actual = {}",
+            n
+        );
 
         let (first, _) = unpack(next_packed);
 
@@ -479,7 +500,7 @@ impl<T> Drop for Local<T> {
 }
 
 impl<T> Inner<T> {
-    fn len(&self) -> u16 {
+    fn len(&self) -> UnsignedShort {
         let (_, head) = unpack(self.head.load(Acquire));
         let tail = self.tail.load(Acquire);
 
@@ -493,16 +514,16 @@ impl<T> Inner<T> {
 
 /// Split the head value into the real head and the index a stealer is working
 /// on.
-fn unpack(n: u32) -> (u16, u16) {
-    let real = n & u16::MAX as u32;
-    let steal = n >> 16;
+fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
+    let real = n & UnsignedShort::MAX as UnsignedLong;
+    let steal = n >> (mem::size_of::<UnsignedShort>() * 8);
 
-    (steal as u16, real as u16)
+    (steal as UnsignedShort, real as UnsignedShort)
 }
 
 /// Join the two head values
-fn pack(steal: u16, real: u16) -> u32 {
-    (real as u32) | ((steal as u32) << 16)
+fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
+    (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
 }
 
 #[test]
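Reviewer note: to see what the widened layout buys, here is a standalone sketch of the packed head word using the `cfg_has_atomic_u64!` branch's types; the capacity constant and `MASK` are assumptions for illustration. It shows the `pack`/`unpack` round-trip and why indices wider than the buffer keep `tail.wrapping_sub(head)` equal to the true length across wraparound, which is what makes a full buffer distinguishable from an empty one.

```rust
// Stand-ins matching the 64-bit branch of the patch.
type UnsignedShort = u32;
type UnsignedLong = u64;

// Assumed power-of-two capacity for illustration.
const LOCAL_QUEUE_CAPACITY: usize = 256;
const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;

/// Split the packed head into (steal, real), mirroring the patched `unpack`.
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
    let real = n & UnsignedShort::MAX as UnsignedLong;
    let steal = n >> (std::mem::size_of::<UnsignedShort>() * 8);
    (steal as UnsignedShort, real as UnsignedShort)
}

/// Join (steal, real) back into one word, mirroring the patched `pack`.
fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
    (real as UnsignedLong)
        | ((steal as UnsignedLong) << (std::mem::size_of::<UnsignedShort>() * 8))
}

fn main() {
    // Round-trip: packing and unpacking preserves both halves.
    assert_eq!(unpack(pack(7, 42)), (7, 42));

    // Indices are wider than the buffer, so `tail - head` (wrapping) is the
    // true length even after the counters wrap, and a full buffer
    // (len == capacity) never looks like an empty one (len == 0).
    let head: UnsignedShort = UnsignedShort::MAX - 10;
    let tail: UnsignedShort = head.wrapping_add(LOCAL_QUEUE_CAPACITY as UnsignedShort);
    assert_eq!(tail.wrapping_sub(head) as usize, LOCAL_QUEUE_CAPACITY);

    // The buffer slot for an index is the index modulo the capacity.
    assert_eq!(head as usize & MASK, head as usize % LOCAL_QUEUE_CAPACITY);
}
```

The ABA angle is the same arithmetic: a stalled stealer's CAS on `head` can only be fooled after the packed word cycles all the way around, which with 32-bit halves takes roughly four billion head advances instead of the 65,536 the old `u16` layout allowed.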