diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 74d9009240a..18dd2bf8db1 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -58,16 +58,16 @@ impl Driver { )) } - pub(crate) fn park(&mut self) { - self.inner.park() + pub(crate) fn park(&mut self, handle: &Handle) { + self.inner.park(handle) } - pub(crate) fn park_timeout(&mut self, duration: Duration) { - self.inner.park_timeout(duration) + pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { + self.inner.park_timeout(handle, duration) } - pub(crate) fn shutdown(&mut self) { - self.inner.shutdown() + pub(crate) fn shutdown(&mut self, handle: &Handle) { + self.inner.shutdown(handle) } } @@ -121,15 +121,6 @@ cfg_io_driver! { } impl IoStack { - /* - pub(crate) fn handle(&self) -> IoHandle { - match self { - IoStack::Enabled(v) => IoHandle::Enabled(v.handle()), - IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()), - } - }] - */ - pub(crate) fn park(&mut self) { match self { IoStack::Enabled(v) => v.park(), @@ -249,7 +240,6 @@ cfg_time! { pub(crate) enum TimeDriver { Enabled { driver: crate::runtime::time::Driver, - handle: crate::runtime::time::Handle, }, Disabled(IoStack), } @@ -269,30 +259,42 @@ cfg_time! { if enable { let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock); - (TimeDriver::Enabled { driver, handle: handle.clone() }, Some(handle)) + (TimeDriver::Enabled { driver }, Some(handle)) } else { (TimeDriver::Disabled(io_stack), None) } } impl TimeDriver { - pub(crate) fn park(&mut self) { + pub(crate) fn park(&mut self, handle: &Handle) { match self { - TimeDriver::Enabled { driver, handle } => driver.park(handle), + TimeDriver::Enabled { driver, .. } => { + // If the time driver is enabled, a handle is set. + let handle = handle.time.as_ref().unwrap(); + driver.park(handle) + } TimeDriver::Disabled(v) => v.park(), } } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { match self { - TimeDriver::Enabled { driver, handle } => driver.park_timeout(handle, duration), + TimeDriver::Enabled { driver } => { + // If the time driver is enabled, a handle is set. + let handle = handle.time.as_ref().unwrap(); + driver.park_timeout(handle, duration) + } TimeDriver::Disabled(v) => v.park_timeout(duration), } } - pub(crate) fn shutdown(&mut self) { + pub(crate) fn shutdown(&mut self, handle: &Handle) { match self { - TimeDriver::Enabled { driver, handle } => driver.shutdown(handle), + TimeDriver::Enabled { driver } => { + // If the time driver is enabled, a handle is set. + let handle = handle.time.as_ref().unwrap(); + driver.shutdown(handle) + } TimeDriver::Disabled(v) => v.shutdown(), } } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 666be6b13f2..d11d93253ad 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -247,7 +247,7 @@ impl Drop for CurrentThread { // Shutdown the resource drivers if let Some(driver) = core.driver.as_mut() { - driver.shutdown(); + driver.shutdown(&self.handle.driver); } (core, ()) @@ -314,7 +314,7 @@ impl Context { core.metrics.submit(&self.handle.shared.worker_metrics); let (c, _) = self.enter(core, || { - driver.park(); + driver.park(&self.handle.driver); }); core = c; @@ -339,7 +339,7 @@ impl Context { core.metrics.submit(&self.handle.shared.worker_metrics); let (mut core, _) = self.enter(core, || { - driver.park_timeout(Duration::from_millis(0)); + driver.park_timeout(&self.handle.driver, Duration::from_millis(0)); }); core.driver = Some(driver); diff --git a/tokio/src/runtime/scheduler/multi_thread/park.rs b/tokio/src/runtime/scheduler/multi_thread/park.rs index 46432f4f036..6bdbff961e3 100644 --- a/tokio/src/runtime/scheduler/multi_thread/park.rs +++ b/tokio/src/runtime/scheduler/multi_thread/park.rs @@ -64,21 +64,21 @@ impl Parker { } } - pub(crate) fn park(&mut self) { - self.inner.park(); + pub(crate) fn park(&mut self, handle: &driver::Handle) { + self.inner.park(handle); } - pub(crate) fn park_timeout(&mut self, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { // Only parking with zero is supported... assert_eq!(duration, Duration::from_millis(0)); if let Some(mut driver) = self.inner.shared.driver.try_lock() { - driver.park_timeout(duration) + driver.park_timeout(handle, duration) } } - pub(crate) fn shutdown(&mut self) { - self.inner.shutdown(); + pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { + self.inner.shutdown(handle); } } @@ -103,7 +103,7 @@ impl Unparker { impl Inner { /// Parks the current thread for at most `dur`. - fn park(&self) { + fn park(&self, handle: &driver::Handle) { for _ in 0..3 { // If we were previously notified then we consume this notification and // return quickly. @@ -119,7 +119,7 @@ impl Inner { } if let Some(mut driver) = self.shared.driver.try_lock() { - self.park_driver(&mut driver); + self.park_driver(&mut driver, handle); } else { self.park_condvar(); } @@ -165,7 +165,7 @@ impl Inner { } } - fn park_driver(&self, driver: &mut Driver) { + fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) { match self .state .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) @@ -186,7 +186,7 @@ impl Inner { Err(actual) => panic!("inconsistent park state; actual = {}", actual), } - driver.park(); + driver.park(handle); match self.state.swap(EMPTY, SeqCst) { NOTIFIED => {} // got a notification, hurray! @@ -227,9 +227,9 @@ impl Inner { self.condvar.notify_one() } - fn shutdown(&self) { + fn shutdown(&self, handle: &driver::Handle) { if let Some(mut driver) = self.shared.driver.try_lock() { - driver.shutdown(); + driver.shutdown(handle); } self.condvar.notify_all(); diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 34ef0d9f126..7a7e676dcc9 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -522,9 +522,9 @@ impl Context { // Park thread if let Some(timeout) = duration { - park.park_timeout(timeout); + park.park_timeout(&self.worker.handle.driver, timeout); } else { - park.park(); + park.park(&self.worker.handle.driver); } // Remove `core` from context @@ -687,14 +687,14 @@ impl Core { } /// Shuts down the core. - fn shutdown(&mut self) { + fn shutdown(&mut self, handle: &Handle) { // Take the core let mut park = self.park.take().expect("park missing"); // Drain the queue while self.next_local_task().is_some() {} - park.shutdown(); + park.shutdown(&handle.driver); } } @@ -829,7 +829,7 @@ impl Handle { debug_assert!(self.shared.owned.is_empty()); for mut core in cores.drain(..) { - core.shutdown(); + core.shutdown(self); } // Drain the injection queue