Skip to content

Commit

Permalink
rt: remove Arc from I/O driver (#5134)
Browse files Browse the repository at this point in the history
The next step in the great driver cleanup. This patch removes the Arc
used in the I/O driver in favor of `runtime::scheduler::Handle`.
  • Loading branch information
carllerche committed Oct 27, 2022
1 parent 4bcd08b commit 32d68fe
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 66 deletions.
2 changes: 1 addition & 1 deletion tokio/src/runtime/driver.rs
Expand Up @@ -114,7 +114,7 @@ cfg_io_driver! {
Disabled(ParkThread),
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) enum IoHandle {
Enabled(crate::runtime::io::Handle),
Disabled(UnparkThread),
Expand Down
98 changes: 38 additions & 60 deletions tokio/src/runtime/io/mod.rs
Expand Up @@ -18,7 +18,6 @@ use metrics::IoDriverMetrics;

use std::fmt;
use std::io;
use std::sync::Arc;
use std::time::Duration;

/// I/O driver, backed by Mio.
Expand All @@ -42,23 +41,7 @@ pub(crate) struct Driver {
}

/// A reference to an I/O driver.
#[derive(Clone)]
pub(crate) struct Handle {
pub(super) inner: Arc<Inner>,
}

#[derive(Debug)]
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
}

struct IoDispatcher {
allocator: slab::Allocator<ScheduledIo>,
is_shutdown: bool,
}

pub(super) struct Inner {
/// Registers I/O resources.
registry: mio::Registry,

Expand All @@ -70,7 +53,18 @@ pub(super) struct Inner {
#[cfg(not(tokio_wasi))]
waker: mio::Waker,

metrics: IoDriverMetrics,
pub(crate) metrics: IoDriverMetrics,
}

#[derive(Debug)]
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
}

struct IoDispatcher {
allocator: slab::Allocator<ScheduledIo>,
is_shutdown: bool,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
Expand Down Expand Up @@ -128,13 +122,11 @@ impl Driver {
};

let handle = Handle {
inner: Arc::new(Inner {
registry,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
#[cfg(not(tokio_wasi))]
waker,
metrics: IoDriverMetrics::default(),
}),
registry,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
#[cfg(not(tokio_wasi))]
waker,
metrics: IoDriverMetrics::default(),
};

Ok((driver, handle))
Expand All @@ -153,7 +145,7 @@ impl Driver {
pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();

if handle.inner.shutdown() {
if handle.shutdown() {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
Expand Down Expand Up @@ -208,7 +200,7 @@ impl Driver {
}
}

handle.inner.metrics.incr_ready_count_by(ready_count);
handle.metrics.incr_ready_count_by(ready_count);
}

fn dispatch(resources: &mut Slab<ScheduledIo>, tick: u8, token: mio::Token, ready: Ready) {
Expand Down Expand Up @@ -236,16 +228,6 @@ impl fmt::Debug for Driver {
}
}

cfg_net! {
cfg_metrics! {
impl Handle {
pub(crate) fn metrics(&self) -> &IoDriverMetrics {
&self.inner.metrics
}
}
}
}

impl Handle {
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
/// makes the next call to `turn` return immediately.
Expand All @@ -258,30 +240,9 @@ impl Handle {
/// return immediately.
pub(crate) fn unpark(&self) {
#[cfg(not(tokio_wasi))]
self.inner.waker.wake().expect("failed to wake I/O driver");
self.waker.wake().expect("failed to wake I/O driver");
}
}

impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Handle")
}
}

// ===== impl IoDispatcher =====

impl IoDispatcher {
fn new(allocator: slab::Allocator<ScheduledIo>) -> Self {
Self {
allocator,
is_shutdown: false,
}
}
}

// ===== impl Inner =====

impl Inner {
/// Registers an I/O resource with the reactor for a given `mio::Ready` state.
///
/// The registration token is returned.
Expand Down Expand Up @@ -342,6 +303,23 @@ impl Inner {
}
}

impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Handle")
}
}

// ===== impl IoDispatcher =====

impl IoDispatcher {
fn new(allocator: slab::Allocator<ScheduledIo>) -> Self {
Self {
allocator,
is_shutdown: false,
}
}
}

impl Direction {
pub(super) fn mask(self) -> Ready {
match self {
Expand All @@ -355,7 +333,7 @@ impl Direction {
cfg_signal_internal_and_unix! {
impl Handle {
pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
self.inner.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
self.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
}
}
Expand Down
8 changes: 4 additions & 4 deletions tokio/src/runtime/io/registration.rs
Expand Up @@ -73,7 +73,7 @@ impl Registration {
interest: Interest,
handle: scheduler::Handle,
) -> io::Result<Registration> {
let shared = handle.io().inner.add_source(io, interest)?;
let shared = handle.io().add_source(io, interest)?;

Ok(Registration { handle, shared })
}
Expand All @@ -95,7 +95,7 @@ impl Registration {
///
/// `Err` is returned if an error is encountered.
pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
self.handle().inner.deregister_source(io)
self.handle().deregister_source(io)
}

pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
Expand Down Expand Up @@ -148,7 +148,7 @@ impl Registration {
let coop = ready!(crate::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));

if self.handle().inner.is_shutdown() {
if self.handle().is_shutdown() {
return Poll::Ready(Err(gone()));
}

Expand Down Expand Up @@ -230,7 +230,7 @@ cfg_io_readiness! {
pin!(fut);

crate::future::poll_fn(|cx| {
if self.handle().inner.is_shutdown() {
if self.handle().is_shutdown() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/metrics/runtime.rs
Expand Up @@ -530,7 +530,7 @@ cfg_net! {
.driver()
.io
.as_ref()
.map(|h| f(h.metrics()))
.map(|h| f(&h.metrics))
.unwrap_or(0)
}
}
Expand Down

0 comments on commit 32d68fe

Please sign in to comment.