From 2e0e0213c5074a2c6dca73f07731788ff61d6b87 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 14 Oct 2022 14:04:48 -0700 Subject: [PATCH] rt: remove a reference to internal time handle This patch removes a handle to the internal runtime driver handle held by the runtime. This is another step towards reducing the number of Arc refs across the runtime internals. Specifically, this change is part of an effort to remove an Arc in the time driver itself. --- tokio/src/runtime/driver.rs | 48 ++++++++++--------- tokio/src/runtime/scheduler/current_thread.rs | 6 +-- .../runtime/scheduler/multi_thread/park.rs | 24 +++++----- .../runtime/scheduler/multi_thread/worker.rs | 10 ++-- 4 files changed, 45 insertions(+), 43 deletions(-) diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 74d9009240a..18dd2bf8db1 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -58,16 +58,16 @@ impl Driver { )) } - pub(crate) fn park(&mut self) { - self.inner.park() + pub(crate) fn park(&mut self, handle: &Handle) { + self.inner.park(handle) } - pub(crate) fn park_timeout(&mut self, duration: Duration) { - self.inner.park_timeout(duration) + pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { + self.inner.park_timeout(handle, duration) } - pub(crate) fn shutdown(&mut self) { - self.inner.shutdown() + pub(crate) fn shutdown(&mut self, handle: &Handle) { + self.inner.shutdown(handle) } } @@ -121,15 +121,6 @@ cfg_io_driver! { } impl IoStack { - /* - pub(crate) fn handle(&self) -> IoHandle { - match self { - IoStack::Enabled(v) => IoHandle::Enabled(v.handle()), - IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()), - } - }] - */ - pub(crate) fn park(&mut self) { match self { IoStack::Enabled(v) => v.park(), @@ -249,7 +240,6 @@ cfg_time! { pub(crate) enum TimeDriver { Enabled { driver: crate::runtime::time::Driver, - handle: crate::runtime::time::Handle, }, Disabled(IoStack), } @@ -269,30 +259,42 @@ cfg_time! { if enable { let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock); - (TimeDriver::Enabled { driver, handle: handle.clone() }, Some(handle)) + (TimeDriver::Enabled { driver }, Some(handle)) } else { (TimeDriver::Disabled(io_stack), None) } } impl TimeDriver { - pub(crate) fn park(&mut self) { + pub(crate) fn park(&mut self, handle: &Handle) { match self { - TimeDriver::Enabled { driver, handle } => driver.park(handle), + TimeDriver::Enabled { driver, .. } => { + // If the time driver is enabled, a handle is set. + let handle = handle.time.as_ref().unwrap(); + driver.park(handle) + } TimeDriver::Disabled(v) => v.park(), } } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { match self { - TimeDriver::Enabled { driver, handle } => driver.park_timeout(handle, duration), + TimeDriver::Enabled { driver } => { + // If the time driver is enabled, a handle is set. + let handle = handle.time.as_ref().unwrap(); + driver.park_timeout(handle, duration) + } TimeDriver::Disabled(v) => v.park_timeout(duration), } } - pub(crate) fn shutdown(&mut self) { + pub(crate) fn shutdown(&mut self, handle: &Handle) { match self { - TimeDriver::Enabled { driver, handle } => driver.shutdown(handle), + TimeDriver::Enabled { driver } => { + // If the time driver is enabled, a handle is set. + let handle = handle.time.as_ref().unwrap(); + driver.shutdown(handle) + } TimeDriver::Disabled(v) => v.shutdown(), } } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 666be6b13f2..d11d93253ad 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -247,7 +247,7 @@ impl Drop for CurrentThread { // Shutdown the resource drivers if let Some(driver) = core.driver.as_mut() { - driver.shutdown(); + driver.shutdown(&self.handle.driver); } (core, ()) @@ -314,7 +314,7 @@ impl Context { core.metrics.submit(&self.handle.shared.worker_metrics); let (c, _) = self.enter(core, || { - driver.park(); + driver.park(&self.handle.driver); }); core = c; @@ -339,7 +339,7 @@ impl Context { core.metrics.submit(&self.handle.shared.worker_metrics); let (mut core, _) = self.enter(core, || { - driver.park_timeout(Duration::from_millis(0)); + driver.park_timeout(&self.handle.driver, Duration::from_millis(0)); }); core.driver = Some(driver); diff --git a/tokio/src/runtime/scheduler/multi_thread/park.rs b/tokio/src/runtime/scheduler/multi_thread/park.rs index 46432f4f036..6bdbff961e3 100644 --- a/tokio/src/runtime/scheduler/multi_thread/park.rs +++ b/tokio/src/runtime/scheduler/multi_thread/park.rs @@ -64,21 +64,21 @@ impl Parker { } } - pub(crate) fn park(&mut self) { - self.inner.park(); + pub(crate) fn park(&mut self, handle: &driver::Handle) { + self.inner.park(handle); } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { // Only parking with zero is supported... assert_eq!(duration, Duration::from_millis(0)); if let Some(mut driver) = self.inner.shared.driver.try_lock() { - driver.park_timeout(duration) + driver.park_timeout(handle, duration) } } - pub(crate) fn shutdown(&mut self) { - self.inner.shutdown(); + pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { + self.inner.shutdown(handle); } } @@ -103,7 +103,7 @@ impl Unparker { impl Inner { /// Parks the current thread for at most `dur`. - fn park(&self) { + fn park(&self, handle: &driver::Handle) { for _ in 0..3 { // If we were previously notified then we consume this notification and // return quickly. @@ -119,7 +119,7 @@ impl Inner { } if let Some(mut driver) = self.shared.driver.try_lock() { - self.park_driver(&mut driver); + self.park_driver(&mut driver, handle); } else { self.park_condvar(); } @@ -165,7 +165,7 @@ impl Inner { } } - fn park_driver(&self, driver: &mut Driver) { + fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) { match self .state .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) @@ -186,7 +186,7 @@ impl Inner { Err(actual) => panic!("inconsistent park state; actual = {}", actual), } - driver.park(); + driver.park(handle); match self.state.swap(EMPTY, SeqCst) { NOTIFIED => {} // got a notification, hurray! @@ -227,9 +227,9 @@ impl Inner { self.condvar.notify_one() } - fn shutdown(&self) { + fn shutdown(&self, handle: &driver::Handle) { if let Some(mut driver) = self.shared.driver.try_lock() { - driver.shutdown(); + driver.shutdown(handle); } self.condvar.notify_all(); diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 34ef0d9f126..7a7e676dcc9 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -522,9 +522,9 @@ impl Context { // Park thread if let Some(timeout) = duration { - park.park_timeout(timeout); + park.park_timeout(&self.worker.handle.driver, timeout); } else { - park.park(); + park.park(&self.worker.handle.driver); } // Remove `core` from context @@ -687,14 +687,14 @@ impl Core { } /// Shuts down the core. - fn shutdown(&mut self) { + fn shutdown(&mut self, handle: &Handle) { // Take the core let mut park = self.park.take().expect("park missing"); // Drain the queue while self.next_local_task().is_some() {} - park.shutdown(); + park.shutdown(&handle.driver); } } @@ -829,7 +829,7 @@ impl Handle { debug_assert!(self.shared.owned.is_empty()); for mut core in cores.drain(..) { - core.shutdown(); + core.shutdown(self); } // Drain the injection queue