From 5b98b3a9e5fc4456d62f2f543ef1fc45d7e942e4 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 May 2022 10:43:02 -0700 Subject: [PATCH 1/4] task: improve `Builder::spawn_local` docs Signed-off-by: Eliza Weisman --- tokio/src/task/builder.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 976ecc3c4b0..a8caca10493 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -84,10 +84,20 @@ impl<'a> Builder<'a> { super::spawn::spawn_inner(future, self.name) } - /// Spawns a task on the current thread. + /// Spawns `!Send` a task on the current [`LocalSet`] with this builder's + /// settings. /// - /// See [`task::spawn_local`](crate::task::spawn_local) - /// for more details. + /// The spawned future will be run on the same thread that called `spawn_local.` + /// This may only be called from the context of a [local task set][`LocalSet`]. + /// + /// # Panics + /// + /// - This function panics if called outside of a [local task set][`LocalSet`]. + /// + /// See [`task::spawn_local`] for more details. + /// + /// [`task::spawn_local`]: crate::task::spawn_local + /// [`LocalSet`]: crate::task::LocalSet #[track_caller] pub fn spawn_local(self, future: Fut) -> JoinHandle where From dc8c5b2cd4afd6d6daa1d4c5e1a990f00a57d297 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 May 2022 11:09:27 -0700 Subject: [PATCH 2/4] task: add `Builder::{spawn_on, spawn_local_on, spawn_blocking_on}` `task::JoinSet` currently has both `spawn`/`spawn_local` methods, and `spawn_on`/`spawn_local_on` variants of these methods that take a reference to a runtime `Handle` or to a `LocalSet`, and spawn tasks on the provided runtime/`LocalSet`, rather than the current one. The `task::Builder` type is _also_ an API type that can spawn tasks, but it doesn't have `spawn_on` variants of its methods. It occurred to me that it would be nice to have similar APIs on `task::Builder`. This branch adds `task::Builder::spawn_on`, `task::Builder::spawn_local_on`, and `task::Builder::spawn_blocking_on` methods, similar to those on `JoinSet`. In addition, I did some refactoring of the internal spawning APIs --- there was a bit of duplicated code that this PR reduces. --- tokio/src/runtime/context.rs | 9 ----- tokio/src/runtime/handle.rs | 17 +++++++-- tokio/src/task/builder.rs | 71 +++++++++++++++++++++++++++++++++--- tokio/src/task/local.rs | 62 +++++++++++++++++++------------ tokio/src/task/spawn.rs | 20 ++-------- 5 files changed, 122 insertions(+), 57 deletions(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index aebbe18755a..5a6434a2b35 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -68,15 +68,6 @@ cfg_time! { } } -cfg_rt! { - pub(crate) fn spawn_handle() -> Option { - match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) { - Ok(spawner) => spawner, - Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), - } - } -} - /// Sets this [`Handle`] as the current active [`Handle`]. /// /// [`Handle`]: Handle diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 9d4a35e5e48..1e0e349c0f9 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -175,10 +175,7 @@ impl Handle { F: Future + Send + 'static, F::Output: Send + 'static, { - let id = crate::runtime::task::Id::next(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task(future, "task", None, id.as_u64()); - self.spawner.spawn(future, id) + self.spawn_named(future, None) } /// Runs the provided function on an executor dedicated to blocking. @@ -301,6 +298,18 @@ impl Handle { .expect("failed to park thread") } + #[track_caller] + pub(crate) fn spawn_named(&self, future: F, _name: Option<&str>) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let id = crate::runtime::task::Id::next(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let future = crate::util::trace::task(future, "task", name, id.as_u64()); + self.spawner.spawn(future, id) + } + pub(crate) fn shutdown(mut self) { self.spawner.shutdown(); } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index a8caca10493..926da81517d 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -1,5 +1,8 @@ #![allow(unreachable_pub)] -use crate::{runtime::context, task::JoinHandle}; +use crate::{ + runtime::{context, Handle}, + task::JoinHandle, +}; use std::future::Future; /// Factory which is used to configure the properties of a new task. @@ -71,7 +74,11 @@ impl<'a> Builder<'a> { Self { name: Some(name) } } - /// Spawns a task on the executor. + /// Spawns a task with this builder's settings on the current runtime. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. /// /// See [`task::spawn`](crate::task::spawn) for /// more details. @@ -81,7 +88,23 @@ impl<'a> Builder<'a> { Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - super::spawn::spawn_inner(future, self.name) + self.spawn_on(task, context::current()) + } + + /// Spawn a task with this builder's settings on the provided [runtime + /// handle]. + /// + /// See [`Handle::spawn`] for more details. + /// + /// [runtime handle]: crate::runtime::Handle + /// [`Handle::spawn`]: crate::runtime::Handle::spawn + #[track_caller] + pub fn spawn_on(&mut self, task: F, handle: &Handle) -> JoinHandle + where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + { + handle.spawn_named(future, self.name) } /// Spawns `!Send` a task on the current [`LocalSet`] with this builder's @@ -92,7 +115,7 @@ impl<'a> Builder<'a> { /// /// # Panics /// - /// - This function panics if called outside of a [local task set][`LocalSet`]. + /// This function panics if called outside of a [local task set][`LocalSet`]. /// /// See [`task::spawn_local`] for more details. /// @@ -107,18 +130,56 @@ impl<'a> Builder<'a> { super::local::spawn_local_inner(future, self.name) } + /// Spawns `!Send` a task on the provided [`LocalSet`] with this builder's + /// settings. + /// + /// See [`LocalSet::spawn_local`] for more details. + /// + /// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local + /// [`LocalSet`]: crate::task::LocalSet + #[track_caller] + pub fn spawn_local_on(self, future: Fut, local_set: &LocalSet) -> JoinHandle + where + Fut: Future + 'static, + Fut::Output: 'static, + { + local_set.spawn_inner(future, self.name) + } + /// Spawns blocking code on the blocking threadpool. /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// /// See [`task::spawn_blocking`](crate::task::spawn_blocking) /// for more details. #[track_caller] pub fn spawn_blocking(self, function: Function) -> JoinHandle + where + Function: FnOnce() -> Output + Send + 'static, + Output: Send + 'static, + { + self.spawn_blocking_on(function, context::current()) + } + + /// Spawns blocking code on the provided [runtime handle]'s blocking threadpool. + /// + /// See [`Handle::spawn_blocking`] for more details. + /// + /// [runtime handle]: crate::runtime::Handle + /// [`Handle::spawn_blocking`]: crate::runtime::Handle::spawn_blocking + #[track_caller] + pub fn spawn_blocking_on( + self, + function: Function, + handle: &Handle, + ) -> JoinHandle where Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, { use crate::runtime::Mandatory; - let handle = context::current(); let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner( function, Mandatory::NonMandatory, diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 32e376872f4..c3d874b6138 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -301,19 +301,11 @@ cfg_rt! { where F: Future + 'static, F::Output: 'static { - let id = crate::runtime::task::Id::next(); - let future = crate::util::trace::task(future, "local", name, id.as_u64()); CURRENT.with(|maybe_cx| { let cx = maybe_cx .expect("`spawn_local` called from outside of a `task::LocalSet`"); - let (handle, notified) = cx.owned.bind(future, cx.shared.clone(), id); - - if let Some(notified) = notified { - cx.shared.schedule(notified); - } - - handle + cx.spawn(future, name) }) } } @@ -386,20 +378,7 @@ impl LocalSet { F: Future + 'static, F::Output: 'static, { - let id = crate::runtime::task::Id::next(); - let future = crate::util::trace::task(future, "local", None, id.as_u64()); - - let (handle, notified) = self - .context - .owned - .bind(future, self.context.shared.clone(), id); - - if let Some(notified) = notified { - self.context.shared.schedule(notified); - } - - self.context.shared.waker.wake(); - handle + self.spawn_named(future, None) } /// Runs a future to completion on the provided runtime, driving any local @@ -512,6 +491,21 @@ impl LocalSet { run_until.await } + pub(in crate::task) fn spawn_named( + &self, + future: F, + name: Option<&str>, + ) -> JoinHandle + where + F: Future + 'static, + F::Output: 'static, + { + let handle = self.context.spawn(future, name); + + self.context.shared.waker.wake(); + handle + } + /// Ticks the scheduler, returning whether the local future needs to be /// notified again. fn tick(&self) -> bool { @@ -628,6 +622,28 @@ impl Drop for LocalSet { } } +// === impl Context === + +impl Context { + #[track_caller] + fn spawn(&self, future: F, name: Option<&str>) -> JoinHandle + where + F: Future + 'static, + F::Output: 'static, + { + let id = crate::runtime::task::Id::next(); + let future = crate::util::trace::task(future, "local", name, id.as_u64()); + + let (handle, notified) = self.owned.bind(future, self.shared.clone(), id); + + if let Some(notified) = notified { + self.shared.schedule(notified); + } + + handle + } +} + // === impl LocalFuture === impl Future for RunUntil<'_, T> { diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 5a60f9d66e6..67158576526 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -1,4 +1,4 @@ -use crate::{task::JoinHandle, util::error::CONTEXT_MISSING_ERROR}; +use crate::task::JoinHandle; use std::future::Future; @@ -127,25 +127,13 @@ cfg_rt! { T: Future + Send + 'static, T::Output: Send + 'static, { + use crate::runtime::context; // preventing stack overflows on debug mode, by quickly sending the // task to the heap. if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - spawn_inner(Box::pin(future), None) + context::current().spawn_named(Box::pin(future), None) } else { - spawn_inner(future, None) + context::current().spawn_named(future, None) } } - - #[track_caller] - pub(super) fn spawn_inner(future: T, name: Option<&str>) -> JoinHandle - where - T: Future + Send + 'static, - T::Output: Send + 'static, - { - use crate::runtime::{task, context}; - let id = task::Id::next(); - let spawn_handle = context::spawn_handle().expect(CONTEXT_MISSING_ERROR); - let task = crate::util::trace::task(future, "task", name, id.as_u64()); - spawn_handle.spawn(task, id) - } } From 8607183440595a4fadcddf35576d2cf98a5e1dc5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 May 2022 11:29:23 -0700 Subject: [PATCH 3/4] actually the prev approach was better it only incremented the spawner's refcount Signed-off-by: Eliza Weisman --- tokio/src/runtime/context.rs | 9 +++++++++ tokio/src/task/builder.rs | 2 +- tokio/src/task/spawn.rs | 20 ++++++++++++++++---- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 5a6434a2b35..aebbe18755a 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -68,6 +68,15 @@ cfg_time! { } } +cfg_rt! { + pub(crate) fn spawn_handle() -> Option { + match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) { + Ok(spawner) => spawner, + Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + } + } +} + /// Sets this [`Handle`] as the current active [`Handle`]. /// /// [`Handle`]: Handle diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 926da81517d..03751626099 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -88,7 +88,7 @@ impl<'a> Builder<'a> { Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - self.spawn_on(task, context::current()) + super::spawn_inner(future, self.name) } /// Spawn a task with this builder's settings on the provided [runtime diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 67158576526..5a60f9d66e6 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -1,4 +1,4 @@ -use crate::task::JoinHandle; +use crate::{task::JoinHandle, util::error::CONTEXT_MISSING_ERROR}; use std::future::Future; @@ -127,13 +127,25 @@ cfg_rt! { T: Future + Send + 'static, T::Output: Send + 'static, { - use crate::runtime::context; // preventing stack overflows on debug mode, by quickly sending the // task to the heap. if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - context::current().spawn_named(Box::pin(future), None) + spawn_inner(Box::pin(future), None) } else { - context::current().spawn_named(future, None) + spawn_inner(future, None) } } + + #[track_caller] + pub(super) fn spawn_inner(future: T, name: Option<&str>) -> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + use crate::runtime::{task, context}; + let id = task::Id::next(); + let spawn_handle = context::spawn_handle().expect(CONTEXT_MISSING_ERROR); + let task = crate::util::trace::task(future, "task", name, id.as_u64()); + spawn_handle.spawn(task, id) + } } From fafc4feacfa0c526dd7ae4a14ff23bc14ba71db0 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 May 2022 11:41:41 -0700 Subject: [PATCH 4/4] fixup Signed-off-by: Eliza Weisman --- tokio/src/runtime/handle.rs | 2 +- tokio/src/task/builder.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 1e0e349c0f9..118d537d2f1 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -306,7 +306,7 @@ impl Handle { { let id = crate::runtime::task::Id::next(); #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task(future, "task", name, id.as_u64()); + let future = crate::util::trace::task(future, "task", _name, id.as_u64()); self.spawner.spawn(future, id) } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 03751626099..ff3f5954b56 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -1,7 +1,7 @@ #![allow(unreachable_pub)] use crate::{ runtime::{context, Handle}, - task::JoinHandle, + task::{JoinHandle, LocalSet}, }; use std::future::Future; @@ -88,7 +88,7 @@ impl<'a> Builder<'a> { Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - super::spawn_inner(future, self.name) + super::spawn::spawn_inner(future, self.name) } /// Spawn a task with this builder's settings on the provided [runtime @@ -99,7 +99,7 @@ impl<'a> Builder<'a> { /// [runtime handle]: crate::runtime::Handle /// [`Handle::spawn`]: crate::runtime::Handle::spawn #[track_caller] - pub fn spawn_on(&mut self, task: F, handle: &Handle) -> JoinHandle + pub fn spawn_on(&mut self, future: Fut, handle: &Handle) -> JoinHandle where Fut: Future + Send + 'static, Fut::Output: Send + 'static, @@ -143,7 +143,7 @@ impl<'a> Builder<'a> { Fut: Future + 'static, Fut::Output: 'static, { - local_set.spawn_inner(future, self.name) + local_set.spawn_named(future, self.name) } /// Spawns blocking code on the blocking threadpool. @@ -160,7 +160,7 @@ impl<'a> Builder<'a> { Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, { - self.spawn_blocking_on(function, context::current()) + self.spawn_blocking_on(function, &context::current()) } /// Spawns blocking code on the provided [runtime handle]'s blocking threadpool. @@ -184,7 +184,7 @@ impl<'a> Builder<'a> { function, Mandatory::NonMandatory, self.name, - &handle, + handle, ); join_handle }