diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 9d4a35e5e48..118d537d2f1 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -175,10 +175,7 @@ impl Handle { F: Future + Send + 'static, F::Output: Send + 'static, { - let id = crate::runtime::task::Id::next(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task(future, "task", None, id.as_u64()); - self.spawner.spawn(future, id) + self.spawn_named(future, None) } /// Runs the provided function on an executor dedicated to blocking. @@ -301,6 +298,18 @@ impl Handle { .expect("failed to park thread") } + #[track_caller] + pub(crate) fn spawn_named(&self, future: F, _name: Option<&str>) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let id = crate::runtime::task::Id::next(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let future = crate::util::trace::task(future, "task", _name, id.as_u64()); + self.spawner.spawn(future, id) + } + pub(crate) fn shutdown(mut self) { self.spawner.shutdown(); } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 976ecc3c4b0..ff3f5954b56 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -1,5 +1,8 @@ #![allow(unreachable_pub)] -use crate::{runtime::context, task::JoinHandle}; +use crate::{ + runtime::{context, Handle}, + task::{JoinHandle, LocalSet}, +}; use std::future::Future; /// Factory which is used to configure the properties of a new task. @@ -71,7 +74,11 @@ impl<'a> Builder<'a> { Self { name: Some(name) } } - /// Spawns a task on the executor. + /// Spawns a task with this builder's settings on the current runtime. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. /// /// See [`task::spawn`](crate::task::spawn) for /// more details. @@ -84,10 +91,36 @@ impl<'a> Builder<'a> { super::spawn::spawn_inner(future, self.name) } - /// Spawns a task on the current thread. + /// Spawn a task with this builder's settings on the provided [runtime + /// handle]. /// - /// See [`task::spawn_local`](crate::task::spawn_local) - /// for more details. + /// See [`Handle::spawn`] for more details. + /// + /// [runtime handle]: crate::runtime::Handle + /// [`Handle::spawn`]: crate::runtime::Handle::spawn + #[track_caller] + pub fn spawn_on(&mut self, future: Fut, handle: &Handle) -> JoinHandle + where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + { + handle.spawn_named(future, self.name) + } + + /// Spawns `!Send` a task on the current [`LocalSet`] with this builder's + /// settings. + /// + /// The spawned future will be run on the same thread that called `spawn_local.` + /// This may only be called from the context of a [local task set][`LocalSet`]. + /// + /// # Panics + /// + /// This function panics if called outside of a [local task set][`LocalSet`]. + /// + /// See [`task::spawn_local`] for more details. + /// + /// [`task::spawn_local`]: crate::task::spawn_local + /// [`LocalSet`]: crate::task::LocalSet #[track_caller] pub fn spawn_local(self, future: Fut) -> JoinHandle where @@ -97,23 +130,61 @@ impl<'a> Builder<'a> { super::local::spawn_local_inner(future, self.name) } + /// Spawns `!Send` a task on the provided [`LocalSet`] with this builder's + /// settings. + /// + /// See [`LocalSet::spawn_local`] for more details. + /// + /// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local + /// [`LocalSet`]: crate::task::LocalSet + #[track_caller] + pub fn spawn_local_on(self, future: Fut, local_set: &LocalSet) -> JoinHandle + where + Fut: Future + 'static, + Fut::Output: 'static, + { + local_set.spawn_named(future, self.name) + } + /// Spawns blocking code on the blocking threadpool. /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// /// See [`task::spawn_blocking`](crate::task::spawn_blocking) /// for more details. #[track_caller] pub fn spawn_blocking(self, function: Function) -> JoinHandle + where + Function: FnOnce() -> Output + Send + 'static, + Output: Send + 'static, + { + self.spawn_blocking_on(function, &context::current()) + } + + /// Spawns blocking code on the provided [runtime handle]'s blocking threadpool. + /// + /// See [`Handle::spawn_blocking`] for more details. + /// + /// [runtime handle]: crate::runtime::Handle + /// [`Handle::spawn_blocking`]: crate::runtime::Handle::spawn_blocking + #[track_caller] + pub fn spawn_blocking_on( + self, + function: Function, + handle: &Handle, + ) -> JoinHandle where Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, { use crate::runtime::Mandatory; - let handle = context::current(); let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner( function, Mandatory::NonMandatory, self.name, - &handle, + handle, ); join_handle } diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 32e376872f4..c3d874b6138 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -301,19 +301,11 @@ cfg_rt! { where F: Future + 'static, F::Output: 'static { - let id = crate::runtime::task::Id::next(); - let future = crate::util::trace::task(future, "local", name, id.as_u64()); CURRENT.with(|maybe_cx| { let cx = maybe_cx .expect("`spawn_local` called from outside of a `task::LocalSet`"); - let (handle, notified) = cx.owned.bind(future, cx.shared.clone(), id); - - if let Some(notified) = notified { - cx.shared.schedule(notified); - } - - handle + cx.spawn(future, name) }) } } @@ -386,20 +378,7 @@ impl LocalSet { F: Future + 'static, F::Output: 'static, { - let id = crate::runtime::task::Id::next(); - let future = crate::util::trace::task(future, "local", None, id.as_u64()); - - let (handle, notified) = self - .context - .owned - .bind(future, self.context.shared.clone(), id); - - if let Some(notified) = notified { - self.context.shared.schedule(notified); - } - - self.context.shared.waker.wake(); - handle + self.spawn_named(future, None) } /// Runs a future to completion on the provided runtime, driving any local @@ -512,6 +491,21 @@ impl LocalSet { run_until.await } + pub(in crate::task) fn spawn_named( + &self, + future: F, + name: Option<&str>, + ) -> JoinHandle + where + F: Future + 'static, + F::Output: 'static, + { + let handle = self.context.spawn(future, name); + + self.context.shared.waker.wake(); + handle + } + /// Ticks the scheduler, returning whether the local future needs to be /// notified again. fn tick(&self) -> bool { @@ -628,6 +622,28 @@ impl Drop for LocalSet { } } +// === impl Context === + +impl Context { + #[track_caller] + fn spawn(&self, future: F, name: Option<&str>) -> JoinHandle + where + F: Future + 'static, + F::Output: 'static, + { + let id = crate::runtime::task::Id::next(); + let future = crate::util::trace::task(future, "local", name, id.as_u64()); + + let (handle, notified) = self.owned.bind(future, self.shared.clone(), id); + + if let Some(notified) = notified { + self.shared.schedule(notified); + } + + handle + } +} + // === impl LocalFuture === impl Future for RunUntil<'_, T> {