Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: remove Arc from Clock #5434

Merged
merged 7 commits into from Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 5 additions & 8 deletions tokio/src/runtime/driver.rs
Expand Up @@ -45,8 +45,7 @@ impl Driver {

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());
let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, &clock);

Ok((
Self { inner: time_driver },
Expand Down Expand Up @@ -111,10 +110,8 @@ impl Handle {
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}

cfg_test_util! {
pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
}
}
Expand Down Expand Up @@ -289,7 +286,7 @@ cfg_time! {
fn create_time_driver(
enable: bool,
io_stack: IoStack,
clock: Clock,
clock: &Clock,
) -> (TimeDriver, TimeHandle) {
if enable {
let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock);
Expand Down Expand Up @@ -337,7 +334,7 @@ cfg_not_time! {
fn create_time_driver(
_enable: bool,
io_stack: IoStack,
_clock: Clock,
_clock: &Clock,
) -> (TimeDriver, TimeHandle) {
(io_stack, ())
}
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/time/entry.rs
Expand Up @@ -579,6 +579,11 @@ impl TimerEntry {
pub(crate) fn driver(&self) -> &super::Handle {
self.driver.driver().time()
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn clock(&self) -> &super::Clock {
self.driver.driver().clock()
}
}

impl TimerHandle {
Expand Down
16 changes: 9 additions & 7 deletions tokio/src/runtime/time/mod.rs
Expand Up @@ -125,7 +125,7 @@ impl Driver {
/// thread and `time_source` to get the current time and convert to ticks.
///
/// Specifying the source of time is useful when testing.
pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) {
pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) {
let time_source = TimeSource::new(clock);

let handle = Handle {
Expand Down Expand Up @@ -186,7 +186,7 @@ impl Driver {

match next_wake {
Some(when) => {
let now = handle.time_source.now();
let now = handle.time_source.now(rt_handle.clock());
// Note that we effectively round up to 1ms here - this avoids
// very short-duration microsecond-resolution sleeps that the OS
// might treat as zero-length.
Expand Down Expand Up @@ -214,13 +214,13 @@ impl Driver {
}

// Process pending timers after waking up
handle.process();
handle.process(rt_handle.clock());
}

cfg_test_util! {
fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.time();
let clock = &handle.time_source.clock;
let clock = rt_handle.clock();

if clock.can_auto_advance() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));
Expand All @@ -231,7 +231,9 @@ impl Driver {
// advance the clock.
if !handle.did_wake() {
// Simulate advancing time
clock.advance(duration);
if let Err(msg) = clock.advance(duration) {
panic!("{}", msg);
}
}
} else {
self.park.park_timeout(rt_handle, duration);
Expand All @@ -248,8 +250,8 @@ impl Driver {

impl Handle {
/// Runs timer related logic, and returns the next wakeup time
pub(self) fn process(&self) {
let now = self.time_source().now();
pub(self) fn process(&self, clock: &Clock) {
let now = self.time_source().now(clock);

self.process_at_time(now)
}
Expand Down
8 changes: 3 additions & 5 deletions tokio/src/runtime/time/source.rs
Expand Up @@ -5,15 +5,13 @@ use std::convert::TryInto;
/// A structure which handles conversion from Instants to u64 timestamps.
#[derive(Debug)]
pub(crate) struct TimeSource {
pub(crate) clock: Clock,
start_time: Instant,
}

impl TimeSource {
pub(crate) fn new(clock: Clock) -> Self {
pub(crate) fn new(clock: &Clock) -> Self {
Self {
start_time: clock.now(),
clock,
}
}

Expand All @@ -36,7 +34,7 @@ impl TimeSource {
Duration::from_millis(t)
}

pub(crate) fn now(&self) -> u64 {
self.instant_to_tick(self.clock.now())
pub(crate) fn now(&self, clock: &Clock) -> u64 {
self.instant_to_tick(clock.now())
}
}
15 changes: 9 additions & 6 deletions tokio/src/runtime/time/tests/mod.rs
Expand Up @@ -62,12 +62,13 @@ fn single_timer() {

thread::yield_now();

let handle = handle.inner.driver().time();
let time = handle.inner.driver().time();
let clock = handle.inner.driver().clock();

// This may or may not return Some (depending on how it races with the
// thread). If it does return None, however, the timer should complete
// synchronously.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
time.process_at_time(time.time_source().now(clock) + 2_000_000_000);

jh.join().unwrap();
})
Expand Down Expand Up @@ -97,10 +98,11 @@ fn drop_timer() {

thread::yield_now();

let handle = handle.inner.driver().time();
let time = handle.inner.driver().time();
let clock = handle.inner.driver().clock();

// advance 2s in the future.
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
time.process_at_time(time.time_source().now(clock) + 2_000_000_000);

jh.join().unwrap();
})
Expand Down Expand Up @@ -132,10 +134,11 @@ fn change_waker() {

thread::yield_now();

let handle = handle.inner.driver().time();
let time = handle.inner.driver().time();
let clock = handle.inner.driver().clock();

// advance 2s
handle.process_at_time(handle.time_source().now() + 2_000_000_000);
time.process_at_time(time.time_source().now(clock) + 2_000_000_000);

jh.join().unwrap();
})
Expand Down
103 changes: 69 additions & 34 deletions tokio/src/time/clock.rs
Expand Up @@ -29,30 +29,40 @@ cfg_not_test_util! {

cfg_test_util! {
use crate::time::{Duration, Instant};
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Mutex;

cfg_rt! {
fn clock() -> Option<Clock> {
#[track_caller]
fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> Result<R, &'static str>) -> R {
use crate::runtime::Handle;

match Handle::try_current() {
Ok(handle) => Some(handle.inner.driver().clock().clone()),
Err(ref e) if e.is_missing_context() => None,
let res = match Handle::try_current() {
Ok(handle) => f(Some(handle.inner.driver().clock())),
Err(ref e) if e.is_missing_context() => f(None),
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
};

match res {
Ok(ret) => ret,
Err(msg) => panic!("{}", msg),
}
}
}

cfg_not_rt! {
fn clock() -> Option<Clock> {
None
#[track_caller]
fn with_clock<R>(f: impl FnOnce(Option<&Clock>) -> Result<R, &'static str>) -> R {
match f(None) {
Ok(ret) => ret,
Err(msg) => panic!("{}", msg),
}
}
}

/// A handle to a source of time.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) struct Clock {
inner: Arc<Mutex<Inner>>,
inner: Mutex<Inner>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -107,8 +117,12 @@ cfg_test_util! {
/// [`advance`]: crate::time::advance
#[track_caller]
pub fn pause() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.pause();
with_clock(|maybe_clock| {
match maybe_clock {
Some(clock) => clock.pause(),
None => Err("time cannot be frozen from outside the Tokio runtime"),
}
})
}

/// Resumes time.
Expand All @@ -122,14 +136,21 @@ cfg_test_util! {
/// runtime.
#[track_caller]
pub fn resume() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let mut inner = clock.inner.lock();
with_clock(|maybe_clock| {
let clock = match maybe_clock {
Some(clock) => clock,
None => return Err("time cannot be frozen from outside the Tokio runtime"),
};

if inner.unfrozen.is_some() {
panic!("time is not frozen");
}
let mut inner = clock.inner.lock();

inner.unfrozen = Some(std::time::Instant::now());
if inner.unfrozen.is_some() {
return Err("time is not frozen");
}

inner.unfrozen = Some(std::time::Instant::now());
Ok(())
})
}

/// Advances time.
Expand Down Expand Up @@ -164,19 +185,27 @@ cfg_test_util! {
///
/// [`sleep`]: fn@crate::time::sleep
pub async fn advance(duration: Duration) {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration);
with_clock(|maybe_clock| {
let clock = match maybe_clock {
Some(clock) => clock,
None => return Err("time cannot be frozen from outside the Tokio runtime"),
};

clock.advance(duration)
});

crate::task::yield_now().await;
}

/// Returns the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant {
if let Some(clock) = clock() {
clock.now()
} else {
Instant::from_std(std::time::Instant::now())
}
with_clock(|maybe_clock| {
Ok(if let Some(clock) = maybe_clock {
clock.now()
} else {
Instant::from_std(std::time::Instant::now())
})
})
}

impl Clock {
Expand All @@ -186,34 +215,40 @@ cfg_test_util! {
let now = std::time::Instant::now();

let clock = Clock {
inner: Arc::new(Mutex::new(Inner {
inner: Mutex::new(Inner {
enable_pausing,
base: now,
unfrozen: Some(now),
auto_advance_inhibit_count: 0,
})),
}),
};

if start_paused {
clock.pause();
if let Err(msg) = clock.pause() {
panic!("{}", msg);
}
}

clock
}

#[track_caller]
pub(crate) fn pause(&self) {
pub(crate) fn pause(&self) -> Result<(), &'static str> {
let mut inner = self.inner.lock();

if !inner.enable_pausing {
drop(inner); // avoid poisoning the lock
panic!("`time::pause()` requires the `current_thread` Tokio runtime. \
return Err("`time::pause()` requires the `current_thread` Tokio runtime. \
This is the default Runtime used by `#[tokio::test].");
}

let elapsed = inner.unfrozen.as_ref().expect("time is already frozen").elapsed();
let elapsed = match inner.unfrozen.as_ref() {
Some(v) => v.elapsed(),
None => return Err("time is already frozen")
};
inner.base += elapsed;
inner.unfrozen = None;

Ok(())
}

/// Temporarily stop auto-advancing the clock (see `tokio::time::pause`).
Expand All @@ -232,15 +267,15 @@ cfg_test_util! {
inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0
}

#[track_caller]
pub(crate) fn advance(&self, duration: Duration) {
pub(crate) fn advance(&self, duration: Duration) -> Result<(), &'static str> {
let mut inner = self.inner.lock();

if inner.unfrozen.is_some() {
panic!("time is not frozen");
return Err("time is not frozen");
}

inner.base += duration;
Ok(())
}

pub(crate) fn now(&self) -> Instant {
Expand Down
6 changes: 4 additions & 2 deletions tokio/src/time/sleep.rs
Expand Up @@ -261,10 +261,11 @@ impl Sleep {

#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let clock = handle.driver().clock();
let handle = &handle.driver().time();
let time_source = handle.time_source();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.saturating_sub(time_source.now());
let duration = deadline_tick.saturating_sub(time_source.now(clock));

let location = location.expect("should have location if tracing");
let resource_span = tracing::trace_span!(
Expand Down Expand Up @@ -370,8 +371,9 @@ impl Sleep {
tracing::trace_span!("runtime.resource.async_op.poll");

let duration = {
let clock = me.entry.clock();
let time_source = me.entry.driver().time_source();
let now = time_source.now();
let now = time_source.now(clock);
let deadline_tick = time_source.deadline_to_tick(deadline);
deadline_tick.saturating_sub(now)
};
Expand Down