Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: remove Arc from Clock #5434

Merged
merged 7 commits into from Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 6 additions & 9 deletions tokio/src/runtime/driver.rs
Expand Up @@ -45,8 +45,7 @@ impl Driver {

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());
let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, &clock);

Ok((
Self { inner: time_driver },
Expand Down Expand Up @@ -111,10 +110,8 @@ impl Handle {
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}

cfg_test_util! {
pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
}
}
Expand Down Expand Up @@ -289,10 +286,10 @@ cfg_time! {
fn create_time_driver(
enable: bool,
io_stack: IoStack,
clock: Clock,
clock: &Clock,
) -> (TimeDriver, TimeHandle) {
if enable {
let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);
let (driver, handle) = crate::runtime::time::Driver::new(io_stack, &clock);

(TimeDriver::Enabled { driver }, Some(handle))
} else {
Expand Down Expand Up @@ -337,7 +334,7 @@ cfg_not_time! {
fn create_time_driver(
_enable: bool,
io_stack: IoStack,
_clock: Clock,
_clock: &Clock,
) -> (TimeDriver, TimeHandle) {
(io_stack, ())
}
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/time/mod.rs
Expand Up @@ -125,7 +125,7 @@ impl Driver {
/// thread and `time_source` to get the current time and convert to ticks.
///
/// Specifying the source of time is useful when testing.
pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) {
pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) {
let time_source = TimeSource::new(clock);

let handle = Handle {
Expand Down Expand Up @@ -186,7 +186,7 @@ impl Driver {

match next_wake {
Some(when) => {
let now = handle.time_source.now();
let now = handle.time_source.now(rt_handle.clock());
// Note that we effectively round up to 1ms here - this avoids
// very short-duration microsecond-resolution sleeps that the OS
// might treat as zero-length.
Expand Down Expand Up @@ -214,13 +214,13 @@ impl Driver {
}

// Process pending timers after waking up
handle.process();
handle.process(rt_handle.clock());
}

cfg_test_util! {
fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.time();
let clock = &handle.time_source.clock;
let clock = rt_handle.clock();

if clock.can_auto_advance() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));
Expand Down Expand Up @@ -248,8 +248,8 @@ impl Driver {

impl Handle {
/// Runs timer related logic, and returns the next wakeup time
pub(self) fn process(&self) {
let now = self.time_source().now();
pub(self) fn process(&self, clock: &Clock) {
let now = self.time_source().now(clock);

self.process_at_time(now)
}
Expand Down
8 changes: 3 additions & 5 deletions tokio/src/runtime/time/source.rs
Expand Up @@ -5,15 +5,13 @@ use std::convert::TryInto;
/// A structure which handles conversion from Instants to u64 timestamps.
#[derive(Debug)]
pub(crate) struct TimeSource {
pub(crate) clock: Clock,
start_time: Instant,
}

impl TimeSource {
pub(crate) fn new(clock: Clock) -> Self {
pub(crate) fn new(clock: &Clock) -> Self {
Self {
start_time: clock.now(),
clock,
}
}

Expand All @@ -36,7 +34,7 @@ impl TimeSource {
Duration::from_millis(t)
}

pub(crate) fn now(&self) -> u64 {
self.instant_to_tick(self.clock.now())
pub(crate) fn now(&self, clock: &Clock) -> u64 {
self.instant_to_tick(clock.now())
}
}
58 changes: 33 additions & 25 deletions tokio/src/time/clock.rs
Expand Up @@ -29,30 +29,30 @@ cfg_not_test_util! {

cfg_test_util! {
use crate::time::{Duration, Instant};
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Mutex;

cfg_rt! {
fn clock() -> Option<Clock> {
fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> R) -> R {
use crate::runtime::Handle;

match Handle::try_current() {
Ok(handle) => Some(handle.inner.driver().clock().clone()),
Err(ref e) if e.is_missing_context() => None,
Ok(handle) => f(Some(handle.inner.driver().clock())),
Err(ref e) if e.is_missing_context() => f(None),
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
}
}

cfg_not_rt! {
fn clock() -> Option<Clock> {
None
fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> R) -> R {
f(None)
}
}

/// A handle to a source of time.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) struct Clock {
inner: Arc<Mutex<Inner>>,
inner: Mutex<Inner>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -107,8 +107,10 @@ cfg_test_util! {
/// [`advance`]: crate::time::advance
#[track_caller]
pub fn pause() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.pause();
with_clock(|maybe_clock| {
let clock = maybe_clock.expect("time cannot be frozen from outside the Tokio runtime");
clock.pause();
})
}

/// Resumes time.
Expand All @@ -122,14 +124,16 @@ cfg_test_util! {
/// runtime.
#[track_caller]
pub fn resume() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut inner = clock.inner.lock();
with_clock(|maybe_clock| {
let clock = maybe_clock.expect("time cannot be frozen from outside the Tokio runtime");
let mut inner = clock.inner.lock();

if inner.unfrozen.is_some() {
panic!("time is not frozen");
}
if inner.unfrozen.is_some() {
panic!("time is not frozen");
}

inner.unfrozen = Some(std::time::Instant::now());
inner.unfrozen = Some(std::time::Instant::now());
})
}

/// Advances time.
Expand Down Expand Up @@ -164,19 +168,23 @@ cfg_test_util! {
///
/// [`sleep`]: fn@crate::time::sleep
pub async fn advance(duration: Duration) {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration);
with_clock(|maybe_clock| {
let clock = maybe_clock.expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration);
});

crate::task::yield_now().await;
}

/// Returns the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant {
if let Some(clock) = clock() {
clock.now()
} else {
Instant::from_std(std::time::Instant::now())
}
with_clock(|maybe_clock| {
if let Some(clock) = maybe_clock {
clock.now()
} else {
Instant::from_std(std::time::Instant::now())
}
})
}

impl Clock {
Expand All @@ -186,12 +194,12 @@ cfg_test_util! {
let now = std::time::Instant::now();

let clock = Clock {
inner: Arc::new(Mutex::new(Inner {
inner: Mutex::new(Inner {
enable_pausing,
base: now,
unfrozen: Some(now),
auto_advance_inhibit_count: 0,
})),
}),
};

if start_paused {
Expand Down