From 9983f4a4e7c6ea7cf91fd6f8953c62d0849ea0af Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 11 Oct 2022 14:17:08 -0700 Subject: [PATCH 01/22] (messy) test that tasks are woken to local queue --- tokio/src/task/local.rs | 59 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 513671d097f..5cb6dd241b5 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -939,4 +939,63 @@ mod tests { .expect("rt") .block_on(f) } + + // Tests that when a task on a `LocalSet` is woken by an io driver on the + // same thread, the task is woken to the localset's local queue rather than + // its remote queue. + // + // This test has to be defined in the `local.rs` file as a lib test, rather + // than in `tests/`, because it makes assertions about the local set's + // internal state. + #[test] + #[cfg(feature = "net")] + fn io_wakes_to_local_queue() { + use super::*; + use crate::net::{TcpListener, TcpStream}; + let rt = crate::runtime::Builder::new_current_thread() + .enable_io() + .build() + .expect("rt"); + rt.block_on(async { + let local = LocalSet::new(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener + .local_addr() + .expect("listener should have an address"); + let mut run_until = Box::pin(local.run_until(async move { + spawn_local(async move { + println!("listening"); + + let _ = listener.accept().await; + }) + .await + .unwrap(); + })); + + // poll the run until future once + crate::future::poll_fn(|cx| { + let _ = run_until.as_mut().poll(cx); + Poll::Ready(()) + }) + .await; + + let _sock = TcpStream::connect(addr).await.unwrap(); + let task = local.context.queue.pop_front(); + assert_eq!( + local + .context + .shared + .queue + .lock() + .as_ref() + .map(|q| q.is_empty()), + Some(true), + "the task should *not* have been notified to the local set's remote queue" + ); + assert!( + task.is_some(), + "task should have been notified to the LocalSet's local queue" + ); + }) + } } From a0b7574c342b544044861ab6dcec503cbbcc293e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 11 Oct 2022 14:41:50 -0700 Subject: [PATCH 02/22] wake tasks locally if woken by the same thread --- tokio/src/task/local.rs | 109 +++++++++++++++++++++++++++++----------- 1 file changed, 80 insertions(+), 29 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 5cb6dd241b5..463f1f2a5b2 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -12,6 +12,7 @@ use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::task::Poll; +use std::thread::{self, ThreadId}; use pin_project_lite::pin_project; @@ -228,9 +229,6 @@ struct Context { /// Collection of all active tasks spawned onto this executor. owned: LocalOwnedTasks>, - /// Local run queue sender and receiver. - queue: VecDequeCell>>, - /// State shared between threads. shared: Arc, @@ -241,6 +239,19 @@ struct Context { /// LocalSet state shared between threads. struct Shared { + /// Local run queue sender and receiver. + /// + /// # Safety + /// + /// This field must *only* be accessed from the thread that owns the + /// `LocalSet` (i.e., `Thread::current().id() == owner`). + local_queue: VecDequeCell>>, + + /// The `ThreadId` of the thread that owns the `LocalSet`. + /// + /// Since `LocalSet` is `!Send`, this will never change. + owner: ThreadId, + /// Remote run queue sender. queue: Mutex>>>>, @@ -354,8 +365,9 @@ impl LocalSet { tick: Cell::new(0), context: Rc::new(Context { owned: LocalOwnedTasks::new(), - queue: VecDequeCell::with_capacity(INITIAL_CAPACITY), shared: Arc::new(Shared { + local_queue: VecDequeCell::with_capacity(INITIAL_CAPACITY), + owner: thread::current().id(), queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), waker: AtomicWaker::new(), #[cfg(tokio_unstable)] @@ -597,9 +609,9 @@ impl LocalSet { .lock() .as_mut() .and_then(|queue| queue.pop_front()) - .or_else(|| self.context.queue.pop_front()) + .or_else(|| self.pop_local()) } else { - self.context.queue.pop_front().or_else(|| { + self.pop_local().or_else(|| { self.context .shared .queue @@ -612,6 +624,15 @@ impl LocalSet { task.map(|task| self.context.owned.assert_owner(task)) } + fn pop_local(&self) -> Option>> { + unsafe { + // Safety: because the `LocalSet` itself is `!Send`, we know we are + // on the same thread if we have access to the `LocalSet`, and can + // therefore access the local run queue. + self.context.shared.local_queue().pop_front() + } + } + fn with(&self, f: impl FnOnce() -> T) -> T { CURRENT.with(|ctx| { struct Reset<'a> { @@ -782,7 +803,14 @@ impl Drop for LocalSet { // We already called shutdown on all tasks above, so there is no // need to call shutdown. - for task in self.context.queue.take() { + let local_queue = unsafe { + // Safety: because `LocalSet` is `!Send`, if we are dropping a + // `LocalSet`, we know we are on the same thread it was created + // on, so we can access its local queue. + self.context.shared.local_queue().take() + }; + + for task in local_queue { drop(task); } @@ -854,15 +882,46 @@ impl Future for RunUntil<'_, T> { } impl Shared { + /// # Safety + /// + /// This is safe to call if and ONLY if we are on the thread that owns this + /// `LocalSet`. + unsafe fn local_queue(&self) -> &VecDequeCell>> { + debug_assert_eq!( + thread::current().id(), + self.owner, + "`LocalSet`'s local run queue must not be accessed by another thread!" + ); + &self.local_queue + } + /// Schedule the provided task on the scheduler. fn schedule(&self, task: task::Notified>) { CURRENT.with(|maybe_cx| { match maybe_cx.get() { - Some(cx) if cx.shared.ptr_eq(self) => { - cx.queue.push_back(task); + Some(cx) if cx.shared.ptr_eq(self) => unsafe { + // Safety: if the current `LocalSet` context points to this + // `LocalSet`, then we are on the thread that owns it. + cx.shared.local_queue().push_back(task); + }, + + // We are on the thread that owns the `LocalSet`, so we can + // wake to the local queue. + _ if thread::current().id() == self.owner => { + unsafe { + // Safety: we just checked that the thread ID matches + // the localset's owner, so this is safe. + self.local_queue().push_back(task); + } + // We still have to wake the `LocalSet`, because it isn't + // currently being polled. + self.waker.wake(); } + + // We are *not* on the thread that owns the `LocalSet`, so we + // have to wake to the remote queue. _ => { - // First check whether the queue is still there (if not, the + // First, check whether the queue is still there (if not, the // LocalSet is dropped). Then push to it if so, and if not, // do nothing. let mut lock = self.queue.lock(); @@ -882,6 +941,10 @@ impl Shared { } } +// This is safe because (and only because) we *pinky pwomise* to never touch the +// local run queue except from the thread that owns the `LocalSet`. +unsafe impl Sync for Shared {} + impl task::Schedule for Arc { fn release(&self, task: &Task) -> Option> { CURRENT.with(|maybe_cx| match maybe_cx.get() { @@ -962,14 +1025,11 @@ mod tests { let addr = listener .local_addr() .expect("listener should have an address"); + let task = local.spawn_local(async move { + let _ = listener.accept().await; + }); let mut run_until = Box::pin(local.run_until(async move { - spawn_local(async move { - println!("listening"); - - let _ = listener.accept().await; - }) - .await - .unwrap(); + task.await.unwrap(); })); // poll the run until future once @@ -980,18 +1040,9 @@ mod tests { .await; let _sock = TcpStream::connect(addr).await.unwrap(); - let task = local.context.queue.pop_front(); - assert_eq!( - local - .context - .shared - .queue - .lock() - .as_ref() - .map(|q| q.is_empty()), - Some(true), - "the task should *not* have been notified to the local set's remote queue" - ); + let task = unsafe { local.context.shared.local_queue().pop_front() }; + // TODO(eliza): it would be nice to be able to assert that this is + // the local task. assert!( task.is_some(), "task should have been notified to the LocalSet's local queue" From 51b4a9fa6c1bddc2233f0b26d056e2919828cfcc Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 11 Oct 2022 16:28:57 -0700 Subject: [PATCH 03/22] avoid an Arc clone every time we get a thread id this also handles the annoying issue that the thread id's value is not guaranteed to be correct while dropping a thread-local... --- tokio/src/runtime/task/join.rs | 7 ++++ tokio/src/task/local.rs | 65 +++++++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 16 deletions(-) diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index 927be1acf3e..df00fb1e83d 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -282,6 +282,13 @@ impl JoinHandle { pub fn id(&self) -> super::Id { self.id.clone() } + + /// if `tokio_unstable` is disabled, we still use task IDs for internal + /// testing purposes, so this is `pub(crate)`. + #[cfg(all(test, not(tokio_unstable)))] + pub(crate) fn id(&self) -> super::Id { + self.id.clone() + } } impl Unpin for JoinHandle {} diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 463f1f2a5b2..77a3d7bc167 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -273,10 +273,21 @@ pin_project! { } #[cfg(any(loom, tokio_no_const_thread_local))] -thread_local!(static CURRENT: RcCell = RcCell::new()); +thread_local!(static CURRENT: RcCell = LocalData { + thread_id: Cell::new(None), + ctx: RcCell::new(), +}); #[cfg(not(any(loom, tokio_no_const_thread_local)))] -thread_local!(static CURRENT: RcCell = const { RcCell::new() }); +thread_local!(static CURRENT: LocalData = const { LocalData { + thread_id: Cell::new(None), + ctx: RcCell::new(), +} }); + +struct LocalData { + thread_id: Cell>, + ctx: RcCell, +} cfg_rt! { /// Spawns a `!Send` future on the local task set. @@ -325,7 +336,7 @@ cfg_rt! { where F: Future + 'static, F::Output: 'static { - match CURRENT.with(|maybe_cx| maybe_cx.get()) { + match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) { None => panic!("`spawn_local` called from outside of a `task::LocalSet`"), Some(cx) => cx.spawn(future, name) } @@ -346,7 +357,7 @@ pub struct LocalEnterGuard(Option>); impl Drop for LocalEnterGuard { fn drop(&mut self) { - CURRENT.with(|ctx| { + CURRENT.with(|LocalData { ctx, .. }| { ctx.set(self.0.take()); }) } @@ -386,7 +397,7 @@ impl LocalSet { /// /// [`spawn_local`]: fn@crate::task::spawn_local pub fn enter(&self) -> LocalEnterGuard { - CURRENT.with(|ctx| { + CURRENT.with(|LocalData { ctx, .. }| { let old = ctx.replace(Some(self.context.clone())); LocalEnterGuard(old) }) @@ -634,7 +645,7 @@ impl LocalSet { } fn with(&self, f: impl FnOnce() -> T) -> T { - CURRENT.with(|ctx| { + CURRENT.with(|LocalData { ctx, .. }| { struct Reset<'a> { ctx_ref: &'a RcCell, val: Option>, @@ -647,7 +658,7 @@ impl LocalSet { let old = ctx.replace(Some(self.context.clone())); let _reset = Reset { - ctx_ref: ctx, + ctx_ref: &ctx, val: old, }; @@ -660,7 +671,7 @@ impl LocalSet { fn with_if_possible(&self, f: impl FnOnce() -> T) -> T { let mut f = Some(f); - let res = CURRENT.try_with(|ctx| { + let res = CURRENT.try_with(|LocalData { ctx, .. }| { struct Reset<'a> { ctx_ref: &'a RcCell, val: Option>, @@ -673,7 +684,7 @@ impl LocalSet { let old = ctx.replace(Some(self.context.clone())); let _reset = Reset { - ctx_ref: ctx, + ctx_ref: &ctx, val: old, }; @@ -887,9 +898,11 @@ impl Shared { /// This is safe to call if and ONLY if we are on the thread that owns this /// `LocalSet`. unsafe fn local_queue(&self) -> &VecDequeCell>> { - debug_assert_eq!( - thread::current().id(), - self.owner, + debug_assert!( + // if we couldn't get the thread ID because we're dropping the local + // data, skip the assertion --- the `Drop` impl is not going to be + // called from another thread, because `LocalSet` is `!Send` + thread_id().map(|id| id == self.owner).unwrap_or(true), "`LocalSet`'s local run queue must not be accessed by another thread!" ); &self.local_queue @@ -897,8 +910,8 @@ impl Shared { /// Schedule the provided task on the scheduler. fn schedule(&self, task: task::Notified>) { - CURRENT.with(|maybe_cx| { - match maybe_cx.get() { + CURRENT.with(|localdata| { + match localdata.ctx.get() { Some(cx) if cx.shared.ptr_eq(self) => unsafe { // Safety: if the current `LocalSet` context points to this // `LocalSet`, then we are on the thread that owns it. @@ -907,7 +920,7 @@ impl Shared { // We are on the thread that owns the `LocalSet`, so we can // wake to the local queue. - _ if thread::current().id() == self.owner => { + _ if localdata.get_or_insert_id() == self.owner => { unsafe { // Safety: we just checked that the thread ID matches // the localset's owner, so this is safe. @@ -947,7 +960,7 @@ unsafe impl Sync for Shared {} impl task::Schedule for Arc { fn release(&self, task: &Task) -> Option> { - CURRENT.with(|maybe_cx| match maybe_cx.get() { + CURRENT.with(|LocalData { ctx, .. }| match ctx.get() { None => panic!("scheduler context missing"), Some(cx) => { assert!(cx.shared.ptr_eq(self)); @@ -985,6 +998,26 @@ impl task::Schedule for Arc { } } +impl LocalData { + fn get_or_insert_id(&self) -> ThreadId { + match self.thread_id.get() { + Some(id) => id, + None => { + let id = thread::current().id(); + self.thread_id.set(Some(id)); + id + } + } + } +} + +#[cfg(debug_assertions)] +fn thread_id() -> Option { + CURRENT + .try_with(|localdata| localdata.get_or_insert_id()) + .ok() +} + #[cfg(test)] mod tests { use super::*; From 26703d690eca6211a01cfbf3929a147717f8557d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 11 Oct 2022 17:09:50 -0700 Subject: [PATCH 04/22] whoops, fix unstable --- tokio/src/task/local.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 77a3d7bc167..a2968e6b608 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -985,7 +985,7 @@ impl task::Schedule for Arc { // This hook is only called from within the runtime, so // `CURRENT` should match with `&self`, i.e. there is no // opportunity for a nested scheduler to be called. - CURRENT.with(|maybe_cx| match maybe_cx.get() { + CURRENT.with(|LocalData { ctx, .. }| match maybe_cx.get() { Some(cx) if Arc::ptr_eq(self, &cx.shared) => { cx.unhandled_panic.set(true); cx.owned.close_and_shutdown_all(); From eadd363b297a96457dc0f1db0a5fc96bcc2203c6 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 12 Oct 2022 09:52:27 -0700 Subject: [PATCH 05/22] debug_assert still compiles code --- tokio/src/task/local.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index a2968e6b608..21814ad49fe 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -1011,7 +1011,6 @@ impl LocalData { } } -#[cfg(debug_assertions)] fn thread_id() -> Option { CURRENT .try_with(|localdata| localdata.get_or_insert_id()) From b9df4d1d95c7d0e6f00bd0343e8dd00a48942d86 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 12 Oct 2022 10:14:58 -0700 Subject: [PATCH 06/22] see if CI passes --- tokio/src/macros/thread_local.rs | 6 +++--- tokio/src/task/local.rs | 14 ++++---------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/tokio/src/macros/thread_local.rs b/tokio/src/macros/thread_local.rs index 13a4f467f50..6182dbb8e34 100644 --- a/tokio/src/macros/thread_local.rs +++ b/tokio/src/macros/thread_local.rs @@ -1,5 +1,5 @@ #[cfg(all(loom, test))] -macro_rules! thread_local { +macro_rules! tl { ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty = const { $expr:expr } $(;)?) => { loom::thread_local! { $(#[$attrs])* @@ -12,13 +12,13 @@ macro_rules! thread_local { #[cfg(not(tokio_no_const_thread_local))] #[cfg(not(all(loom, test)))] -macro_rules! thread_local { +macro_rules! tl { ($($tts:tt)+) => { ::std::thread_local!{ $($tts)+ } } } #[cfg(tokio_no_const_thread_local)] #[cfg(not(all(loom, test)))] -macro_rules! thread_local { +macro_rules! tl { ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty = const { $expr:expr } $(;)?) => { ::std::thread_local! { $(#[$attrs])* diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 21814ad49fe..0c605a08f2a 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -272,14 +272,8 @@ pin_project! { } } -#[cfg(any(loom, tokio_no_const_thread_local))] -thread_local!(static CURRENT: RcCell = LocalData { - thread_id: Cell::new(None), - ctx: RcCell::new(), -}); - -#[cfg(not(any(loom, tokio_no_const_thread_local)))] -thread_local!(static CURRENT: LocalData = const { LocalData { +// #[cfg(not(any(loom, tokio_no_const_thread_local)))] +tl!(static CURRENT: LocalData = { LocalData { thread_id: Cell::new(None), ctx: RcCell::new(), } }); @@ -985,7 +979,7 @@ impl task::Schedule for Arc { // This hook is only called from within the runtime, so // `CURRENT` should match with `&self`, i.e. there is no // opportunity for a nested scheduler to be called. - CURRENT.with(|LocalData { ctx, .. }| match maybe_cx.get() { + CURRENT.with(|LocalData { ctx, .. }| match ctx.get() { Some(cx) if Arc::ptr_eq(self, &cx.shared) => { cx.unhandled_panic.set(true); cx.owned.close_and_shutdown_all(); @@ -1017,7 +1011,7 @@ fn thread_id() -> Option { .ok() } -#[cfg(test)] +#[cfg(all(test, not(loom)))] mod tests { use super::*; #[test] From 453deb63afab9de728e3fb49d2ca8453f9a8dde3 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 12 Oct 2022 10:20:49 -0700 Subject: [PATCH 07/22] try again --- tokio/src/macros/thread_local.rs | 6 +++--- tokio/src/task/local.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/macros/thread_local.rs b/tokio/src/macros/thread_local.rs index 6182dbb8e34..13a4f467f50 100644 --- a/tokio/src/macros/thread_local.rs +++ b/tokio/src/macros/thread_local.rs @@ -1,5 +1,5 @@ #[cfg(all(loom, test))] -macro_rules! tl { +macro_rules! thread_local { ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty = const { $expr:expr } $(;)?) => { loom::thread_local! { $(#[$attrs])* @@ -12,13 +12,13 @@ macro_rules! tl { #[cfg(not(tokio_no_const_thread_local))] #[cfg(not(all(loom, test)))] -macro_rules! tl { +macro_rules! thread_local { ($($tts:tt)+) => { ::std::thread_local!{ $($tts)+ } } } #[cfg(tokio_no_const_thread_local)] #[cfg(not(all(loom, test)))] -macro_rules! tl { +macro_rules! thread_local { ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty = const { $expr:expr } $(;)?) => { ::std::thread_local! { $(#[$attrs])* diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 0c605a08f2a..7453dd8d3a2 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -273,7 +273,7 @@ pin_project! { } // #[cfg(not(any(loom, tokio_no_const_thread_local)))] -tl!(static CURRENT: LocalData = { LocalData { +thread_local!(static CURRENT: LocalData = { LocalData { thread_id: Cell::new(None), ctx: RcCell::new(), } }); From 1dc9d4d3760935bbbcdf9652474b8162a26f98bf Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 12 Oct 2022 10:24:24 -0700 Subject: [PATCH 08/22] fix clippy --- tokio/src/task/local.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 7453dd8d3a2..8ad70431b6e 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -652,7 +652,7 @@ impl LocalSet { let old = ctx.replace(Some(self.context.clone())); let _reset = Reset { - ctx_ref: &ctx, + ctx_ref: ctx, val: old, }; @@ -678,7 +678,7 @@ impl LocalSet { let old = ctx.replace(Some(self.context.clone())); let _reset = Reset { - ctx_ref: &ctx, + ctx_ref: ctx, val: old, }; From a2cf00f25c164f524a7e7757373c5fda97eaef53 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 11:05:16 -0700 Subject: [PATCH 09/22] rewrite test to avoid io --- tokio/src/task/local.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 8ad70431b6e..7d4fa20b481 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -1037,22 +1037,20 @@ mod tests { // than in `tests/`, because it makes assertions about the local set's // internal state. #[test] - #[cfg(feature = "net")] - fn io_wakes_to_local_queue() { + fn wakes_to_local_queue() { use super::*; - use crate::net::{TcpListener, TcpStream}; + use crate::sync::Notify; let rt = crate::runtime::Builder::new_current_thread() - .enable_io() .build() .expect("rt"); rt.block_on(async { let local = LocalSet::new(); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener - .local_addr() - .expect("listener should have an address"); - let task = local.spawn_local(async move { - let _ = listener.accept().await; + let notify = Arc::new(Notify::new()); + let task = local.spawn_local({ + let notify = notify.clone(); + async move { + notify.notified().await; + } }); let mut run_until = Box::pin(local.run_until(async move { task.await.unwrap(); @@ -1065,7 +1063,7 @@ mod tests { }) .await; - let _sock = TcpStream::connect(addr).await.unwrap(); + notify.notify_one(); let task = unsafe { local.context.shared.local_queue().pop_front() }; // TODO(eliza): it would be nice to be able to assert that this is // the local task. From 51a3b66a5ab624b111923235a1855bd9c55cce7e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 11:07:35 -0700 Subject: [PATCH 10/22] use loom thread IDs (whoops!) i forgot to import `Thread`/`ThreadId` from `loom` to use the mocked thread IDs during loom runs. this should debreak the loom tests. --- tokio/src/task/local.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 7d4fa20b481..214724af54e 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -1,5 +1,6 @@ //! Runs `!Send` futures on the current thread. use crate::loom::sync::{Arc, Mutex}; +use crate::loom::thread::{self, ThreadId}; use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task}; use crate::sync::AtomicWaker; use crate::util::{RcCell, VecDequeCell}; @@ -12,7 +13,6 @@ use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::task::Poll; -use std::thread::{self, ThreadId}; use pin_project_lite::pin_project; From 80a501aa5157356f41fa3fddd3d829f92f89aa8b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 11:21:51 -0700 Subject: [PATCH 11/22] rm unused test code --- tokio/src/runtime/task/join.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index df00fb1e83d..927be1acf3e 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -282,13 +282,6 @@ impl JoinHandle { pub fn id(&self) -> super::Id { self.id.clone() } - - /// if `tokio_unstable` is disabled, we still use task IDs for internal - /// testing purposes, so this is `pub(crate)`. - #[cfg(all(test, not(tokio_unstable)))] - pub(crate) fn id(&self) -> super::Id { - self.id.clone() - } } impl Unpin for JoinHandle {} From 4e97d748d151bcd48ddb41e77084fd849aaf1cfd Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 12:09:40 -0700 Subject: [PATCH 12/22] try to get a panic message out of macos ci --- .github/workflows/ci.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 193f94c9e01..bfead362b40 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -80,6 +80,10 @@ jobs: - uses: Swatinem/rust-cache@v1 - name: Install cargo-hack uses: taiki-e/install-action@cargo-hack + # trying to figure out what the panic i cant repro was lol + - name: test tokio full + run: cargo test --features full --test task_local_set -- --nocapture --test-threads=1 + working-directory: tokio # Run `tokio` with `full` features. This excludes testing utilities which # can alter the runtime behavior of Tokio. From c4e0cc1de2049a62bcd917cb428839be729b782e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 12:15:39 -0700 Subject: [PATCH 13/22] ladies and gentlemen, we got him --- .github/workflows/ci.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bfead362b40..193f94c9e01 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -80,10 +80,6 @@ jobs: - uses: Swatinem/rust-cache@v1 - name: Install cargo-hack uses: taiki-e/install-action@cargo-hack - # trying to figure out what the panic i cant repro was lol - - name: test tokio full - run: cargo test --features full --test task_local_set -- --nocapture --test-threads=1 - working-directory: tokio # Run `tokio` with `full` features. This excludes testing utilities which # can alter the runtime behavior of Tokio. From f2564c18d7bd41ea9816656b63cab304604003d6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 12:24:37 -0700 Subject: [PATCH 14/22] fix possible double panic while dropping --- tokio/src/task/local.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 214724af54e..3119af5f28f 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -914,7 +914,7 @@ impl Shared { // We are on the thread that owns the `LocalSet`, so we can // wake to the local queue. - _ if localdata.get_or_insert_id() == self.owner => { + _ if localdata.get_or_insert_id() == Some(self.owner) => { unsafe { // Safety: we just checked that the thread ID matches // the localset's owner, so this is safe. @@ -993,15 +993,20 @@ impl task::Schedule for Arc { } impl LocalData { - fn get_or_insert_id(&self) -> ThreadId { - match self.thread_id.get() { - Some(id) => id, - None => { - let id = thread::current().id(); - self.thread_id.set(Some(id)); - id + fn get_or_insert_id(&self) -> Option { + self.thread_id.get().or_else(|| { + if std::thread::panicking() { + // Don't try to get the current thread's ID if we are unwinding: + // on some systems (definitely some macOS versions, possibly + // others), attempting to get the thread ID may panic if the + // thread's local data is being destroyed. + return None; } - } + + let id = thread::current().id(); + self.thread_id.set(Some(id)); + Some(id) + }) } } @@ -1009,6 +1014,7 @@ fn thread_id() -> Option { CURRENT .try_with(|localdata| localdata.get_or_insert_id()) .ok() + .flatten() } #[cfg(all(test, not(loom)))] From 1aa7a397de9da4fb6c4189ff37507dffa9c9d928 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 12:26:57 -0700 Subject: [PATCH 15/22] +docs --- tokio/src/task/local.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 3119af5f28f..52e86fd059a 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -1020,6 +1020,12 @@ fn thread_id() -> Option { #[cfg(all(test, not(loom)))] mod tests { use super::*; + + // Does a `LocalSet` running on a current-thread runtime...basically work? + // + // This duplicates a test in `tests/task_local_set.rs`, but because this is + // a lib test, it wil run under Miri, so this is necessary to catch stacked + // borrows violations in the `LocalSet` implementation. #[test] fn local_current_thread_scheduler() { let f = async { From 75c93b8b1c4cda107c53f01aa6df4155211fcd98 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 12:28:35 -0700 Subject: [PATCH 16/22] put back const thread local (that wasn't actually why loom failed) --- tokio/src/task/local.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 52e86fd059a..0b4cebfbf30 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -272,8 +272,14 @@ pin_project! { } } -// #[cfg(not(any(loom, tokio_no_const_thread_local)))] -thread_local!(static CURRENT: LocalData = { LocalData { +#[cfg(any(loom, tokio_no_const_thread_local))] +thread_local!(static CURRENT: RcCell = LocalData { + thread_id: Cell::new(None), + ctx: RcCell::new(), +}); + +#[cfg(not(any(loom, tokio_no_const_thread_local)))] +thread_local!(static CURRENT: LocalData = const { LocalData { thread_id: Cell::new(None), ctx: RcCell::new(), } }); From ba7b63fe048607efdac769707ab0614e489d3734 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 13:08:12 -0700 Subject: [PATCH 17/22] whoops --- tokio/src/task/local.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 0b4cebfbf30..c598c8830f5 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -273,7 +273,7 @@ pin_project! { } #[cfg(any(loom, tokio_no_const_thread_local))] -thread_local!(static CURRENT: RcCell = LocalData { +thread_local!(static CURRENT: LocalData = LocalData { thread_id: Cell::new(None), ctx: RcCell::new(), }); From 6ce9caafb3e9eeb7ecc518743d9eed64d15c7e84 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 13:09:21 -0700 Subject: [PATCH 18/22] we already have a thread-local macro that Does The Right THing --- tokio/src/task/local.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index c598c8830f5..249ec981168 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -272,13 +272,6 @@ pin_project! { } } -#[cfg(any(loom, tokio_no_const_thread_local))] -thread_local!(static CURRENT: LocalData = LocalData { - thread_id: Cell::new(None), - ctx: RcCell::new(), -}); - -#[cfg(not(any(loom, tokio_no_const_thread_local)))] thread_local!(static CURRENT: LocalData = const { LocalData { thread_id: Cell::new(None), ctx: RcCell::new(), From 6350e870a5f1e70ddca669cfb5391e5c21722813 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 13:14:39 -0700 Subject: [PATCH 19/22] just don't make the assertion on drop --- tokio/src/task/local.rs | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 249ec981168..3396c14cb70 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -807,13 +807,20 @@ impl Drop for LocalSet { // We already called shutdown on all tasks above, so there is no // need to call shutdown. - let local_queue = unsafe { - // Safety: because `LocalSet` is `!Send`, if we are dropping a - // `LocalSet`, we know we are on the same thread it was created - // on, so we can access its local queue. - self.context.shared.local_queue().take() - }; + // Safety: note that this *intentionally* bypasses the unsafe + // `Shared::local_queue()` method. This is in order to avoid the + // debug assertion that we are on the thread that owns the + // `LocalSet`, because on some systems (e.g. at least some macOS + // versions), attempting to get the current thread ID can panic due + // to the thread's local data that stores the thread ID being + // dropped *before* the `LocalSet`. + // + // Despite avoiding the assertion here, it is safe for us to access + // the local queue in `Drop`, because the `LocalSet` itself is + // `!Send`, so we can reasonably guarantee that it will not be + // `Drop`ped from another thread. + let local_queue = self.context.shared.local_queue.take(); for task in local_queue { drop(task); } @@ -913,7 +920,7 @@ impl Shared { // We are on the thread that owns the `LocalSet`, so we can // wake to the local queue. - _ if localdata.get_or_insert_id() == Some(self.owner) => { + _ if localdata.get_or_insert_id() == self.owner => { unsafe { // Safety: we just checked that the thread ID matches // the localset's owner, so this is safe. @@ -992,19 +999,11 @@ impl task::Schedule for Arc { } impl LocalData { - fn get_or_insert_id(&self) -> Option { - self.thread_id.get().or_else(|| { - if std::thread::panicking() { - // Don't try to get the current thread's ID if we are unwinding: - // on some systems (definitely some macOS versions, possibly - // others), attempting to get the thread ID may panic if the - // thread's local data is being destroyed. - return None; - } - + fn get_or_insert_id(&self) -> ThreadId { + self.thread_id.get().unwrap_or_else(|| { let id = thread::current().id(); self.thread_id.set(Some(id)); - Some(id) + id }) } } @@ -1013,7 +1012,6 @@ fn thread_id() -> Option { CURRENT .try_with(|localdata| localdata.get_or_insert_id()) .ok() - .flatten() } #[cfg(all(test, not(loom)))] From 61b29c629f0213e54cea87599322d5fdc987d0c5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 13:41:24 -0700 Subject: [PATCH 20/22] try to disambiguate macro resolution --- tokio/src/coop.rs | 2 +- tokio/src/fs/mocks.rs | 2 +- tokio/src/macros/scoped_tls.rs | 2 +- tokio/src/macros/thread_local.rs | 6 +++--- tokio/src/runtime/enter.rs | 2 +- tokio/src/task/local.rs | 2 +- tokio/src/util/rand.rs | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 3d15a53f25d..7e999bbb2e5 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -31,7 +31,7 @@ use std::cell::Cell; -thread_local! { +tokio_thread_local! { static CURRENT: Cell = const { Cell::new(Budget::unconstrained()) }; } diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index b1861726778..aa01e24711e 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -81,7 +81,7 @@ impl Write for &'_ MockFile { } } -thread_local! { +tokio_thread_local! { static QUEUE: RefCell>> = RefCell::new(VecDeque::new()) } diff --git a/tokio/src/macros/scoped_tls.rs b/tokio/src/macros/scoped_tls.rs index 2ff90c06d79..ed74b32d56d 100644 --- a/tokio/src/macros/scoped_tls.rs +++ b/tokio/src/macros/scoped_tls.rs @@ -10,7 +10,7 @@ macro_rules! scoped_thread_local { $vis static $name: $crate::macros::scoped_tls::ScopedKey<$ty> = $crate::macros::scoped_tls::ScopedKey { inner: { - thread_local!(static FOO: ::std::cell::Cell<*const ()> = const { + tokio_thread_local!(static FOO: ::std::cell::Cell<*const ()> = const { std::cell::Cell::new(::std::ptr::null()) }); &FOO diff --git a/tokio/src/macros/thread_local.rs b/tokio/src/macros/thread_local.rs index 13a4f467f50..eb62fa9c86d 100644 --- a/tokio/src/macros/thread_local.rs +++ b/tokio/src/macros/thread_local.rs @@ -1,5 +1,5 @@ #[cfg(all(loom, test))] -macro_rules! thread_local { +macro_rules! tokio_thread_local { ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty = const { $expr:expr } $(;)?) => { loom::thread_local! { $(#[$attrs])* @@ -12,13 +12,13 @@ macro_rules! thread_local { #[cfg(not(tokio_no_const_thread_local))] #[cfg(not(all(loom, test)))] -macro_rules! thread_local { +macro_rules! tokio_thread_local { ($($tts:tt)+) => { ::std::thread_local!{ $($tts)+ } } } #[cfg(tokio_no_const_thread_local)] #[cfg(not(all(loom, test)))] -macro_rules! thread_local { +macro_rules! tokio_thread_local { ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty = const { $expr:expr } $(;)?) => { ::std::thread_local! { $(#[$attrs])* diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index 66b17868b95..221864eb992 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -17,7 +17,7 @@ impl EnterContext { } } -thread_local!(static ENTERED: Cell = const { Cell::new(EnterContext::NotEntered) }); +tokio_thread_local!(static ENTERED: Cell = const { Cell::new(EnterContext::NotEntered) }); /// Represents an executor context. pub(crate) struct Enter { diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 3396c14cb70..4a2209cd2e8 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -272,7 +272,7 @@ pin_project! { } } -thread_local!(static CURRENT: LocalData = const { LocalData { +tokio_thread_local!(static CURRENT: LocalData = const { LocalData { thread_id: Cell::new(None), ctx: RcCell::new(), } }); diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 09754cea993..899ceeee39c 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -157,7 +157,7 @@ impl FastRand { } } -thread_local! { +tokio_thread_local! { static THREAD_RNG: FastRand = FastRand::new(RngSeed::new()); } From 1f5f645a64a5cb6a800c9a084085c6d5deadc90b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 15:45:39 -0700 Subject: [PATCH 21/22] Revert "try to disambiguate macro resolution" This reverts commit 61b29c629f0213e54cea87599322d5fdc987d0c5. --- tokio/src/coop.rs | 2 +- tokio/src/fs/mocks.rs | 2 +- tokio/src/macros/scoped_tls.rs | 2 +- tokio/src/macros/thread_local.rs | 6 +++--- tokio/src/runtime/enter.rs | 2 +- tokio/src/task/local.rs | 2 +- tokio/src/util/rand.rs | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 7e999bbb2e5..3d15a53f25d 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -31,7 +31,7 @@ use std::cell::Cell; -tokio_thread_local! { +thread_local! { static CURRENT: Cell = const { Cell::new(Budget::unconstrained()) }; } diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index aa01e24711e..b1861726778 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -81,7 +81,7 @@ impl Write for &'_ MockFile { } } -tokio_thread_local! { +thread_local! { static QUEUE: RefCell>> = RefCell::new(VecDeque::new()) } diff --git a/tokio/src/macros/scoped_tls.rs b/tokio/src/macros/scoped_tls.rs index ed74b32d56d..2ff90c06d79 100644 --- a/tokio/src/macros/scoped_tls.rs +++ b/tokio/src/macros/scoped_tls.rs @@ -10,7 +10,7 @@ macro_rules! scoped_thread_local { $vis static $name: $crate::macros::scoped_tls::ScopedKey<$ty> = $crate::macros::scoped_tls::ScopedKey { inner: { - tokio_thread_local!(static FOO: ::std::cell::Cell<*const ()> = const { + thread_local!(static FOO: ::std::cell::Cell<*const ()> = const { std::cell::Cell::new(::std::ptr::null()) }); &FOO diff --git a/tokio/src/macros/thread_local.rs b/tokio/src/macros/thread_local.rs index eb62fa9c86d..13a4f467f50 100644 --- a/tokio/src/macros/thread_local.rs +++ b/tokio/src/macros/thread_local.rs @@ -1,5 +1,5 @@ #[cfg(all(loom, test))] -macro_rules! tokio_thread_local { +macro_rules! thread_local { ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty = const { $expr:expr } $(;)?) => { loom::thread_local! { $(#[$attrs])* @@ -12,13 +12,13 @@ macro_rules! tokio_thread_local { #[cfg(not(tokio_no_const_thread_local))] #[cfg(not(all(loom, test)))] -macro_rules! tokio_thread_local { +macro_rules! thread_local { ($($tts:tt)+) => { ::std::thread_local!{ $($tts)+ } } } #[cfg(tokio_no_const_thread_local)] #[cfg(not(all(loom, test)))] -macro_rules! tokio_thread_local { +macro_rules! thread_local { ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty = const { $expr:expr } $(;)?) => { ::std::thread_local! { $(#[$attrs])* diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index 221864eb992..66b17868b95 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -17,7 +17,7 @@ impl EnterContext { } } -tokio_thread_local!(static ENTERED: Cell = const { Cell::new(EnterContext::NotEntered) }); +thread_local!(static ENTERED: Cell = const { Cell::new(EnterContext::NotEntered) }); /// Represents an executor context. pub(crate) struct Enter { diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 4a2209cd2e8..3396c14cb70 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -272,7 +272,7 @@ pin_project! { } } -tokio_thread_local!(static CURRENT: LocalData = const { LocalData { +thread_local!(static CURRENT: LocalData = const { LocalData { thread_id: Cell::new(None), ctx: RcCell::new(), } }); diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 899ceeee39c..09754cea993 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -157,7 +157,7 @@ impl FastRand { } } -tokio_thread_local! { +thread_local! { static THREAD_RNG: FastRand = FastRand::new(RngSeed::new()); } From 80438495e2e2ef03c60744d6da5003bdacf4605a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 12 Oct 2022 15:45:58 -0700 Subject: [PATCH 22/22] Revert "we already have a thread-local macro that Does The Right THing" This reverts commit 6ce9caafb3e9eeb7ecc518743d9eed64d15c7e84. --- tokio/src/task/local.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 3396c14cb70..952ae93ea68 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -272,6 +272,13 @@ pin_project! { } } +#[cfg(any(loom, tokio_no_const_thread_local))] +thread_local!(static CURRENT: LocalData = LocalData { + thread_id: Cell::new(None), + ctx: RcCell::new(), +}); + +#[cfg(not(any(loom, tokio_no_const_thread_local)))] thread_local!(static CURRENT: LocalData = const { LocalData { thread_id: Cell::new(None), ctx: RcCell::new(),