Skip to content

Commit

Permalink
rt: remove runtime::context module
Browse files Browse the repository at this point in the history
The current runtime thread-local is moved to `runtime::scheduler` as
well as methods to enter and access the current handle.
  • Loading branch information
carllerche committed Oct 27, 2022
1 parent 5c2e275 commit 06fda60
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 84 deletions.
7 changes: 3 additions & 4 deletions tokio/src/runtime/blocking/pool.rs
Expand Up @@ -5,7 +5,6 @@ use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::{shutdown, BlockingTask};
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
use crate::util::{replace_thread_rng, RngSeedGenerator};
Expand Down Expand Up @@ -135,7 +134,7 @@ where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let rt = context::current();
let rt = Handle::current();
rt.spawn_blocking(func)
}

Expand All @@ -153,7 +152,7 @@ cfg_fs! {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let rt = context::current();
let rt = Handle::current();
rt.inner.blocking_spawner().spawn_mandatory_blocking(&rt, func)
}
}
Expand Down Expand Up @@ -418,7 +417,7 @@ impl Spawner {

builder.spawn(move || {
// Only the reference should be moved into the closure
let _enter = crate::runtime::context::enter(rt.clone());
let _enter = rt.enter();
rt.inner.blocking_spawner().inner.run(id);
drop(shutdown_tx);
})
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/builder.rs
Expand Up @@ -1032,7 +1032,7 @@ cfg_rt_multi_thread! {
let handle = Handle { inner: handle };

// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());
let _enter = handle.enter();
launch.launch();

Ok(Runtime {
Expand Down
66 changes: 0 additions & 66 deletions tokio/src/runtime/context.rs

This file was deleted.

7 changes: 3 additions & 4 deletions tokio/src/runtime/handle.rs
Expand Up @@ -13,7 +13,6 @@ pub struct Handle {
pub(crate) inner: scheduler::Handle,
}

use crate::runtime::context;
use crate::runtime::task::JoinHandle;
use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};

Expand All @@ -30,7 +29,7 @@ use std::{error, fmt};
#[derive(Debug)]
#[must_use = "Creating and dropping a guard does nothing"]
pub struct EnterGuard<'a> {
_guard: context::EnterGuard,
_guard: scheduler::EnterGuard,
_handle_lifetime: PhantomData<&'a Handle>,
}

Expand All @@ -45,7 +44,7 @@ impl Handle {
/// [`tokio::spawn`]: fn@crate::spawn
pub fn enter(&self) -> EnterGuard<'_> {
EnterGuard {
_guard: context::enter(self.clone()),
_guard: self.inner.enter(),
_handle_lifetime: PhantomData,
}
}
Expand Down Expand Up @@ -107,7 +106,7 @@ impl Handle {
///
/// Contrary to `current`, this never panics
pub fn try_current() -> Result<Self, TryCurrentError> {
context::try_current()
scheduler::Handle::try_current().map(|inner| Handle { inner })
}

/// Spawns a future onto the Tokio runtime.
Expand Down
4 changes: 1 addition & 3 deletions tokio/src/runtime/mod.rs
Expand Up @@ -226,8 +226,6 @@ cfg_rt! {
pub use crate::util::RngSeed;
}

pub(crate) mod context;

use self::enter::enter;

mod handle;
Expand Down Expand Up @@ -622,7 +620,7 @@ cfg_rt! {
Scheduler::CurrentThread(current_thread) => {
// This ensures that tasks spawned on the current-thread
// runtime are dropped inside the runtime's context.
match self::context::try_enter(self.handle.clone()) {
match self.handle.inner.try_enter() {
Some(guard) => current_thread.set_context_guard(guard),
None => {
// The context thread-local has already been destroyed.
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/current_thread.rs
@@ -1,8 +1,8 @@
use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::EnterGuard;
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{blocking, Config};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
Expand Down
64 changes: 61 additions & 3 deletions tokio/src/runtime/scheduler/mod.rs
Expand Up @@ -64,14 +64,62 @@ impl Handle {
cfg_rt! {
use crate::future::Future;
use crate::loom::sync::Arc;
use crate::runtime::{blocking, task::Id};
use crate::runtime::{blocking, task::Id, TryCurrentError};
use crate::task::JoinHandle;
use crate::util::RngSeedGenerator;
use crate::util::{replace_thread_rng, RngSeed, RngSeedGenerator};

use std::cell::RefCell;

#[derive(Debug)]
pub(crate) struct EnterGuard {
old_handle: Option<Handle>,
old_seed: RngSeed,
}

tokio_thread_local! {
static CURRENT: RefCell<Option<Handle>> = const { RefCell::new(None) }
}

impl Handle {
#[track_caller]
pub(crate) fn current() -> Handle {
crate::runtime::context::current().inner
match Handle::try_current() {
Ok(handle) => handle,
Err(e) => panic!("{}", e),
}
}

pub(crate) fn try_current() -> Result<Handle, TryCurrentError> {
match CURRENT.try_with(|ctx| ctx.borrow().clone()) {
Ok(Some(handle)) => Ok(handle),
Ok(None) => Err(TryCurrentError::new_no_context()),
Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()),
}
}

/// Sets this [`Handle`] as the current active [`Handle`].
///
/// [`Handle`]: Handle
pub(crate) fn enter(&self) -> EnterGuard {
match self.try_enter() {
Some(guard) => guard,
None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
}

/// Sets this [`Handle`] as the current active [`Handle`].
///
/// [`Handle`]: Handle
pub(crate) fn try_enter(&self) -> Option<EnterGuard> {
let rng_seed = self.seed_generator().next_seed();
let old_handle = CURRENT.try_with(|ctx| ctx.borrow_mut().replace(self.clone())).ok()?;

let old_seed = replace_thread_rng(rng_seed);

Some(EnterGuard {
old_handle,
old_seed,
})
}

pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
Expand Down Expand Up @@ -115,6 +163,16 @@ cfg_rt! {
}
}

impl Drop for EnterGuard {
fn drop(&mut self) {
CURRENT.with(|ctx| {
*ctx.borrow_mut() = self.old_handle.take();
});
// We discard the RngSeed associated with this guard
let _ = replace_thread_rng(self.old_seed.clone());
}
}

cfg_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};

Expand Down
5 changes: 3 additions & 2 deletions tokio/src/task/builder.rs
@@ -1,6 +1,6 @@
#![allow(unreachable_pub)]
use crate::{
runtime::{context, Handle},
runtime::Handle,
task::{JoinHandle, LocalSet},
};
use std::{future::Future, io};
Expand Down Expand Up @@ -167,7 +167,8 @@ impl<'a> Builder<'a> {
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
self.spawn_blocking_on(function, &context::current())
let handle = Handle::current();
self.spawn_blocking_on(function, &handle)
}

/// Spawns blocking code on the provided [runtime handle]'s blocking threadpool.
Expand Down

0 comments on commit 06fda60

Please sign in to comment.