Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: remove Arc from Clock #5434

Merged
merged 7 commits into from Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 5 additions & 8 deletions tokio/src/runtime/driver.rs
Expand Up @@ -45,8 +45,7 @@ impl Driver {

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());
let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, &clock);

Ok((
Self { inner: time_driver },
Expand Down Expand Up @@ -111,10 +110,8 @@ impl Handle {
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}

cfg_test_util! {
pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
}
}
Expand Down Expand Up @@ -289,7 +286,7 @@ cfg_time! {
fn create_time_driver(
enable: bool,
io_stack: IoStack,
clock: Clock,
clock: &Clock,
) -> (TimeDriver, TimeHandle) {
if enable {
let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);
Expand Down Expand Up @@ -337,7 +334,7 @@ cfg_not_time! {
fn create_time_driver(
_enable: bool,
io_stack: IoStack,
_clock: Clock,
_clock: &Clock,
) -> (TimeDriver, TimeHandle) {
(io_stack, ())
}
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/time/entry.rs
Expand Up @@ -579,6 +579,11 @@ impl TimerEntry {
pub(crate) fn driver(&self) -> &super::Handle {
self.driver.driver().time()
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn clock(&self) -> &super::Clock {
self.driver.driver().clock()
}
}

impl TimerHandle {
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/time/mod.rs
Expand Up @@ -125,7 +125,7 @@ impl Driver {
/// thread and `time_source` to get the current time and convert to ticks.
///
/// Specifying the source of time is useful when testing.
pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) {
pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) {
let time_source = TimeSource::new(clock);

let handle = Handle {
Expand Down Expand Up @@ -186,7 +186,7 @@ impl Driver {

match next_wake {
Some(when) => {
let now = handle.time_source.now();
let now = handle.time_source.now(rt_handle.clock());
// Note that we effectively round up to 1ms here - this avoids
// very short-duration microsecond-resolution sleeps that the OS
// might treat as zero-length.
Expand Down Expand Up @@ -214,13 +214,13 @@ impl Driver {
}

// Process pending timers after waking up
handle.process();
handle.process(rt_handle.clock());
}

cfg_test_util! {
fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.time();
let clock = &handle.time_source.clock;
let clock = rt_handle.clock();

if clock.can_auto_advance() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));
Expand Down Expand Up @@ -248,8 +248,8 @@ impl Driver {

impl Handle {
/// Runs timer related logic, and returns the next wakeup time
pub(self) fn process(&self) {
let now = self.time_source().now();
pub(self) fn process(&self, clock: &Clock) {
let now = self.time_source().now(clock);

self.process_at_time(now)
}
Expand Down
8 changes: 3 additions & 5 deletions tokio/src/runtime/time/source.rs
Expand Up @@ -5,15 +5,13 @@ use std::convert::TryInto;
/// A structure which handles conversion from Instants to u64 timestamps.
#[derive(Debug)]
pub(crate) struct TimeSource {
pub(crate) clock: Clock,
start_time: Instant,
}

impl TimeSource {
pub(crate) fn new(clock: Clock) -> Self {
pub(crate) fn new(clock: &Clock) -> Self {
Self {
start_time: clock.now(),
clock,
}
}

Expand All @@ -36,7 +34,7 @@ impl TimeSource {
Duration::from_millis(t)
}

pub(crate) fn now(&self) -> u64 {
self.instant_to_tick(self.clock.now())
pub(crate) fn now(&self, clock: &Clock) -> u64 {
self.instant_to_tick(clock.now())
}
}
15 changes: 9 additions & 6 deletions tokio/src/runtime/time/tests/mod.rs
Expand Up @@ -62,12 +62,13 @@ fn single_timer() {

thread::yield_now();

let handle = handle.inner.driver().time();
let time = handle.inner.driver().time();
let clock = handle.inner.driver().clock();

// This may or may not return Some (depending on how it races with the
// thread). If it does return None, however, the timer should complete
// synchronously.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
time.process_at_time(time.time_source().now(clock) + 2_000_000_000);

jh.join().unwrap();
})
Expand Down Expand Up @@ -97,10 +98,11 @@ fn drop_timer() {

thread::yield_now();

let handle = handle.inner.driver().time();
let time = handle.inner.driver().time();
let clock = handle.inner.driver().clock();

// advance 2s in the future.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
time.process_at_time(time.time_source().now(clock) + 2_000_000_000);

jh.join().unwrap();
})
Expand Down Expand Up @@ -132,10 +134,11 @@ fn change_waker() {

thread::yield_now();

let handle = handle.inner.driver().time();
let time = handle.inner.driver().time();
let clock = handle.inner.driver().clock();

// advance 2s
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
time.process_at_time(time.time_source().now(clock) + 2_000_000_000);

jh.join().unwrap();
})
Expand Down
58 changes: 33 additions & 25 deletions tokio/src/time/clock.rs
Expand Up @@ -29,30 +29,30 @@ cfg_not_test_util! {

cfg_test_util! {
use crate::time::{Duration, Instant};
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Mutex;

cfg_rt! {
fn clock() -> Option<Clock> {
fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> R) -> R {
use crate::runtime::Handle;

match Handle::try_current() {
Ok(handle) => Some(handle.inner.driver().clock().clone()),
Err(ref e) if e.is_missing_context() => None,
Ok(handle) => f(Some(handle.inner.driver().clock())),
Err(ref e) if e.is_missing_context() => f(None),
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
}
}

cfg_not_rt! {
fn clock() -> Option<Clock> {
None
fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> R) -> R {
f(None)
}
}

/// A handle to a source of time.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) struct Clock {
inner: Arc<Mutex<Inner>>,
inner: Mutex<Inner>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -107,8 +107,10 @@ cfg_test_util! {
/// [`advance`]: crate::time::advance
#[track_caller]
pub fn pause() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.pause();
with_clock(|maybe_clock| {
let clock = maybe_clock.expect("time cannot be frozen from outside the Tokio runtime");
clock.pause();
})
}

/// Resumes time.
Expand All @@ -122,14 +124,16 @@ cfg_test_util! {
/// runtime.
#[track_caller]
pub fn resume() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut inner = clock.inner.lock();
with_clock(|maybe_clock| {
let clock = maybe_clock.expect("time cannot be frozen from outside the Tokio runtime");
let mut inner = clock.inner.lock();

if inner.unfrozen.is_some() {
panic!("time is not frozen");
}
if inner.unfrozen.is_some() {
panic!("time is not frozen");
}

inner.unfrozen = Some(std::time::Instant::now());
inner.unfrozen = Some(std::time::Instant::now());
})
}

/// Advances time.
Expand Down Expand Up @@ -164,19 +168,23 @@ cfg_test_util! {
///
/// [`sleep`]: fn@crate::time::sleep
pub async fn advance(duration: Duration) {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration);
with_clock(|maybe_clock| {
let clock = maybe_clock.expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration);
});

crate::task::yield_now().await;
}

/// Returns the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant {
if let Some(clock) = clock() {
clock.now()
} else {
Instant::from_std(std::time::Instant::now())
}
with_clock(|maybe_clock| {
if let Some(clock) = maybe_clock {
clock.now()
} else {
Instant::from_std(std::time::Instant::now())
}
})
}

impl Clock {
Expand All @@ -186,12 +194,12 @@ cfg_test_util! {
let now = std::time::Instant::now();

let clock = Clock {
inner: Arc::new(Mutex::new(Inner {
inner: Mutex::new(Inner {
enable_pausing,
base: now,
unfrozen: Some(now),
auto_advance_inhibit_count: 0,
})),
}),
};

if start_paused {
Expand Down
6 changes: 4 additions & 2 deletions tokio/src/time/sleep.rs
Expand Up @@ -261,10 +261,11 @@ impl Sleep {

#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let clock = handle.driver().clock();
let handle = &handle.driver().time();
let time_source = handle.time_source();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.saturating_sub(time_source.now());
let duration = deadline_tick.saturating_sub(time_source.now(clock));

let location = location.expect("should have location if tracing");
let resource_span = tracing::trace_span!(
Expand Down Expand Up @@ -370,8 +371,9 @@ impl Sleep {
tracing::trace_span!("runtime.resource.async_op.poll");

let duration = {
let clock = me.entry.clock();
let time_source = me.entry.driver().time_source();
let now = time_source.now();
let now = time_source.now(clock);
let deadline_tick = time_source.deadline_to_tick(deadline);
deadline_tick.saturating_sub(now)
};
Expand Down