Skip to content

Commit

Permalink
rt: start decoupling I/O driver and I/O handle (#5127)
Browse files Browse the repository at this point in the history
This is the start of applying a similar treatment to the I/O driver as
the time driver. The I/O driver will no longer hold its own reference to
the I/O handle. Instead, the handle is passed in when needed.

This patch also moves the process driver to the `runtime` module.
  • Loading branch information
carllerche committed Oct 26, 2022
1 parent ec66a92 commit 58c4571
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 66 deletions.
4 changes: 1 addition & 3 deletions tokio/src/process/unix/mod.rs
Expand Up @@ -21,8 +21,6 @@
//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
//! bad in theory...

pub(crate) mod driver;

pub(crate) mod orphan;
use orphan::{OrphanQueue, OrphanQueueImpl, Wait};

Expand Down Expand Up @@ -90,7 +88,7 @@ impl fmt::Debug for GlobalOrphanQueue {
}

impl GlobalOrphanQueue {
fn reap_orphans(handle: &SignalHandle) {
pub(crate) fn reap_orphans(handle: &SignalHandle) {
get_orphan_queue().reap_orphans(handle)
}
}
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/process/unix/orphan.rs
Expand Up @@ -294,7 +294,8 @@ pub(crate) mod test {
#[cfg_attr(miri, ignore)] // Miri does not support epoll.
#[test]
fn does_not_register_signal_if_queue_empty() {
let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
let (io_driver, io_handle) = IoDriver::new().unwrap();
let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
let handle = signal_driver.handle();

let orphanage = OrphanQueueImpl::new();
Expand Down
45 changes: 25 additions & 20 deletions tokio/src/runtime/driver.rs
Expand Up @@ -81,6 +81,14 @@ impl Handle {
self.io.unpark();
}

cfg_io_driver! {
pub(crate) fn io(&self) -> &crate::runtime::io::Handle {
self.io
.as_ref()
.expect("A Tokio 1.x context was found, but I/O is disabled. Call `enable_io` on the runtime builder to enable I/O.")
}
}

cfg_time! {
/// Returns a reference to the time driver handle.
///
Expand Down Expand Up @@ -116,10 +124,9 @@ cfg_io_driver! {
assert!(!enabled);

let ret = if enabled {
let io_driver = crate::runtime::io::Driver::new()?;
let io_handle = io_driver.handle();
let (io_driver, io_handle) = crate::runtime::io::Driver::new()?;

let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
let process_driver = create_process_driver(signal_driver);

(IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
Expand All @@ -133,23 +140,23 @@ cfg_io_driver! {
}

impl IoStack {
pub(crate) fn park(&mut self, _handle: &Handle) {
pub(crate) fn park(&mut self, handle: &Handle) {
match self {
IoStack::Enabled(v) => v.park(),
IoStack::Enabled(v) => v.park(handle),
IoStack::Disabled(v) => v.park(),
}
}

pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
match self {
IoStack::Enabled(v) => v.park_timeout(duration),
IoStack::Enabled(v) => v.park_timeout(handle, duration),
IoStack::Disabled(v) => v.park_timeout(duration),
}
}

pub(crate) fn shutdown(&mut self, _handle: &Handle) {
pub(crate) fn shutdown(&mut self, handle: &Handle) {
match self {
IoStack::Enabled(v) => v.shutdown(),
IoStack::Enabled(v) => v.shutdown(handle),
IoStack::Disabled(v) => v.shutdown(),
}
}
Expand All @@ -171,12 +178,10 @@ cfg_io_driver! {
}
}

cfg_unstable! {
pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
match self {
IoHandle::Enabled(v) => Some(v),
IoHandle::Disabled(..) => None,
}
pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
match self {
IoHandle::Enabled(v) => Some(v),
IoHandle::Disabled(..) => None,
}
}
}
Expand Down Expand Up @@ -215,8 +220,8 @@ cfg_signal_internal_and_unix! {
type SignalDriver = crate::runtime::signal::Driver;
pub(crate) type SignalHandle = Option<crate::runtime::signal::Handle>;

fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
let driver = crate::runtime::signal::Driver::new(io_driver)?;
fn create_signal_driver(io_driver: IoDriver, io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?;
let handle = driver.handle();
Ok((driver, Some(handle)))
}
Expand All @@ -228,7 +233,7 @@ cfg_not_signal_internal! {
cfg_io_driver! {
type SignalDriver = IoDriver;

fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
fn create_signal_driver(io_driver: IoDriver, _io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
Ok((io_driver, ()))
}
}
Expand All @@ -237,10 +242,10 @@ cfg_not_signal_internal! {
// ===== process driver =====

cfg_process_driver! {
type ProcessDriver = crate::process::unix::driver::Driver;
type ProcessDriver = crate::runtime::process::Driver;

fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
crate::process::unix::driver::Driver::new(signal_driver)
ProcessDriver::new(signal_driver)
}
}

Expand Down
51 changes: 24 additions & 27 deletions tokio/src/runtime/io/mod.rs
Expand Up @@ -10,6 +10,7 @@ mod metrics;

use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::runtime::driver;
use crate::util::slab::{self, Slab};
use crate::{loom::sync::RwLock, util::bit};

Expand Down Expand Up @@ -38,9 +39,6 @@ pub(crate) struct Driver {

/// The system event queue.
poll: mio::Poll,

/// State shared between the reactor and the handles.
inner: Arc<Inner>,
}

/// A reference to an I/O driver.
Expand Down Expand Up @@ -112,7 +110,7 @@ fn _assert_kinds() {
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
pub(crate) fn new() -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(tokio_wasi))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
Expand All @@ -121,44 +119,41 @@ impl Driver {
let slab = Slab::new();
let allocator = slab.allocator();

Ok(Driver {
let driver = Driver {
tick: 0,
signal_ready: false,
events: mio::Events::with_capacity(1024),
poll,
resources: slab,
};

let handle = Handle {
inner: Arc::new(Inner {
registry,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
#[cfg(not(tokio_wasi))]
waker,
metrics: IoDriverMetrics::default(),
}),
})
}
};

/// Returns a handle to this event loop which can be sent across threads
/// and can be used as a proxy to the event loop itself.
///
/// Handles are cloneable and clones always refer to the same event loop.
/// This handle is typically passed into functions that create I/O objects
/// to bind them to this event loop.
pub(crate) fn handle(&self) -> Handle {
Handle {
inner: Arc::clone(&self.inner),
}
Ok((driver, handle))
}

pub(crate) fn park(&mut self) {
self.turn(None);
pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();
self.turn(handle, None);
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.turn(Some(duration));
pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.io();
self.turn(handle, Some(duration));
}

pub(crate) fn shutdown(&mut self) {
if self.inner.shutdown() {
pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();

if handle.inner.shutdown() {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
Expand All @@ -168,7 +163,7 @@ impl Driver {
}
}

fn turn(&mut self, max_wait: Option<Duration>) {
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u8 = 255;

Expand Down Expand Up @@ -213,7 +208,7 @@ impl Driver {
}
}

self.inner.metrics.incr_ready_count_by(ready_count);
handle.inner.metrics.incr_ready_count_by(ready_count);
}

fn dispatch(resources: &mut Slab<ScheduledIo>, tick: u8, token: mio::Token, ready: Ready) {
Expand Down Expand Up @@ -390,12 +385,14 @@ impl Direction {

// Signal handling
cfg_signal_internal_and_unix! {
impl Driver {
pub(crate) fn register_signal_receiver(&mut self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
impl Handle {
pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
self.inner.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
}
}

impl Driver {
pub(crate) fn consume_signal_ready(&mut self) -> bool {
let ret = self.signal_ready;
self.signal_ready = false;
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -184,6 +184,10 @@ cfg_io_driver_impl! {
pub(crate) mod io;
}

cfg_process_driver! {
mod process;
}

cfg_time! {
pub(crate) mod time;
}
Expand Down
Expand Up @@ -3,6 +3,7 @@
//! Process driver.

use crate::process::unix::GlobalOrphanQueue;
use crate::runtime::driver;
use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};

use std::time::Duration;
Expand All @@ -27,17 +28,17 @@ impl Driver {
}
}

pub(crate) fn park(&mut self) {
self.park.park();
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.park.park(handle);
GlobalOrphanQueue::reap_orphans(&self.signal_handle);
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.park.park_timeout(duration);
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
self.park.park_timeout(handle, duration);
GlobalOrphanQueue::reap_orphans(&self.signal_handle);
}

pub(crate) fn shutdown(&mut self) {
self.park.shutdown()
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.park.shutdown(handle)
}
}
18 changes: 9 additions & 9 deletions tokio/src/runtime/signal/mod.rs
Expand Up @@ -2,7 +2,7 @@

//! Signal driver

use crate::runtime::io;
use crate::runtime::{driver, io};
use crate::signal::registry::globals;

use mio::net::UnixStream;
Expand Down Expand Up @@ -39,7 +39,7 @@ pub(super) struct Inner(());

impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(mut io: io::Driver) -> std_io::Result<Self> {
pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self> {
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Driver {
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let mut receiver = UnixStream::from_std(original.try_clone()?);

io.register_signal_receiver(&mut receiver)?;
io_handle.register_signal_receiver(&mut receiver)?;

Ok(Self {
io,
Expand All @@ -87,18 +87,18 @@ impl Driver {
}
}

pub(crate) fn park(&mut self) {
self.io.park();
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.io.park(handle);
self.process();
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.io.park_timeout(duration);
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
self.io.park_timeout(handle, duration);
self.process();
}

pub(crate) fn shutdown(&mut self) {
self.io.shutdown()
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.io.shutdown(handle)
}

fn process(&mut self) {
Expand Down

0 comments on commit 58c4571

Please sign in to comment.