diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 629e396c973..1bb8b52bb1d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -188,11 +188,13 @@ jobs: override: true - uses: Swatinem/rust-cache@v1 - name: miri - run: | - set -e - rm -rf tests - cargo miri test --features rt,rt-multi-thread,sync task + # Many of tests in tokio/tests and doctests use #[tokio::test] or + # #[tokio::main] that calls epoll_create1 that Miri does not support. + run: cargo miri test --features full --lib --no-fail-fast working-directory: tokio + env: + MIRIFLAGS: -Zmiri-disable-isolation -Zmiri-tag-raw-pointers + PROPTEST_CASES: 10 san: name: san diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 289e069a3cf..6979fb3e18d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -173,6 +173,12 @@ LOOM_MAX_PREEMPTIONS=1 RUSTFLAGS="--cfg loom" \ cargo test --lib --release --features full -- --test-threads=1 --nocapture ``` +You can run miri tests with +``` +MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-tag-raw-pointers" PROPTEST_CASES=10 \ + cargo +nightly miri test --features full --lib +``` + ### Tests If the change being proposed alters code (as opposed to only documentation for diff --git a/tokio/src/fs/file/tests.rs b/tokio/src/fs/file/tests.rs index 28b5ffe77af..18a4c078599 100644 --- a/tokio/src/fs/file/tests.rs +++ b/tokio/src/fs/file/tests.rs @@ -228,6 +228,7 @@ fn flush_while_idle() { } #[test] +#[cfg_attr(miri, ignore)] // takes a really long time with miri fn read_with_buffer_larger_than_max() { // Chunks let chunk_a = 16 * 1024; @@ -299,6 +300,7 @@ fn read_with_buffer_larger_than_max() { } #[test] +#[cfg_attr(miri, ignore)] // takes a really long time with miri fn write_with_buffer_larger_than_max() { // Chunks let chunk_a = 16 * 1024; diff --git a/tokio/src/process/unix/orphan.rs b/tokio/src/process/unix/orphan.rs index 1b0022c678e..0e52530c37b 100644 --- a/tokio/src/process/unix/orphan.rs +++ b/tokio/src/process/unix/orphan.rs @@ -280,6 +280,7 @@ pub(crate) mod test { drop(signal_guard); } + #[cfg_attr(miri, ignore)] // Miri does not support epoll. #[test] fn does_not_register_signal_if_queue_empty() { let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap(); diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index cb1750dee58..261dccea415 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -26,6 +26,10 @@ where } } + fn header_ptr(&self) -> NonNull
{ + self.cell.cast() + } + fn header(&self) -> &Header { unsafe { &self.cell.as_ref().header } } @@ -93,7 +97,8 @@ where match self.header().state.transition_to_running() { TransitionToRunning::Success => { - let waker_ref = waker_ref::(self.header()); + let header_ptr = self.header_ptr(); + let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); let res = poll_future(&self.core().stage, cx); diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 89265a41b3c..15e36a0d897 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -313,7 +313,7 @@ cfg_rt_multi_thread! { impl Task { fn into_raw(self) -> NonNull
{ - let ret = self.header().into(); + let ret = self.raw.header_ptr(); mem::forget(self); ret } @@ -427,7 +427,7 @@ unsafe impl linked_list::Link for Task { type Target = Header; fn as_raw(handle: &Task) -> NonNull
{ - handle.header().into() + handle.raw.header_ptr() } unsafe fn from_raw(ptr: NonNull
) -> Task { diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index 30fe4156300..2e4420b5c13 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -63,6 +63,10 @@ impl RawTask { RawTask { ptr } } + pub(super) fn header_ptr(&self) -> NonNull
{ + self.ptr + } + /// Returns a reference to the task's meta structure. /// /// Safe as `Header` is `Sync`. diff --git a/tokio/src/runtime/task/waker.rs b/tokio/src/runtime/task/waker.rs index b7313b4c590..74a29f4a847 100644 --- a/tokio/src/runtime/task/waker.rs +++ b/tokio/src/runtime/task/waker.rs @@ -15,7 +15,7 @@ pub(super) struct WakerRef<'a, S: 'static> { /// Returns a `WakerRef` which avoids having to pre-emptively increase the /// refcount if there is no need to do so. -pub(super) fn waker_ref(header: &Header) -> WakerRef<'_, S> +pub(super) fn waker_ref(header: &NonNull
) -> WakerRef<'_, S> where T: Future, S: Schedule, @@ -28,7 +28,7 @@ where // point and not an *owned* waker, we must ensure that `drop` is never // called on this waker instance. This is done by wrapping it with // `ManuallyDrop` and then never calling drop. - let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::(header))) }; + let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::(*header))) }; WakerRef { waker, @@ -77,7 +77,7 @@ where let harness = Harness::::from_raw(ptr); trace!(harness, "waker.clone"); (*header).state.ref_inc(); - raw_waker::(header) + raw_waker::(ptr) } unsafe fn drop_waker(ptr: *const ()) @@ -114,12 +114,12 @@ where harness.wake_by_ref(); } -fn raw_waker(header: *const Header) -> RawWaker +fn raw_waker(header: NonNull
) -> RawWaker where T: Future, S: Schedule, { - let ptr = header as *const (); + let ptr = header.as_ptr() as *const (); let vtable = &RawWakerVTable::new( clone_waker::, wake_by_val::, diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 2de17b9f4c4..0fd1e0c6d9e 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -101,13 +101,21 @@ fn steal_batch() { assert!(local1.pop().is_none()); } +const fn normal_or_miri(normal: usize, miri: usize) -> usize { + if cfg!(miri) { + miri + } else { + normal + } +} + #[test] fn stress1() { const NUM_ITER: usize = 1; - const NUM_STEAL: usize = 1_000; - const NUM_LOCAL: usize = 1_000; - const NUM_PUSH: usize = 500; - const NUM_POP: usize = 250; + const NUM_STEAL: usize = normal_or_miri(1_000, 10); + const NUM_LOCAL: usize = normal_or_miri(1_000, 10); + const NUM_PUSH: usize = normal_or_miri(500, 10); + const NUM_POP: usize = normal_or_miri(250, 10); let mut metrics = MetricsBatch::new(); @@ -169,8 +177,8 @@ fn stress1() { #[test] fn stress2() { const NUM_ITER: usize = 1; - const NUM_TASKS: usize = 1_000_000; - const NUM_STEAL: usize = 1_000; + const NUM_TASKS: usize = normal_or_miri(1_000_000, 50); + const NUM_STEAL: usize = normal_or_miri(1_000, 10); let mut metrics = MetricsBatch::new(); diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index e0a2df9208e..6d8eb9e7487 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -202,7 +202,12 @@ mod tests { registry.broadcast(); // Yield so the previous broadcast can get received - crate::time::sleep(std::time::Duration::from_millis(10)).await; + // + // This yields many times since the block_on task is only polled every 61 + // ticks. + for _ in 0..100 { + crate::task::yield_now().await; + } // Send subsequent signal registry.record_event(0); diff --git a/tokio/src/signal/reusable_box.rs b/tokio/src/signal/reusable_box.rs index 796fa210b03..02f32474b16 100644 --- a/tokio/src/signal/reusable_box.rs +++ b/tokio/src/signal/reusable_box.rs @@ -151,6 +151,7 @@ impl fmt::Debug for ReusableBoxFuture { } #[cfg(test)] +#[cfg(not(miri))] // Miri breaks when you use Pin<&mut dyn Future> mod test { use super::ReusableBoxFuture; use futures::future::FutureExt; diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index c93ce3bd45a..83d0de4fbe6 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -130,6 +130,7 @@ enum NotificationType { } #[derive(Debug)] +#[repr(C)] // required by `linked_list::Link` impl struct Waiter { /// Intrusive linked-list pointers. pointers: linked_list::Pointers, @@ -731,8 +732,8 @@ unsafe impl linked_list::Link for Waiter { ptr } - unsafe fn pointers(mut target: NonNull) -> NonNull> { - NonNull::from(&mut target.as_mut().pointers) + unsafe fn pointers(target: NonNull) -> NonNull> { + target.cast() } } diff --git a/tokio/src/sync/tests/notify.rs b/tokio/src/sync/tests/notify.rs index 2828b1c342a..20153b7a5a8 100644 --- a/tokio/src/sync/tests/notify.rs +++ b/tokio/src/sync/tests/notify.rs @@ -45,3 +45,37 @@ fn notify_clones_waker_before_lock() { // The result doesn't matter, we're just testing that we don't deadlock. let _ = future.poll(&mut cx); } + +#[test] +fn notify_simple() { + let notify = Notify::new(); + + let mut fut1 = tokio_test::task::spawn(notify.notified()); + assert!(fut1.poll().is_pending()); + + let mut fut2 = tokio_test::task::spawn(notify.notified()); + assert!(fut2.poll().is_pending()); + + notify.notify_waiters(); + + assert!(fut1.poll().is_ready()); + assert!(fut2.poll().is_ready()); +} + +#[test] +#[cfg(not(target_arch = "wasm32"))] +fn watch_test() { + let rt = crate::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + rt.block_on(async { + let (tx, mut rx) = crate::sync::watch::channel(()); + + crate::spawn(async move { + let _ = tx.send(()); + }); + + let _ = rx.changed().await; + }); +} diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index 1beee57604b..f0ea898e120 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -326,15 +326,16 @@ pub(super) type EntryList = crate::util::linked_list::LinkedList, + /// Current state. This records whether the timer entry is currently under /// the ownership of the driver, and if not, its current state (not /// complete, fired, error, etc). state: StateCell, - /// Data manipulated by the driver thread itself, only. - driver_state: CachePadded, - _p: PhantomPinned, } @@ -420,7 +421,14 @@ impl TimerShared { /// padded. This contains the information that the driver thread accesses most /// frequently to minimize contention. In particular, we move it away from the /// waker, as the waker is updated on every poll. +#[repr(C)] // required by `link_list::Link` impl struct TimerSharedPadded { + /// A link within the doubly-linked list of timers on a particular level and + /// slot. Valid only if state is equal to Registered. + /// + /// Only accessed under the entry lock. + pointers: linked_list::Pointers, + /// The expiration time for which this entry is currently registered. /// Generally owned by the driver, but is accessed by the entry when not /// registered. @@ -428,12 +436,6 @@ struct TimerSharedPadded { /// The true expiration time. Set by the timer future, read by the driver. true_when: AtomicU64, - - /// A link within the doubly-linked list of timers on a particular level and - /// slot. Valid only if state is equal to Registered. - /// - /// Only accessed under the entry lock. - pointers: StdUnsafeCell>, } impl std::fmt::Debug for TimerSharedPadded { @@ -450,7 +452,7 @@ impl TimerSharedPadded { Self { cached_when: AtomicU64::new(0), true_when: AtomicU64::new(0), - pointers: StdUnsafeCell::new(linked_list::Pointers::new()), + pointers: linked_list::Pointers::new(), } } } @@ -474,7 +476,7 @@ unsafe impl linked_list::Link for TimerShared { unsafe fn pointers( target: NonNull, ) -> NonNull> { - unsafe { NonNull::new(target.as_ref().driver_state.0.pointers.get()).unwrap() } + target.cast() } } diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs index 7c5cf1fd05c..3ac8c756437 100644 --- a/tokio/src/time/driver/tests/mod.rs +++ b/tokio/src/time/driver/tests/mod.rs @@ -27,7 +27,12 @@ fn block_on(f: impl std::future::Future) -> T { return loom::future::block_on(f); #[cfg(not(loom))] - return futures::executor::block_on(f); + { + let rt = crate::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(f) + } } fn model(f: impl Fn() + Send + Sync + 'static) { @@ -182,6 +187,15 @@ fn reset_future() { }) } +#[cfg(not(loom))] +fn normal_or_miri(normal: T, miri: T) -> T { + if cfg!(miri) { + miri + } else { + normal + } +} + #[test] #[cfg(not(loom))] fn poll_process_levels() { @@ -195,7 +209,7 @@ fn poll_process_levels() { let mut entries = vec![]; - for i in 0..1024 { + for i in 0..normal_or_miri(1024, 64) { let mut entry = Box::pin(TimerEntry::new( &handle, clock.now() + Duration::from_millis(i), @@ -208,7 +222,7 @@ fn poll_process_levels() { entries.push(entry); } - for t in 1..1024 { + for t in 1..normal_or_miri(1024, 64) { handle.process_at_time(t as u64); for (deadline, future) in entries.iter_mut().enumerate() { let mut context = Context::from_waker(noop_waker_ref()); diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index cd74d376ae3..e6bdde68c7a 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -57,6 +57,13 @@ pub(crate) unsafe trait Link { unsafe fn from_raw(ptr: NonNull) -> Self::Handle; /// Return the pointers for a node + /// + /// # Safety + /// + /// The resulting pointer should have the same tag in the stacked-borrows + /// stack as the argument. In particular, the method may not create an + /// intermediate reference in the process of creating the resulting raw + /// pointer. unsafe fn pointers(target: NonNull) -> NonNull>; } @@ -353,6 +360,7 @@ mod tests { use std::pin::Pin; #[derive(Debug)] + #[repr(C)] struct Entry { pointers: Pointers, val: i32, @@ -370,8 +378,8 @@ mod tests { Pin::new_unchecked(&*ptr.as_ptr()) } - unsafe fn pointers(mut target: NonNull) -> NonNull> { - NonNull::from(&mut target.as_mut().pointers) + unsafe fn pointers(target: NonNull) -> NonNull> { + target.cast() } } diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index 97355d500fc..214fa08dc89 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -157,6 +157,12 @@ struct Slot { /// Next entry in the free list. next: u32, + + /// Makes miri happy by making mutable references not take exclusive access. + /// + /// Could probably also be fixed by replacing `slots` with a raw-pointer + /// based equivalent. + _pin: std::marker::PhantomPinned, } /// Value paired with a reference to the page. @@ -409,7 +415,7 @@ impl Page { slot.value.with(|ptr| unsafe { (*ptr).value.reset() }); // Return a reference to the slot - Some((me.addr(idx), slot.gen_ref(me))) + Some((me.addr(idx), locked.gen_ref(idx, me))) } else if me.len == locked.slots.len() { // The page is full None @@ -428,9 +434,10 @@ impl Page { locked.slots.push(Slot { value: UnsafeCell::new(Value { value: Default::default(), - page: &**me as *const _, + page: Arc::as_ptr(me), }), next: 0, + _pin: std::marker::PhantomPinned, }); // Increment the head to indicate the free stack is empty @@ -443,7 +450,7 @@ impl Page { debug_assert_eq!(locked.slots.len(), locked.head); - Some((me.addr(idx), locked.slots[idx].gen_ref(me))) + Some((me.addr(idx), locked.gen_ref(idx, me))) } } } @@ -558,18 +565,15 @@ impl Slots { idx } -} -impl Slot { - /// Generates a `Ref` for the slot. This involves bumping the page's ref count. - fn gen_ref(&self, page: &Arc>) -> Ref { - // The ref holds a ref on the page. The `Arc` is forgotten here and is - // resurrected in `release` when the `Ref` is dropped. By avoiding to - // hold on to an explicit `Arc` value, the struct size of `Ref` is - // reduced. + /// Generates a `Ref` for the slot at the given index. This involves bumping the page's ref count. + fn gen_ref(&self, idx: usize, page: &Arc>) -> Ref { + assert!(idx < self.slots.len()); mem::forget(page.clone()); - let slot = self as *const Slot; - let value = slot as *const Value; + + let vec_ptr = self.slots.as_ptr(); + let slot: *const Slot = unsafe { vec_ptr.add(idx) }; + let value: *const Value = slot as *const Value; Ref { value } } @@ -691,11 +695,13 @@ mod test { #[test] fn insert_many() { + const MANY: usize = normal_or_miri(10_000, 50); + let mut slab = Slab::::new(); let alloc = slab.allocator(); let mut entries = vec![]; - for i in 0..10_000 { + for i in 0..MANY { let (addr, val) = alloc.allocate().unwrap(); val.id.store(i, SeqCst); entries.push((addr, val)); @@ -708,15 +714,15 @@ mod test { entries.clear(); - for i in 0..10_000 { + for i in 0..MANY { let (addr, val) = alloc.allocate().unwrap(); - val.id.store(10_000 - i, SeqCst); + val.id.store(MANY - i, SeqCst); entries.push((addr, val)); } for (i, (addr, v)) in entries.iter().enumerate() { - assert_eq!(10_000 - i, v.id.load(SeqCst)); - assert_eq!(10_000 - i, slab.get(*addr).unwrap().id.load(SeqCst)); + assert_eq!(MANY - i, v.id.load(SeqCst)); + assert_eq!(MANY - i, slab.get(*addr).unwrap().id.load(SeqCst)); } } @@ -726,7 +732,7 @@ mod test { let alloc = slab.allocator(); let mut entries = vec![]; - for i in 0..10_000 { + for i in 0..normal_or_miri(10_000, 100) { let (addr, val) = alloc.allocate().unwrap(); val.id.store(i, SeqCst); entries.push((addr, val)); @@ -734,7 +740,7 @@ mod test { for _ in 0..10 { // Drop 1000 in reverse - for _ in 0..1_000 { + for _ in 0..normal_or_miri(1_000, 10) { entries.pop(); } @@ -753,7 +759,7 @@ mod test { let mut entries1 = vec![]; let mut entries2 = vec![]; - for i in 0..10_000 { + for i in 0..normal_or_miri(10_000, 100) { let (addr, val) = alloc.allocate().unwrap(); val.id.store(i, SeqCst); @@ -771,6 +777,14 @@ mod test { } } + const fn normal_or_miri(normal: usize, miri: usize) -> usize { + if cfg!(miri) { + miri + } else { + normal + } + } + #[test] fn compact_all() { let mut slab = Slab::::new(); @@ -780,7 +794,7 @@ mod test { for _ in 0..2 { entries.clear(); - for i in 0..10_000 { + for i in 0..normal_or_miri(10_000, 100) { let (addr, val) = alloc.allocate().unwrap(); val.id.store(i, SeqCst); @@ -808,7 +822,7 @@ mod test { let alloc = slab.allocator(); let mut entries = vec![]; - for _ in 0..5 { + for _ in 0..normal_or_miri(5, 2) { entries.clear(); // Allocate a few pages + 1 diff --git a/tokio/src/util/wake.rs b/tokio/src/util/wake.rs index bf1d4bdf92a..5526cbc63ae 100644 --- a/tokio/src/util/wake.rs +++ b/tokio/src/util/wake.rs @@ -31,7 +31,7 @@ impl Deref for WakerRef<'_> { /// Creates a reference to a `Waker` from a reference to `Arc`. pub(crate) fn waker_ref(wake: &Arc) -> WakerRef<'_> { - let ptr = &**wake as *const _ as *const (); + let ptr = Arc::as_ptr(wake) as *const (); let waker = unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) };