Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: start decoupling I/O driver and I/O handle #5127

Merged
merged 1 commit into from Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions tokio/src/process/unix/mod.rs
Expand Up @@ -21,8 +21,6 @@
//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
//! bad in theory...

pub(crate) mod driver;

pub(crate) mod orphan;
use orphan::{OrphanQueue, OrphanQueueImpl, Wait};

Expand Down Expand Up @@ -90,7 +88,7 @@ impl fmt::Debug for GlobalOrphanQueue {
}

impl GlobalOrphanQueue {
fn reap_orphans(handle: &SignalHandle) {
pub(crate) fn reap_orphans(handle: &SignalHandle) {
get_orphan_queue().reap_orphans(handle)
}
}
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/process/unix/orphan.rs
Expand Up @@ -294,7 +294,8 @@ pub(crate) mod test {
#[cfg_attr(miri, ignore)] // Miri does not support epoll.
#[test]
fn does_not_register_signal_if_queue_empty() {
let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
let (io_driver, io_handle) = IoDriver::new().unwrap();
let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
let handle = signal_driver.handle();

let orphanage = OrphanQueueImpl::new();
Expand Down
45 changes: 25 additions & 20 deletions tokio/src/runtime/driver.rs
Expand Up @@ -81,6 +81,14 @@ impl Handle {
self.io.unpark();
}

cfg_io_driver! {
pub(crate) fn io(&self) -> &crate::runtime::io::Handle {
self.io
.as_ref()
.expect("A Tokio 1.x context was found, but I/O is disabled. Call `enable_io` on the runtime builder to enable I/O.")
}
}

cfg_time! {
/// Returns a reference to the time driver handle.
///
Expand Down Expand Up @@ -116,10 +124,9 @@ cfg_io_driver! {
assert!(!enabled);

let ret = if enabled {
let io_driver = crate::runtime::io::Driver::new()?;
let io_handle = io_driver.handle();
let (io_driver, io_handle) = crate::runtime::io::Driver::new()?;

let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
let process_driver = create_process_driver(signal_driver);

(IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
Expand All @@ -133,23 +140,23 @@ cfg_io_driver! {
}

impl IoStack {
pub(crate) fn park(&mut self, _handle: &Handle) {
pub(crate) fn park(&mut self, handle: &Handle) {
match self {
IoStack::Enabled(v) => v.park(),
IoStack::Enabled(v) => v.park(handle),
IoStack::Disabled(v) => v.park(),
}
}

pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
match self {
IoStack::Enabled(v) => v.park_timeout(duration),
IoStack::Enabled(v) => v.park_timeout(handle, duration),
IoStack::Disabled(v) => v.park_timeout(duration),
}
}

pub(crate) fn shutdown(&mut self, _handle: &Handle) {
pub(crate) fn shutdown(&mut self, handle: &Handle) {
match self {
IoStack::Enabled(v) => v.shutdown(),
IoStack::Enabled(v) => v.shutdown(handle),
IoStack::Disabled(v) => v.shutdown(),
}
}
Expand All @@ -171,12 +178,10 @@ cfg_io_driver! {
}
}

cfg_unstable! {
pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
match self {
IoHandle::Enabled(v) => Some(v),
IoHandle::Disabled(..) => None,
}
pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
match self {
IoHandle::Enabled(v) => Some(v),
IoHandle::Disabled(..) => None,
}
}
}
Expand Down Expand Up @@ -215,8 +220,8 @@ cfg_signal_internal_and_unix! {
type SignalDriver = crate::runtime::signal::Driver;
pub(crate) type SignalHandle = Option<crate::runtime::signal::Handle>;

fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
let driver = crate::runtime::signal::Driver::new(io_driver)?;
fn create_signal_driver(io_driver: IoDriver, io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?;
let handle = driver.handle();
Ok((driver, Some(handle)))
}
Expand All @@ -228,7 +233,7 @@ cfg_not_signal_internal! {
cfg_io_driver! {
type SignalDriver = IoDriver;

fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
fn create_signal_driver(io_driver: IoDriver, _io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
Ok((io_driver, ()))
}
}
Expand All @@ -237,10 +242,10 @@ cfg_not_signal_internal! {
// ===== process driver =====

cfg_process_driver! {
type ProcessDriver = crate::process::unix::driver::Driver;
type ProcessDriver = crate::runtime::process::Driver;

fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
crate::process::unix::driver::Driver::new(signal_driver)
ProcessDriver::new(signal_driver)
}
}

Expand Down
51 changes: 24 additions & 27 deletions tokio/src/runtime/io/mod.rs
Expand Up @@ -10,6 +10,7 @@ mod metrics;

use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::runtime::driver;
use crate::util::slab::{self, Slab};
use crate::{loom::sync::RwLock, util::bit};

Expand Down Expand Up @@ -38,9 +39,6 @@ pub(crate) struct Driver {

/// The system event queue.
poll: mio::Poll,

/// State shared between the reactor and the handles.
inner: Arc<Inner>,
}

/// A reference to an I/O driver.
Expand Down Expand Up @@ -112,7 +110,7 @@ fn _assert_kinds() {
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
pub(crate) fn new() -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(tokio_wasi))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
Expand All @@ -121,44 +119,41 @@ impl Driver {
let slab = Slab::new();
let allocator = slab.allocator();

Ok(Driver {
let driver = Driver {
tick: 0,
signal_ready: false,
events: mio::Events::with_capacity(1024),
poll,
resources: slab,
};

let handle = Handle {
inner: Arc::new(Inner {
registry,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
#[cfg(not(tokio_wasi))]
waker,
metrics: IoDriverMetrics::default(),
}),
})
}
};

/// Returns a handle to this event loop which can be sent across threads
/// and can be used as a proxy to the event loop itself.
///
/// Handles are cloneable and clones always refer to the same event loop.
/// This handle is typically passed into functions that create I/O objects
/// to bind them to this event loop.
pub(crate) fn handle(&self) -> Handle {
Handle {
inner: Arc::clone(&self.inner),
}
Ok((driver, handle))
}

pub(crate) fn park(&mut self) {
self.turn(None);
pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();
self.turn(handle, None);
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.turn(Some(duration));
pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.io();
self.turn(handle, Some(duration));
}

pub(crate) fn shutdown(&mut self) {
if self.inner.shutdown() {
pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();

if handle.inner.shutdown() {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
Expand All @@ -168,7 +163,7 @@ impl Driver {
}
}

fn turn(&mut self, max_wait: Option<Duration>) {
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u8 = 255;

Expand Down Expand Up @@ -213,7 +208,7 @@ impl Driver {
}
}

self.inner.metrics.incr_ready_count_by(ready_count);
handle.inner.metrics.incr_ready_count_by(ready_count);
}

fn dispatch(resources: &mut Slab<ScheduledIo>, tick: u8, token: mio::Token, ready: Ready) {
Expand Down Expand Up @@ -390,12 +385,14 @@ impl Direction {

// Signal handling
cfg_signal_internal_and_unix! {
impl Driver {
pub(crate) fn register_signal_receiver(&mut self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
impl Handle {
pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
self.inner.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
}
}

impl Driver {
pub(crate) fn consume_signal_ready(&mut self) -> bool {
let ret = self.signal_ready;
self.signal_ready = false;
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -184,6 +184,10 @@ cfg_io_driver_impl! {
pub(crate) mod io;
}

cfg_process_driver! {
mod process;
}

cfg_time! {
pub(crate) mod time;
}
Expand Down
Expand Up @@ -3,6 +3,7 @@
//! Process driver.

use crate::process::unix::GlobalOrphanQueue;
use crate::runtime::driver;
use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};

use std::time::Duration;
Expand All @@ -27,17 +28,17 @@ impl Driver {
}
}

pub(crate) fn park(&mut self) {
self.park.park();
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.park.park(handle);
GlobalOrphanQueue::reap_orphans(&self.signal_handle);
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.park.park_timeout(duration);
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
self.park.park_timeout(handle, duration);
GlobalOrphanQueue::reap_orphans(&self.signal_handle);
}

pub(crate) fn shutdown(&mut self) {
self.park.shutdown()
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.park.shutdown(handle)
}
}
18 changes: 9 additions & 9 deletions tokio/src/runtime/signal/mod.rs
Expand Up @@ -2,7 +2,7 @@

//! Signal driver

use crate::runtime::io;
use crate::runtime::{driver, io};
use crate::signal::registry::globals;

use mio::net::UnixStream;
Expand Down Expand Up @@ -39,7 +39,7 @@ pub(super) struct Inner(());

impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(mut io: io::Driver) -> std_io::Result<Self> {
pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self> {
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Driver {
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let mut receiver = UnixStream::from_std(original.try_clone()?);

io.register_signal_receiver(&mut receiver)?;
io_handle.register_signal_receiver(&mut receiver)?;

Ok(Self {
io,
Expand All @@ -87,18 +87,18 @@ impl Driver {
}
}

pub(crate) fn park(&mut self) {
self.io.park();
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.io.park(handle);
self.process();
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.io.park_timeout(duration);
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
self.io.park_timeout(handle, duration);
self.process();
}

pub(crate) fn shutdown(&mut self) {
self.io.shutdown()
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.io.shutdown(handle)
}

fn process(&mut self) {
Expand Down