diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 79bc15c6ecd..239df4902a4 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -5,7 +5,6 @@ use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::{shutdown, BlockingTask}; use crate::runtime::builder::ThreadNameFn; -use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; use crate::util::{replace_thread_rng, RngSeedGenerator}; @@ -135,7 +134,7 @@ where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let rt = context::current(); + let rt = Handle::current(); rt.spawn_blocking(func) } @@ -153,7 +152,7 @@ cfg_fs! { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let rt = context::current(); + let rt = Handle::current(); rt.inner.blocking_spawner().spawn_mandatory_blocking(&rt, func) } } @@ -418,7 +417,7 @@ impl Spawner { builder.spawn(move || { // Only the reference should be moved into the closure - let _enter = crate::runtime::context::enter(rt.clone()); + let _enter = rt.enter(); rt.inner.blocking_spawner().inner.run(id); drop(shutdown_tx); }) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 562733b1226..e1ad631bf22 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1032,7 +1032,7 @@ cfg_rt_multi_thread! { let handle = Handle { inner: handle }; // Spawn the thread pool workers - let _enter = crate::runtime::context::enter(handle.clone()); + let _enter = handle.enter(); launch.launch(); Ok(Runtime { diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs deleted file mode 100644 index afa8c3f2e4f..00000000000 --- a/tokio/src/runtime/context.rs +++ /dev/null @@ -1,66 +0,0 @@ -//! Thread local runtime context -use crate::runtime::{Handle, TryCurrentError}; -use crate::util::{replace_thread_rng, RngSeed}; - -use std::cell::RefCell; - -tokio_thread_local! { - static CONTEXT: RefCell> = const { RefCell::new(None) } -} - -pub(crate) fn try_current() -> Result { - match CONTEXT.try_with(|ctx| ctx.borrow().clone()) { - Ok(Some(handle)) => Ok(handle), - Ok(None) => Err(TryCurrentError::new_no_context()), - Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), - } -} - -#[track_caller] -pub(crate) fn current() -> Handle { - match try_current() { - Ok(handle) => handle, - Err(e) => panic!("{}", e), - } -} - -/// Sets this [`Handle`] as the current active [`Handle`]. -/// -/// [`Handle`]: Handle -pub(crate) fn enter(new: Handle) -> EnterGuard { - match try_enter(new) { - Some(guard) => guard, - None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), - } -} - -/// Sets this [`Handle`] as the current active [`Handle`]. -/// -/// [`Handle`]: Handle -pub(crate) fn try_enter(new: Handle) -> Option { - let rng_seed = new.inner.seed_generator().next_seed(); - let old_handle = CONTEXT.try_with(|ctx| ctx.borrow_mut().replace(new)).ok()?; - - let old_seed = replace_thread_rng(rng_seed); - - Some(EnterGuard { - old_handle, - old_seed, - }) -} - -#[derive(Debug)] -pub(crate) struct EnterGuard { - old_handle: Option, - old_seed: RngSeed, -} - -impl Drop for EnterGuard { - fn drop(&mut self) { - CONTEXT.with(|ctx| { - *ctx.borrow_mut() = self.old_handle.take(); - }); - // We discard the RngSeed associated with this guard - let _ = replace_thread_rng(self.old_seed.clone()); - } -} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 91974252610..2847cd6d812 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -13,7 +13,6 @@ pub struct Handle { pub(crate) inner: scheduler::Handle, } -use crate::runtime::context; use crate::runtime::task::JoinHandle; use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; @@ -30,7 +29,7 @@ use std::{error, fmt}; #[derive(Debug)] #[must_use = "Creating and dropping a guard does nothing"] pub struct EnterGuard<'a> { - _guard: context::EnterGuard, + _guard: scheduler::EnterGuard, _handle_lifetime: PhantomData<&'a Handle>, } @@ -45,7 +44,7 @@ impl Handle { /// [`tokio::spawn`]: fn@crate::spawn pub fn enter(&self) -> EnterGuard<'_> { EnterGuard { - _guard: context::enter(self.clone()), + _guard: self.inner.enter(), _handle_lifetime: PhantomData, } } @@ -107,7 +106,7 @@ impl Handle { /// /// Contrary to `current`, this never panics pub fn try_current() -> Result { - context::try_current() + scheduler::Handle::try_current().map(|inner| Handle { inner }) } /// Spawns a future onto the Tokio runtime. diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index b69d696cdef..db568a62154 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -226,8 +226,6 @@ cfg_rt! { pub use crate::util::RngSeed; } - pub(crate) mod context; - use self::enter::enter; mod handle; @@ -622,7 +620,7 @@ cfg_rt! { Scheduler::CurrentThread(current_thread) => { // This ensures that tasks spawned on the current-thread // runtime are dropped inside the runtime's context. - match self::context::try_enter(self.handle.clone()) { + match self.handle.inner.try_enter() { Some(guard) => current_thread.set_context_guard(guard), None => { // The context thread-local has already been destroyed. diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index d11d93253ad..b9ace1ea5be 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -1,8 +1,8 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::{Arc, Mutex}; -use crate::runtime::context::EnterGuard; use crate::runtime::driver::{self, Driver}; +use crate::runtime::scheduler::EnterGuard; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::runtime::{blocking, Config}; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 4805da4720c..12dbdd2f182 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -64,14 +64,62 @@ impl Handle { cfg_rt! { use crate::future::Future; use crate::loom::sync::Arc; - use crate::runtime::{blocking, task::Id}; + use crate::runtime::{blocking, task::Id, TryCurrentError}; use crate::task::JoinHandle; - use crate::util::RngSeedGenerator; + use crate::util::{replace_thread_rng, RngSeed, RngSeedGenerator}; + + use std::cell::RefCell; + + #[derive(Debug)] + pub(crate) struct EnterGuard { + old_handle: Option, + old_seed: RngSeed, + } + + tokio_thread_local! { + static CURRENT: RefCell> = const { RefCell::new(None) } + } impl Handle { #[track_caller] pub(crate) fn current() -> Handle { - crate::runtime::context::current().inner + match Handle::try_current() { + Ok(handle) => handle, + Err(e) => panic!("{}", e), + } + } + + pub(crate) fn try_current() -> Result { + match CURRENT.try_with(|ctx| ctx.borrow().clone()) { + Ok(Some(handle)) => Ok(handle), + Ok(None) => Err(TryCurrentError::new_no_context()), + Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), + } + } + + /// Sets this [`Handle`] as the current active [`Handle`]. + /// + /// [`Handle`]: Handle + pub(crate) fn enter(&self) -> EnterGuard { + match self.try_enter() { + Some(guard) => guard, + None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + } + } + + /// Sets this [`Handle`] as the current active [`Handle`]. + /// + /// [`Handle`]: Handle + pub(crate) fn try_enter(&self) -> Option { + let rng_seed = self.seed_generator().next_seed(); + let old_handle = CURRENT.try_with(|ctx| ctx.borrow_mut().replace(self.clone())).ok()?; + + let old_seed = replace_thread_rng(rng_seed); + + Some(EnterGuard { + old_handle, + old_seed, + }) } pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner { @@ -115,6 +163,16 @@ cfg_rt! { } } + impl Drop for EnterGuard { + fn drop(&mut self) { + CURRENT.with(|ctx| { + *ctx.borrow_mut() = self.old_handle.take(); + }); + // We discard the RngSeed associated with this guard + let _ = replace_thread_rng(self.old_seed.clone()); + } + } + cfg_metrics! { use crate::runtime::{SchedulerMetrics, WorkerMetrics}; diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 007d8a4474a..91c400c5096 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -1,6 +1,6 @@ #![allow(unreachable_pub)] use crate::{ - runtime::{context, Handle}, + runtime::Handle, task::{JoinHandle, LocalSet}, }; use std::{future::Future, io}; @@ -167,7 +167,8 @@ impl<'a> Builder<'a> { Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, { - self.spawn_blocking_on(function, &context::current()) + let handle = Handle::current(); + self.spawn_blocking_on(function, &handle) } /// Spawns blocking code on the provided [runtime handle]'s blocking threadpool.