Skip to content

Commit

Permalink
rt: signal driver now uses I/O driver directly (#5125)
Browse files Browse the repository at this point in the history
The signal driver uses a `UnixStream` to receive signal events.
Previously, the signal driver used `PollEvented` internally to receive
events on the `UnixStream`. However, using `PollEvented` from
within a runtime driver created a circular link between the runtime and
the `PollEvented` instance.

This patch replaces `PollEvented` usage in favor of accessing the I/O
driver directly. The I/O driver now reserves a token for signal-related
events and tracks signal readiness internally. The signal driver queries
the I/O driver to check for signal-related readiness.
  • Loading branch information
carllerche committed Oct 26, 2022
1 parent 1ca17be commit ec66a92
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 55 deletions.
6 changes: 6 additions & 0 deletions tokio/src/io/mod.rs
Expand Up @@ -178,6 +178,12 @@
//! [`Sink`]: https://docs.rs/futures/0.3/futures/sink/trait.Sink.html
//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
//! [`Write`]: std::io::Write

#![cfg_attr(
not(all(feature = "rt", feature = "net")),
allow(dead_code, unused_imports)
)]

cfg_io_blocking! {
pub(crate) mod blocking;
}
Expand Down
6 changes: 1 addition & 5 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -119,11 +119,7 @@ impl<E: Source> PollEvented<E> {
}

/// Returns a reference to the registration.
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
#[cfg(any(feature = "net"))]
pub(crate) fn registration(&self) -> &Registration {
&self.registration
}
Expand Down
29 changes: 27 additions & 2 deletions tokio/src/runtime/io/mod.rs
@@ -1,4 +1,4 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
#![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))]

mod registration;
pub(crate) use registration::Registration;
Expand Down Expand Up @@ -26,6 +26,9 @@ pub(crate) struct Driver {
/// as it is mostly used to determine when to call `compact()`.
tick: u8,

/// True when an event with the signal token is received
signal_ready: bool,

/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,

Expand Down Expand Up @@ -86,6 +89,7 @@ enum Tick {
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31));

const ADDRESS: bit::Pack = bit::Pack::least_significant(24);

Expand Down Expand Up @@ -119,6 +123,7 @@ impl Driver {

Ok(Driver {
tick: 0,
signal_ready: false,
events: mio::Events::with_capacity(1024),
poll,
resources: slab,
Expand Down Expand Up @@ -193,7 +198,11 @@ impl Driver {
for event in events.iter() {
let token = event.token();

if token != TOKEN_WAKEUP {
if token == TOKEN_WAKEUP {
// Nothing to do, the event is used to unblock the I/O driver
} else if token == TOKEN_SIGNAL {
self.signal_ready = true;
} else {
Self::dispatch(
&mut self.resources,
self.tick,
Expand Down Expand Up @@ -378,3 +387,19 @@ impl Direction {
}
}
}

// Signal handling
cfg_signal_internal_and_unix! {
impl Driver {
pub(crate) fn register_signal_receiver(&mut self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
self.inner.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
}

pub(crate) fn consume_signal_ready(&mut self) -> bool {
let ret = self.signal_ready;
self.signal_ready = false;
ret
}
}
}
64 changes: 21 additions & 43 deletions tokio/src/runtime/signal/mod.rs
Expand Up @@ -2,16 +2,12 @@

//! Signal driver

use crate::io::interest::Interest;
use crate::io::PollEvented;
use crate::runtime::io;
use crate::signal::registry::globals;

use mio::net::UnixStream;
use std::io::{self as std_io, Read};
use std::ptr;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

/// Responsible for registering wakeups when an OS signal is received, and
Expand All @@ -22,10 +18,10 @@ use std::time::Duration;
#[derive(Debug)]
pub(crate) struct Driver {
/// Thread parker. The `Driver` park implementation delegates to this.
park: io::Driver,
io: io::Driver,

/// A pipe for receiving wake events from the signal handler
receiver: PollEvented<UnixStream>,
receiver: UnixStream,

/// Shared state
inner: Arc<Inner>,
Expand All @@ -43,7 +39,7 @@ pub(super) struct Inner(());

impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(park: io::Driver) -> std_io::Result<Self> {
pub(crate) fn new(mut io: io::Driver) -> std_io::Result<Self> {
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

Expand All @@ -55,12 +51,11 @@ impl Driver {
// I'm not sure if the second (failed) registration simply doesn't end
// up receiving wake up notifications, or there could be some race
// condition when consuming readiness events, but having distinct
// descriptors for distinct PollEvented instances appears to mitigate
// this.
// descriptors appears to mitigate this.
//
// Unfortunately we cannot just use a single global PollEvented instance
// either, since we can't compare Handles or assume they will always
// point to the exact same reactor.
// Unfortunately we cannot just use a single global UnixStream instance
// either, since we can't assume they will always be registered with the
// exact same reactor.
//
// Mio 0.7 removed `try_clone()` as an API due to unexpected behavior
// with registering dups with the same reactor. In this case, duping is
Expand All @@ -73,12 +68,12 @@ impl Driver {
// safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
let original =
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let receiver = UnixStream::from_std(original.try_clone()?);
let receiver =
PollEvented::new_with_interest_and_handle(receiver, Interest::READABLE, park.handle())?;
let mut receiver = UnixStream::from_std(original.try_clone()?);

io.register_signal_receiver(&mut receiver)?;

Ok(Self {
park,
io,
receiver,
inner: Arc::new(Inner(())),
})
Expand All @@ -93,60 +88,43 @@ impl Driver {
}

pub(crate) fn park(&mut self) {
self.park.park();
self.io.park();
self.process();
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.park.park_timeout(duration);
self.io.park_timeout(duration);
self.process();
}

pub(crate) fn shutdown(&mut self) {
self.park.shutdown()
self.io.shutdown()
}

fn process(&self) {
// Check if the pipe is ready to read and therefore has "woken" us up
//
// To do so, we will `poll_read_ready` with a noop waker, since we don't
// need to actually be notified when read ready...
let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
let mut cx = Context::from_waker(&waker);

let ev = match self.receiver.registration().poll_read_ready(&mut cx) {
Poll::Ready(Ok(ev)) => ev,
Poll::Ready(Err(e)) => panic!("reactor gone: {}", e),
Poll::Pending => return, // No wake has arrived, bail
};
fn process(&mut self) {
// If the signal pipe has not recieved a readiness event, then there is
// nothing else to do.
if !self.io.consume_signal_ready() {
return;
}

// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
loop {
match (&*self.receiver).read(&mut buf) {
match self.receiver.read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {}", e),
}
}

self.receiver.registration().clear_readiness(ev);

// Broadcast any signals which were received
globals().broadcast();
}
}

const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

unsafe fn noop_clone(_data: *const ()) -> RawWaker {
RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)
}

unsafe fn noop(_data: *const ()) {}

// ===== impl Handle =====

impl Handle {
Expand Down
8 changes: 3 additions & 5 deletions tokio/tests/rt_metrics.rs
Expand Up @@ -375,20 +375,18 @@ fn io_driver_fd_count() {
let rt = current_thread();
let metrics = rt.metrics();

// Since this is enabled w/ the process driver we always
// have 1 fd registered.
assert_eq!(metrics.io_driver_fd_registered_count(), 1);
assert_eq!(metrics.io_driver_fd_registered_count(), 0);

let stream = tokio::net::TcpStream::connect("google.com:80");
let stream = rt.block_on(async move { stream.await.unwrap() });

assert_eq!(metrics.io_driver_fd_registered_count(), 2);
assert_eq!(metrics.io_driver_fd_registered_count(), 1);
assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

drop(stream);

assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
assert_eq!(metrics.io_driver_fd_registered_count(), 2);
assert_eq!(metrics.io_driver_fd_registered_count(), 1);
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
Expand Down

0 comments on commit ec66a92

Please sign in to comment.