From ec66a92b016914f2ead483bada32ffe696a2fd3a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 26 Oct 2022 10:17:59 -0700 Subject: [PATCH] rt: signal driver now uses I/O driver directly (#5125) The signal driver uses a `UnixStream` to receive signal events. Previously, the signal driver used `PollEvented` internally to receive events on the `UnixStream`. However, using `PollEvented` from within a runtime driver created a circular link between the runtime and the `PollEvented` instance. This patch replaces `PollEvented` usage in favor of accessing the I/O driver directly. The I/O driver now reserves a token for signal-related events and tracks signal readiness internally. The signal driver queries the I/O driver to check for signal-related readiness. --- tokio/src/io/mod.rs | 6 ++++ tokio/src/io/poll_evented.rs | 6 +--- tokio/src/runtime/io/mod.rs | 29 +++++++++++++-- tokio/src/runtime/signal/mod.rs | 64 +++++++++++---------------------- tokio/tests/rt_metrics.rs | 8 ++--- 5 files changed, 58 insertions(+), 55 deletions(-) diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index e5c09a3a4ae..f48035ad934 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -178,6 +178,12 @@ //! [`Sink`]: https://docs.rs/futures/0.3/futures/sink/trait.Sink.html //! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html //! [`Write`]: std::io::Write + +#![cfg_attr( + not(all(feature = "rt", feature = "net")), + allow(dead_code, unused_imports) +)] + cfg_io_blocking! { pub(crate) mod blocking; } diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index bebabc23bcd..974fdd1a389 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -119,11 +119,7 @@ impl PollEvented { } /// Returns a reference to the registration. - #[cfg(any( - feature = "net", - all(unix, feature = "process"), - all(unix, feature = "signal"), - ))] + #[cfg(any(feature = "net"))] pub(crate) fn registration(&self) -> &Registration { &self.registration } diff --git a/tokio/src/runtime/io/mod.rs b/tokio/src/runtime/io/mod.rs index d264dc4798b..3f80005be9a 100644 --- a/tokio/src/runtime/io/mod.rs +++ b/tokio/src/runtime/io/mod.rs @@ -1,4 +1,4 @@ -#![cfg_attr(not(feature = "rt"), allow(dead_code))] +#![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))] mod registration; pub(crate) use registration::Registration; @@ -26,6 +26,9 @@ pub(crate) struct Driver { /// as it is mostly used to determine when to call `compact()`. tick: u8, + /// True when an event with the signal token is received + signal_ready: bool, + /// Reuse the `mio::Events` value across calls to poll. events: mio::Events, @@ -86,6 +89,7 @@ enum Tick { // TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup // token. const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); +const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31)); const ADDRESS: bit::Pack = bit::Pack::least_significant(24); @@ -119,6 +123,7 @@ impl Driver { Ok(Driver { tick: 0, + signal_ready: false, events: mio::Events::with_capacity(1024), poll, resources: slab, @@ -193,7 +198,11 @@ impl Driver { for event in events.iter() { let token = event.token(); - if token != TOKEN_WAKEUP { + if token == TOKEN_WAKEUP { + // Nothing to do, the event is used to unblock the I/O driver + } else if token == TOKEN_SIGNAL { + self.signal_ready = true; + } else { Self::dispatch( &mut self.resources, self.tick, @@ -378,3 +387,19 @@ impl Direction { } } } + +// Signal handling +cfg_signal_internal_and_unix! { + impl Driver { + pub(crate) fn register_signal_receiver(&mut self, receiver: &mut mio::net::UnixStream) -> io::Result<()> { + self.inner.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; + Ok(()) + } + + pub(crate) fn consume_signal_ready(&mut self) -> bool { + let ret = self.signal_ready; + self.signal_ready = false; + ret + } + } +} diff --git a/tokio/src/runtime/signal/mod.rs b/tokio/src/runtime/signal/mod.rs index ff2d0fb7463..9a110b5a9d3 100644 --- a/tokio/src/runtime/signal/mod.rs +++ b/tokio/src/runtime/signal/mod.rs @@ -2,16 +2,12 @@ //! Signal driver -use crate::io::interest::Interest; -use crate::io::PollEvented; use crate::runtime::io; use crate::signal::registry::globals; use mio::net::UnixStream; use std::io::{self as std_io, Read}; -use std::ptr; use std::sync::{Arc, Weak}; -use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::time::Duration; /// Responsible for registering wakeups when an OS signal is received, and @@ -22,10 +18,10 @@ use std::time::Duration; #[derive(Debug)] pub(crate) struct Driver { /// Thread parker. The `Driver` park implementation delegates to this. - park: io::Driver, + io: io::Driver, /// A pipe for receiving wake events from the signal handler - receiver: PollEvented, + receiver: UnixStream, /// Shared state inner: Arc, @@ -43,7 +39,7 @@ pub(super) struct Inner(()); impl Driver { /// Creates a new signal `Driver` instance that delegates wakeups to `park`. - pub(crate) fn new(park: io::Driver) -> std_io::Result { + pub(crate) fn new(mut io: io::Driver) -> std_io::Result { use std::mem::ManuallyDrop; use std::os::unix::io::{AsRawFd, FromRawFd}; @@ -55,12 +51,11 @@ impl Driver { // I'm not sure if the second (failed) registration simply doesn't end // up receiving wake up notifications, or there could be some race // condition when consuming readiness events, but having distinct - // descriptors for distinct PollEvented instances appears to mitigate - // this. + // descriptors appears to mitigate this. // - // Unfortunately we cannot just use a single global PollEvented instance - // either, since we can't compare Handles or assume they will always - // point to the exact same reactor. + // Unfortunately we cannot just use a single global UnixStream instance + // either, since we can't assume they will always be registered with the + // exact same reactor. // // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior // with registering dups with the same reactor. In this case, duping is @@ -73,12 +68,12 @@ impl Driver { // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe. let original = ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); - let receiver = UnixStream::from_std(original.try_clone()?); - let receiver = - PollEvented::new_with_interest_and_handle(receiver, Interest::READABLE, park.handle())?; + let mut receiver = UnixStream::from_std(original.try_clone()?); + + io.register_signal_receiver(&mut receiver)?; Ok(Self { - park, + io, receiver, inner: Arc::new(Inner(())), }) @@ -93,38 +88,31 @@ impl Driver { } pub(crate) fn park(&mut self) { - self.park.park(); + self.io.park(); self.process(); } pub(crate) fn park_timeout(&mut self, duration: Duration) { - self.park.park_timeout(duration); + self.io.park_timeout(duration); self.process(); } pub(crate) fn shutdown(&mut self) { - self.park.shutdown() + self.io.shutdown() } - fn process(&self) { - // Check if the pipe is ready to read and therefore has "woken" us up - // - // To do so, we will `poll_read_ready` with a noop waker, since we don't - // need to actually be notified when read ready... - let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) }; - let mut cx = Context::from_waker(&waker); - - let ev = match self.receiver.registration().poll_read_ready(&mut cx) { - Poll::Ready(Ok(ev)) => ev, - Poll::Ready(Err(e)) => panic!("reactor gone: {}", e), - Poll::Pending => return, // No wake has arrived, bail - }; + fn process(&mut self) { + // If the signal pipe has not recieved a readiness event, then there is + // nothing else to do. + if !self.io.consume_signal_ready() { + return; + } // Drain the pipe completely so we can receive a new readiness event // if another signal has come in. let mut buf = [0; 128]; loop { - match (&*self.receiver).read(&mut buf) { + match self.receiver.read(&mut buf) { Ok(0) => panic!("EOF on self-pipe"), Ok(_) => continue, // Keep reading Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break, @@ -132,21 +120,11 @@ impl Driver { } } - self.receiver.registration().clear_readiness(ev); - // Broadcast any signals which were received globals().broadcast(); } } -const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop); - -unsafe fn noop_clone(_data: *const ()) -> RawWaker { - RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE) -} - -unsafe fn noop(_data: *const ()) {} - // ===== impl Handle ===== impl Handle { diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 85db2961662..cffc117bce2 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -375,20 +375,18 @@ fn io_driver_fd_count() { let rt = current_thread(); let metrics = rt.metrics(); - // Since this is enabled w/ the process driver we always - // have 1 fd registered. - assert_eq!(metrics.io_driver_fd_registered_count(), 1); + assert_eq!(metrics.io_driver_fd_registered_count(), 0); let stream = tokio::net::TcpStream::connect("google.com:80"); let stream = rt.block_on(async move { stream.await.unwrap() }); - assert_eq!(metrics.io_driver_fd_registered_count(), 2); + assert_eq!(metrics.io_driver_fd_registered_count(), 1); assert_eq!(metrics.io_driver_fd_deregistered_count(), 0); drop(stream); assert_eq!(metrics.io_driver_fd_deregistered_count(), 1); - assert_eq!(metrics.io_driver_fd_registered_count(), 2); + assert_eq!(metrics.io_driver_fd_registered_count(), 1); } #[cfg(any(target_os = "linux", target_os = "macos"))]