From f6cb6e084babd72b8e37a770d962d740495585eb Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Mon, 17 Apr 2023 04:20:27 -0400 Subject: [PATCH] task: add `JoinSet::spawn_blocking` (#5612) --- tokio/src/task/join_set.rs | 61 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index e6d8d62c3dc..041b06cb60f 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -201,6 +201,67 @@ impl JoinSet { self.insert(local_set.spawn_local(task)) } + /// Spawn the blocking code on the blocking threadpool and store + /// it in this `JoinSet`, returning an [`AbortHandle`] that can be + /// used to remotely cancel the task. + /// + /// # Examples + /// + /// Spawn multiple blocking tasks and wait for them. + /// + /// ``` + /// use tokio::task::JoinSet; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut set = JoinSet::new(); + /// + /// for i in 0..10 { + /// set.spawn_blocking(move || { i }); + /// } + /// + /// let mut seen = [false; 10]; + /// while let Some(res) = set.join_next().await { + /// let idx = res.unwrap(); + /// seen[idx] = true; + /// } + /// + /// for i in 0..10 { + /// assert!(seen[i]); + /// } + /// } + /// ``` + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// + /// [`AbortHandle`]: crate::task::AbortHandle + #[track_caller] + pub fn spawn_blocking(&mut self, f: F) -> AbortHandle + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send, + { + self.insert(crate::runtime::spawn_blocking(f)) + } + + /// Spawn the blocking code on the blocking threadpool of the + /// provided runtime and store it in this `JoinSet`, returning an + /// [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// [`AbortHandle`]: crate::task::AbortHandle + #[track_caller] + pub fn spawn_blocking_on(&mut self, f: F, handle: &Handle) -> AbortHandle + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send, + { + self.insert(handle.spawn_blocking(f)) + } + fn insert(&mut self, jh: JoinHandle) -> AbortHandle { let abort = jh.abort_handle(); let mut entry = self.inner.insert_idle(jh);