From 58c457190b8a79b7ed8a76910e4d84d9d5de163d Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 26 Oct 2022 12:07:38 -0700 Subject: [PATCH] rt: start decoupling I/O driver and I/O handle (#5127) This is the start of applying a similar treatment to the I/O driver as the time driver. The I/O driver will no longer hold its own reference to the I/O handle. Instead, the handle is passed in when needed. This patch also moves the process driver to the `runtime` module. --- tokio/src/process/unix/mod.rs | 4 +- tokio/src/process/unix/orphan.rs | 3 +- tokio/src/runtime/driver.rs | 45 ++++++++-------- tokio/src/runtime/io/mod.rs | 51 +++++++++---------- tokio/src/runtime/mod.rs | 4 ++ .../unix/driver.rs => runtime/process.rs} | 13 ++--- tokio/src/runtime/signal/mod.rs | 18 +++---- 7 files changed, 72 insertions(+), 66 deletions(-) rename tokio/src/{process/unix/driver.rs => runtime/process.rs} (70%) diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index 038a5927172..9bbc813fe6d 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -21,8 +21,6 @@ //! processes in general aren't scalable (e.g. millions) so it shouldn't be that //! bad in theory... -pub(crate) mod driver; - pub(crate) mod orphan; use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; @@ -90,7 +88,7 @@ impl fmt::Debug for GlobalOrphanQueue { } impl GlobalOrphanQueue { - fn reap_orphans(handle: &SignalHandle) { + pub(crate) fn reap_orphans(handle: &SignalHandle) { get_orphan_queue().reap_orphans(handle) } } diff --git a/tokio/src/process/unix/orphan.rs b/tokio/src/process/unix/orphan.rs index 5862b996d9e..66572ef7c41 100644 --- a/tokio/src/process/unix/orphan.rs +++ b/tokio/src/process/unix/orphan.rs @@ -294,7 +294,8 @@ pub(crate) mod test { #[cfg_attr(miri, ignore)] // Miri does not support epoll. #[test] fn does_not_register_signal_if_queue_empty() { - let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap(); + let (io_driver, io_handle) = IoDriver::new().unwrap(); + let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap(); let handle = signal_driver.handle(); let orphanage = OrphanQueueImpl::new(); diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index f676e036981..aeba46061a1 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -81,6 +81,14 @@ impl Handle { self.io.unpark(); } + cfg_io_driver! { + pub(crate) fn io(&self) -> &crate::runtime::io::Handle { + self.io + .as_ref() + .expect("A Tokio 1.x context was found, but I/O is disabled. Call `enable_io` on the runtime builder to enable I/O.") + } + } + cfg_time! { /// Returns a reference to the time driver handle. /// @@ -116,10 +124,9 @@ cfg_io_driver! { assert!(!enabled); let ret = if enabled { - let io_driver = crate::runtime::io::Driver::new()?; - let io_handle = io_driver.handle(); + let (io_driver, io_handle) = crate::runtime::io::Driver::new()?; - let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; + let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?; let process_driver = create_process_driver(signal_driver); (IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle) @@ -133,23 +140,23 @@ cfg_io_driver! { } impl IoStack { - pub(crate) fn park(&mut self, _handle: &Handle) { + pub(crate) fn park(&mut self, handle: &Handle) { match self { - IoStack::Enabled(v) => v.park(), + IoStack::Enabled(v) => v.park(handle), IoStack::Disabled(v) => v.park(), } } - pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) { + pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { match self { - IoStack::Enabled(v) => v.park_timeout(duration), + IoStack::Enabled(v) => v.park_timeout(handle, duration), IoStack::Disabled(v) => v.park_timeout(duration), } } - pub(crate) fn shutdown(&mut self, _handle: &Handle) { + pub(crate) fn shutdown(&mut self, handle: &Handle) { match self { - IoStack::Enabled(v) => v.shutdown(), + IoStack::Enabled(v) => v.shutdown(handle), IoStack::Disabled(v) => v.shutdown(), } } @@ -171,12 +178,10 @@ cfg_io_driver! { } } - cfg_unstable! { - pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> { - match self { - IoHandle::Enabled(v) => Some(v), - IoHandle::Disabled(..) => None, - } + pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> { + match self { + IoHandle::Enabled(v) => Some(v), + IoHandle::Disabled(..) => None, } } } @@ -215,8 +220,8 @@ cfg_signal_internal_and_unix! { type SignalDriver = crate::runtime::signal::Driver; pub(crate) type SignalHandle = Option; - fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { - let driver = crate::runtime::signal::Driver::new(io_driver)?; + fn create_signal_driver(io_driver: IoDriver, io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> { + let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?; let handle = driver.handle(); Ok((driver, Some(handle))) } @@ -228,7 +233,7 @@ cfg_not_signal_internal! { cfg_io_driver! { type SignalDriver = IoDriver; - fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { + fn create_signal_driver(io_driver: IoDriver, _io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> { Ok((io_driver, ())) } } @@ -237,10 +242,10 @@ cfg_not_signal_internal! { // ===== process driver ===== cfg_process_driver! { - type ProcessDriver = crate::process::unix::driver::Driver; + type ProcessDriver = crate::runtime::process::Driver; fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver { - crate::process::unix::driver::Driver::new(signal_driver) + ProcessDriver::new(signal_driver) } } diff --git a/tokio/src/runtime/io/mod.rs b/tokio/src/runtime/io/mod.rs index 3f80005be9a..6ba48a717c2 100644 --- a/tokio/src/runtime/io/mod.rs +++ b/tokio/src/runtime/io/mod.rs @@ -10,6 +10,7 @@ mod metrics; use crate::io::interest::Interest; use crate::io::ready::Ready; +use crate::runtime::driver; use crate::util::slab::{self, Slab}; use crate::{loom::sync::RwLock, util::bit}; @@ -38,9 +39,6 @@ pub(crate) struct Driver { /// The system event queue. poll: mio::Poll, - - /// State shared between the reactor and the handles. - inner: Arc, } /// A reference to an I/O driver. @@ -112,7 +110,7 @@ fn _assert_kinds() { impl Driver { /// Creates a new event loop, returning any error that happened during the /// creation. - pub(crate) fn new() -> io::Result { + pub(crate) fn new() -> io::Result<(Driver, Handle)> { let poll = mio::Poll::new()?; #[cfg(not(tokio_wasi))] let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; @@ -121,12 +119,15 @@ impl Driver { let slab = Slab::new(); let allocator = slab.allocator(); - Ok(Driver { + let driver = Driver { tick: 0, signal_ready: false, events: mio::Events::with_capacity(1024), poll, resources: slab, + }; + + let handle = Handle { inner: Arc::new(Inner { registry, io_dispatch: RwLock::new(IoDispatcher::new(allocator)), @@ -134,31 +135,25 @@ impl Driver { waker, metrics: IoDriverMetrics::default(), }), - }) - } + }; - /// Returns a handle to this event loop which can be sent across threads - /// and can be used as a proxy to the event loop itself. - /// - /// Handles are cloneable and clones always refer to the same event loop. - /// This handle is typically passed into functions that create I/O objects - /// to bind them to this event loop. - pub(crate) fn handle(&self) -> Handle { - Handle { - inner: Arc::clone(&self.inner), - } + Ok((driver, handle)) } - pub(crate) fn park(&mut self) { - self.turn(None); + pub(crate) fn park(&mut self, rt_handle: &driver::Handle) { + let handle = rt_handle.io(); + self.turn(handle, None); } - pub(crate) fn park_timeout(&mut self, duration: Duration) { - self.turn(Some(duration)); + pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { + let handle = rt_handle.io(); + self.turn(handle, Some(duration)); } - pub(crate) fn shutdown(&mut self) { - if self.inner.shutdown() { + pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { + let handle = rt_handle.io(); + + if handle.inner.shutdown() { self.resources.for_each(|io| { // If a task is waiting on the I/O resource, notify it. The task // will then attempt to use the I/O resource and fail due to the @@ -168,7 +163,7 @@ impl Driver { } } - fn turn(&mut self, max_wait: Option) { + fn turn(&mut self, handle: &Handle, max_wait: Option) { // How often to call `compact()` on the resource slab const COMPACT_INTERVAL: u8 = 255; @@ -213,7 +208,7 @@ impl Driver { } } - self.inner.metrics.incr_ready_count_by(ready_count); + handle.inner.metrics.incr_ready_count_by(ready_count); } fn dispatch(resources: &mut Slab, tick: u8, token: mio::Token, ready: Ready) { @@ -390,12 +385,14 @@ impl Direction { // Signal handling cfg_signal_internal_and_unix! { - impl Driver { - pub(crate) fn register_signal_receiver(&mut self, receiver: &mut mio::net::UnixStream) -> io::Result<()> { + impl Handle { + pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> { self.inner.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; Ok(()) } + } + impl Driver { pub(crate) fn consume_signal_ready(&mut self) -> bool { let ret = self.signal_ready; self.signal_ready = false; diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 56c4e32eb6b..b69d696cdef 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -184,6 +184,10 @@ cfg_io_driver_impl! { pub(crate) mod io; } +cfg_process_driver! { + mod process; +} + cfg_time! { pub(crate) mod time; } diff --git a/tokio/src/process/unix/driver.rs b/tokio/src/runtime/process.rs similarity index 70% rename from tokio/src/process/unix/driver.rs rename to tokio/src/runtime/process.rs index b6949286a87..df339b0e729 100644 --- a/tokio/src/process/unix/driver.rs +++ b/tokio/src/runtime/process.rs @@ -3,6 +3,7 @@ //! Process driver. use crate::process::unix::GlobalOrphanQueue; +use crate::runtime::driver; use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle}; use std::time::Duration; @@ -27,17 +28,17 @@ impl Driver { } } - pub(crate) fn park(&mut self) { - self.park.park(); + pub(crate) fn park(&mut self, handle: &driver::Handle) { + self.park.park(handle); GlobalOrphanQueue::reap_orphans(&self.signal_handle); } - pub(crate) fn park_timeout(&mut self, duration: Duration) { - self.park.park_timeout(duration); + pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { + self.park.park_timeout(handle, duration); GlobalOrphanQueue::reap_orphans(&self.signal_handle); } - pub(crate) fn shutdown(&mut self) { - self.park.shutdown() + pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { + self.park.shutdown(handle) } } diff --git a/tokio/src/runtime/signal/mod.rs b/tokio/src/runtime/signal/mod.rs index 9a110b5a9d3..c8ba951c6ed 100644 --- a/tokio/src/runtime/signal/mod.rs +++ b/tokio/src/runtime/signal/mod.rs @@ -2,7 +2,7 @@ //! Signal driver -use crate::runtime::io; +use crate::runtime::{driver, io}; use crate::signal::registry::globals; use mio::net::UnixStream; @@ -39,7 +39,7 @@ pub(super) struct Inner(()); impl Driver { /// Creates a new signal `Driver` instance that delegates wakeups to `park`. - pub(crate) fn new(mut io: io::Driver) -> std_io::Result { + pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result { use std::mem::ManuallyDrop; use std::os::unix::io::{AsRawFd, FromRawFd}; @@ -70,7 +70,7 @@ impl Driver { ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); let mut receiver = UnixStream::from_std(original.try_clone()?); - io.register_signal_receiver(&mut receiver)?; + io_handle.register_signal_receiver(&mut receiver)?; Ok(Self { io, @@ -87,18 +87,18 @@ impl Driver { } } - pub(crate) fn park(&mut self) { - self.io.park(); + pub(crate) fn park(&mut self, handle: &driver::Handle) { + self.io.park(handle); self.process(); } - pub(crate) fn park_timeout(&mut self, duration: Duration) { - self.io.park_timeout(duration); + pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { + self.io.park_timeout(handle, duration); self.process(); } - pub(crate) fn shutdown(&mut self) { - self.io.shutdown() + pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { + self.io.shutdown(handle) } fn process(&mut self) {