diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 21c3de3cf2e..d60c80ceac0 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -497,8 +497,8 @@ cfg_rt! { pub mod runtime; } cfg_not_rt! { - // The `runtime` module is used when the IO or time driver is needed. #[cfg(any( + feature = "macros", feature = "net", feature = "time", all(unix, feature = "process"), diff --git a/tokio/src/macros/support.rs b/tokio/src/macros/support.rs index 7f11bc68001..10526bcbca7 100644 --- a/tokio/src/macros/support.rs +++ b/tokio/src/macros/support.rs @@ -1,7 +1,11 @@ cfg_macros! { pub use crate::future::poll_fn; pub use crate::future::maybe_done::maybe_done; - pub use crate::util::thread_rng_n; + + #[doc(hidden)] + pub fn thread_rng_n(n: u32) -> u32 { + crate::runtime::context::thread_rng_n(n) + } } pub use std::future::Future; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 239df4902a4..dd166109469 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -7,7 +7,6 @@ use crate::runtime::blocking::{shutdown, BlockingTask}; use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; -use crate::util::{replace_thread_rng, RngSeedGenerator}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -48,9 +47,6 @@ struct Inner { // Customizable wait timeout. keep_alive: Duration, - - // Random number seed - seed_generator: RngSeedGenerator, } struct Shared { @@ -185,7 +181,6 @@ impl BlockingPool { before_stop: builder.before_stop.clone(), thread_cap, keep_alive, - seed_generator: builder.seed_generator.next_generator(), }), }, shutdown_rx, @@ -435,8 +430,6 @@ impl Inner { if let Some(f) = &self.after_start { f() } - // We own this thread so there is no need to replace the RngSeed once we're done. - let _ = replace_thread_rng(self.seed_generator.next_seed()); let mut shared = self.shared.lock(); let mut join_on_thread = None; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index de29a845fc6..0f472ce0fa0 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,6 +1,6 @@ use crate::runtime::handle::Handle; use crate::runtime::{blocking, driver, Callback, Runtime}; -use crate::util::{RngSeed, RngSeedGenerator}; +use crate::util::rand::{RngSeed, RngSeedGenerator}; use std::fmt; use std::io; diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs new file mode 100644 index 00000000000..b8c19f4bb9e --- /dev/null +++ b/tokio/src/runtime/context.rs @@ -0,0 +1,72 @@ +use crate::util::rand::{FastRand, RngSeed}; + +cfg_rt! { + use crate::runtime::scheduler; + use std::cell::RefCell; +} + +struct Context { + /// Handle to the runtime scheduler running on the current thread. + #[cfg(feature = "rt")] + scheduler: RefCell>, + rng: FastRand, +} + +tokio_thread_local! { + static CONTEXT: Context = { + Context { + #[cfg(feature = "rt")] + scheduler: RefCell::new(None), + rng: FastRand::new(RngSeed::new()), + } + } +} + +#[cfg(feature = "macros")] +pub(crate) fn thread_rng_n(n: u32) -> u32 { + CONTEXT.with(|ctx| ctx.rng.fastrand_n(n)) +} + +cfg_rt! { + use crate::runtime::TryCurrentError; + + #[derive(Debug)] + pub(crate) struct EnterGuard { + old_handle: Option, + old_seed: RngSeed, + } + + pub(crate) fn try_current() -> Result { + match CONTEXT.try_with(|ctx| ctx.scheduler.borrow().clone()) { + Ok(Some(handle)) => Ok(handle), + Ok(None) => Err(TryCurrentError::new_no_context()), + Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), + } + } + + /// Sets this [`Handle`] as the current active [`Handle`]. + /// + /// [`Handle`]: crate::runtime::scheduler::Handle + pub(crate) fn try_enter(handle: &scheduler::Handle) -> Option { + let rng_seed = handle.seed_generator().next_seed(); + + CONTEXT.try_with(|ctx| { + let old_handle = ctx.scheduler.borrow_mut().replace(handle.clone()); + let old_seed = ctx.rng.replace_seed(rng_seed); + + EnterGuard { + old_handle, + old_seed, + } + }).ok() + } + + impl Drop for EnterGuard { + fn drop(&mut self) { + CONTEXT.with(|ctx| { + *ctx.scheduler.borrow_mut() = self.old_handle.take(); + ctx.rng.replace_seed(self.old_seed.clone()); + }); + } + } +} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 46cdfdc90be..99f07c4e683 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,4 +1,4 @@ -use crate::runtime::{scheduler, RuntimeFlavor}; +use crate::runtime::{context, scheduler, RuntimeFlavor}; /// Handle to the runtime. /// @@ -29,7 +29,7 @@ use std::{error, fmt}; #[derive(Debug)] #[must_use = "Creating and dropping a guard does nothing"] pub struct EnterGuard<'a> { - _guard: scheduler::EnterGuard, + _guard: context::EnterGuard, _handle_lifetime: PhantomData<&'a Handle>, } @@ -106,7 +106,7 @@ impl Handle { /// /// Contrary to `current`, this never panics pub fn try_current() -> Result { - scheduler::Handle::try_current().map(|inner| Handle { inner }) + context::try_current().map(|inner| Handle { inner }) } /// Spawns a future onto the Tokio runtime. diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index a2ba2d9cd33..0c59cf30b7d 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -177,7 +177,11 @@ #[macro_use] mod tests; +#[cfg(any(feature = "rt", feature = "macros"))] +pub(crate) mod context; + mod driver; + pub(crate) mod scheduler; cfg_io_driver_impl! { @@ -223,7 +227,7 @@ cfg_rt! { pub use self::builder::Builder; cfg_unstable! { pub use self::builder::UnhandledPanic; - pub use crate::util::RngSeed; + pub use crate::util::rand::RngSeed; } use self::enter::enter; @@ -632,7 +636,7 @@ cfg_rt! { Scheduler::CurrentThread(current_thread) => { // This ensures that tasks spawned on the current-thread // runtime are dropped inside the runtime's context. - match self.handle.inner.try_enter() { + match context::try_enter(&self.handle.inner) { Some(guard) => current_thread.set_context_guard(guard), None => { // The context thread-local has already been destroyed. diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index b9ace1ea5be..d11d93253ad 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -1,8 +1,8 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::{Arc, Mutex}; +use crate::runtime::context::EnterGuard; use crate::runtime::driver::{self, Driver}; -use crate::runtime::scheduler::EnterGuard; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::runtime::{blocking, Config}; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 7ae64c34cb7..6a685a6594a 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -44,64 +44,30 @@ impl Handle { cfg_rt! { use crate::future::Future; use crate::loom::sync::Arc; - use crate::runtime::{blocking, task::Id, TryCurrentError}; + use crate::runtime::{blocking, task::Id}; + use crate::runtime::context::{self, EnterGuard}; use crate::task::JoinHandle; - use crate::util::{replace_thread_rng, RngSeed, RngSeedGenerator}; - - use std::cell::RefCell; - - #[derive(Debug)] - pub(crate) struct EnterGuard { - old_handle: Option, - old_seed: RngSeed, - } - - tokio_thread_local! { - static CURRENT: RefCell> = const { RefCell::new(None) } - } + use crate::util::RngSeedGenerator; impl Handle { #[track_caller] pub(crate) fn current() -> Handle { - match Handle::try_current() { + match context::try_current() { Ok(handle) => handle, Err(e) => panic!("{}", e), } } - pub(crate) fn try_current() -> Result { - match CURRENT.try_with(|ctx| ctx.borrow().clone()) { - Ok(Some(handle)) => Ok(handle), - Ok(None) => Err(TryCurrentError::new_no_context()), - Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), - } - } - /// Sets this [`Handle`] as the current active [`Handle`]. /// /// [`Handle`]: Handle pub(crate) fn enter(&self) -> EnterGuard { - match self.try_enter() { + match context::try_enter(self) { Some(guard) => guard, None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), } } - /// Sets this [`Handle`] as the current active [`Handle`]. - /// - /// [`Handle`]: Handle - pub(crate) fn try_enter(&self) -> Option { - let rng_seed = self.seed_generator().next_seed(); - let old_handle = CURRENT.try_with(|ctx| ctx.borrow_mut().replace(self.clone())).ok()?; - - let old_seed = replace_thread_rng(rng_seed); - - Some(EnterGuard { - old_handle, - old_seed, - }) - } - pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner { match self { Handle::CurrentThread(h) => &h.blocking_spawner, @@ -143,16 +109,6 @@ cfg_rt! { } } - impl Drop for EnterGuard { - fn drop(&mut self) { - CURRENT.with(|ctx| { - *ctx.borrow_mut() = self.old_handle.take(); - }); - // We discard the RngSeed associated with this guard - let _ = replace_thread_rng(self.old_seed.clone()); - } - } - cfg_metrics! { use crate::runtime::{SchedulerMetrics, WorkerMetrics}; @@ -201,6 +157,12 @@ cfg_rt! { } cfg_not_rt! { + #[cfg(any( + feature = "net", + all(unix, feature = "process"), + all(unix, feature = "signal"), + feature = "time", + ))] impl Handle { #[track_caller] pub(crate) fn current() -> Handle { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 7a7e676dcc9..3407186b30e 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -66,7 +66,7 @@ use crate::runtime::{ blocking, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics, }; use crate::util::atomic_cell::AtomicCell; -use crate::util::{FastRand, RngSeedGenerator}; +use crate::util::rand::{FastRand, RngSeedGenerator}; use std::cell::RefCell; use std::time::Duration; diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index e7b645c8183..3948ed84a0c 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -44,13 +44,13 @@ pub(crate) use wake_list::WakeList; pub(crate) mod linked_list; #[cfg(any(feature = "rt", feature = "macros"))] -mod rand; +pub(crate) mod rand; cfg_rt! { mod idle_notified_set; pub(crate) use idle_notified_set::IdleNotifiedSet; - pub(crate) use self::rand::{RngSeedGenerator,replace_thread_rng}; + pub(crate) use self::rand::RngSeedGenerator; mod wake; pub(crate) use wake::WakerRef; @@ -66,17 +66,7 @@ cfg_rt! { pub(crate) use rc_cell::RcCell; } -#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -#[cfg(feature = "rt")] -pub use self::rand::RngSeed; - -#[cfg(any(feature = "macros"))] -#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub use self::rand::thread_rng_n; - cfg_rt_multi_thread! { - pub(crate) use self::rand::FastRand; - mod try_lock; pub(crate) use try_lock::TryLock; } diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 899ceeee39c..fe6b7246c05 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -156,25 +156,3 @@ impl FastRand { s0.wrapping_add(s1) } } - -tokio_thread_local! { - static THREAD_RNG: FastRand = FastRand::new(RngSeed::new()); -} - -/// Seeds the thread local random number generator with the provided seed and -/// return the previously stored seed. -/// -/// The returned seed can be later used to return the thread local random number -/// generator to its previous state. -#[cfg(feature = "rt")] -pub(crate) fn replace_thread_rng(rng_seed: RngSeed) -> RngSeed { - THREAD_RNG.with(|rng| rng.replace_seed(rng_seed)) -} - -// Used by the select macro and `StreamMap` -#[cfg(any(feature = "macros"))] -#[doc(hidden)] -#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub fn thread_rng_n(n: u32) -> u32 { - THREAD_RNG.with(|rng| rng.fastrand_n(n)) -}