Skip to content

Commit

Permalink
rt: move time driver into runtime module (#4983)
Browse files Browse the repository at this point in the history
This patch moves the time driver into the runtime module. The time driver
is a runtime concern and is only used by the runtime. Moving the drivers
is the first step to cleaning up some Tokio internals. There will be
follow-up patches that integrate the drivers and other runtime concerns
more closely.

This is an internal refactor and should not impact any public APIs.
  • Loading branch information
carllerche committed Sep 6, 2022
1 parent 116fa7c commit bbfd34f
Show file tree
Hide file tree
Showing 13 changed files with 33 additions and 546 deletions.
5 changes: 2 additions & 3 deletions tokio/src/lib.rs
Expand Up @@ -497,9 +497,8 @@ cfg_rt! {
pub mod runtime;
}
cfg_not_rt! {
cfg_io_driver_impl! {
pub(crate) mod runtime;
}
// The `runtime` module is used when the IO or time driver is needed.
pub(crate) mod runtime;
}

pub(crate) mod coop;
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/driver.rs
Expand Up @@ -98,10 +98,10 @@ cfg_not_process_driver! {
// ===== time driver =====

cfg_time! {
type TimeDriver = crate::park::either::Either<crate::time::driver::Driver<IoStack>, IoStack>;
type TimeDriver = crate::park::either::Either<crate::runtime::time::Driver<IoStack>, IoStack>;

pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;

fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
crate::time::Clock::new(enable_pausing, start_paused)
Expand All @@ -115,7 +115,7 @@ cfg_time! {
use crate::park::either::Either;

if enable {
let driver = crate::time::driver::Driver::new(io_stack, clock);
let driver = crate::runtime::time::Driver::new(io_stack, clock);
let handle = driver.handle();

(Either::A(driver), Some(handle))
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -181,6 +181,10 @@ cfg_io_driver_impl! {
pub(crate) mod io;
}

cfg_time! {
pub(crate) mod time;
}

cfg_rt! {
pub(crate) mod enter;

Expand Down
Expand Up @@ -283,7 +283,7 @@ impl StateCell {
/// timer. As this participates in intrusive data structures, it must be pinned
/// before polling.
#[derive(Debug)]
pub(super) struct TimerEntry {
pub(crate) struct TimerEntry {
/// Arc reference to the driver. We can only free the driver after
/// deregistering everything from their respective timer wheels.
driver: Handle,
Expand Down
@@ -1,5 +1,5 @@
use crate::loom::sync::Arc;
use crate::time::driver::ClockTime;
use crate::runtime::time::ClockTime;
use std::fmt;

/// Handle to time driver instance.
Expand All @@ -17,7 +17,7 @@ impl Handle {
}

/// Returns the time source associated with this handle.
pub(super) fn time_source(&self) -> &ClockTime {
pub(crate) fn time_source(&self) -> &ClockTime {
&self.time_source
}

Expand Down
15 changes: 7 additions & 8 deletions tokio/src/time/driver/mod.rs → tokio/src/runtime/time/mod.rs
Expand Up @@ -7,15 +7,14 @@
//! Time driver.

mod entry;
pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
pub(crate) use entry::TimerEntry;
use entry::{EntryList, TimerHandle, TimerShared};

mod handle;
pub(crate) use self::handle::Handle;

mod wheel;

pub(super) mod sleep;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
Expand Down Expand Up @@ -104,8 +103,8 @@ pub(crate) struct Driver<P: Park + 'static> {

/// A structure which handles conversion from Instants to u64 timestamps.
#[derive(Debug, Clone)]
pub(self) struct ClockTime {
clock: super::clock::Clock,
pub(crate) struct ClockTime {
clock: crate::time::Clock,
start_time: Instant,
}

Expand All @@ -117,7 +116,7 @@ impl ClockTime {
}
}

pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
// Round up to the end of a ms
self.instant_to_tick(t + Duration::from_nanos(999_999))
}
Expand All @@ -136,7 +135,7 @@ impl ClockTime {
Duration::from_millis(t)
}

pub(self) fn now(&self) -> u64 {
pub(crate) fn now(&self) -> u64 {
self.instant_to_tick(self.clock.now())
}
}
Expand Down Expand Up @@ -403,7 +402,7 @@ impl Handle {

None
}
Err((entry, super::error::InsertError::Elapsed)) => unsafe {
Err((entry, crate::time::error::InsertError::Elapsed)) => unsafe {
entry.fire(Ok(()))
},
}
Expand Down
Expand Up @@ -48,7 +48,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) {
#[test]
fn single_timer() {
model(|| {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -79,7 +79,7 @@ fn single_timer() {
#[test]
fn drop_timer() {
model(|| {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -110,7 +110,7 @@ fn drop_timer() {
#[test]
fn change_waker() {
model(|| {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -145,7 +145,7 @@ fn reset_future() {
model(|| {
let finished_early = Arc::new(AtomicBool::new(false));

let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
let time_source = super::ClockTime::new(clock.clone());

let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
Expand Down Expand Up @@ -201,7 +201,7 @@ fn normal_or_miri<T>(normal: T, miri: T) -> T {
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
clock.pause();

let time_source = super::ClockTime::new(clock.clone());
Expand Down Expand Up @@ -242,7 +242,7 @@ fn poll_process_levels() {
fn poll_process_levels_targeted() {
let mut context = Context::from_waker(noop_waker_ref());

let clock = crate::time::clock::Clock::new(true, false);
let clock = crate::time::Clock::new(true, false);
clock.pause();

let time_source = super::ClockTime::new(clock.clone());
Expand All @@ -258,46 +258,3 @@ fn poll_process_levels_targeted() {
handle.process_at_time(192);
handle.process_at_time(192);
}

/*
#[test]
fn balanced_incr_and_decr() {
const OPS: usize = 5;
fn incr(inner: Arc<Inner>) {
for _ in 0..OPS {
inner.increment().expect("increment should not have failed");
thread::yield_now();
}
}
fn decr(inner: Arc<Inner>) {
let mut ops_performed = 0;
while ops_performed < OPS {
if inner.num(Ordering::Relaxed) > 0 {
ops_performed += 1;
inner.decrement();
}
thread::yield_now();
}
}
loom::model(|| {
let unpark = Box::new(MockUnpark);
let instant = Instant::now();
let inner = Arc::new(Inner::new(instant, unpark));
let incr_inner = inner.clone();
let decr_inner = inner.clone();
let incr_handle = thread::spawn(move || incr(incr_inner));
let decr_handle = thread::spawn(move || decr(decr_inner));
incr_handle.join().expect("should never fail");
decr_handle.join().expect("should never fail");
assert_eq!(inner.num(Ordering::SeqCst), 0);
})
}
*/
@@ -1,6 +1,4 @@
use crate::time::driver::TimerHandle;

use crate::time::driver::{EntryList, TimerShared};
use crate::runtime::time::{EntryList, TimerHandle, TimerShared};

use std::{fmt, ptr::NonNull};

Expand Down
@@ -1,4 +1,4 @@
use crate::time::driver::{TimerHandle, TimerShared};
use crate::runtime::time::{TimerHandle, TimerShared};
use crate::time::error::InsertError;

mod level;
Expand Down
13 changes: 4 additions & 9 deletions tokio/src/time/mod.rs
Expand Up @@ -82,17 +82,13 @@
//! ```
//!
//! [`interval`]: crate::time::interval()
//! [`sleep`]: sleep()

mod clock;
pub(crate) use self::clock::Clock;
#[cfg(feature = "test-util")]
pub use clock::{advance, pause, resume};

pub(crate) mod driver;

#[doc(inline)]
pub use driver::sleep::{sleep, sleep_until, Sleep};

pub mod error;

mod instant;
Expand All @@ -101,14 +97,13 @@ pub use self::instant::Instant;
mod interval;
pub use interval::{interval, interval_at, Interval, MissedTickBehavior};

mod sleep;
pub use sleep::{sleep, sleep_until, Sleep};

mod timeout;
#[doc(inline)]
pub use timeout::{timeout, timeout_at, Timeout};

#[cfg(test)]
#[cfg(not(loom))]
mod tests;

// Re-export for convenience
#[doc(no_inline)]
pub use std::time::Duration;
4 changes: 2 additions & 2 deletions tokio/src/time/driver/sleep.rs → tokio/src/time/sleep.rs
@@ -1,6 +1,6 @@
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::time::driver::ClockTime;
use crate::time::driver::{Handle, TimerEntry};
use crate::runtime::time::ClockTime;
use crate::runtime::time::{Handle, TimerEntry};
use crate::time::{error::Error, Duration, Instant};
use crate::util::trace;

Expand Down
22 changes: 0 additions & 22 deletions tokio/src/time/tests/mod.rs

This file was deleted.

0 comments on commit bbfd34f

Please sign in to comment.