Skip to content

Commit

Permalink
time: clean up implementation (#6517)
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed May 1, 2024
1 parent d33fdd8 commit 28439e2
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 134 deletions.
24 changes: 7 additions & 17 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
/// The largest safe integer to use for ticks.
///
/// This value should be updated if any other signal values are added above.
pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = u64::MAX - 2;
pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = STATE_MIN_VALUE - 1;

/// This structure holds the current shared state of the timer - its scheduled
/// time (if registered), or otherwise the result of the timer completing, as
Expand Down Expand Up @@ -187,18 +187,14 @@ impl StateCell {
break Err(cur_state);
}

match self.state.compare_exchange(
match self.state.compare_exchange_weak(
cur_state,
STATE_PENDING_FIRE,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
break Ok(());
}
Err(actual_state) => {
cur_state = actual_state;
}
Ok(_) => break Ok(()),
Err(actual_state) => cur_state = actual_state,
}
}
}
Expand Down Expand Up @@ -266,12 +262,8 @@ impl StateCell {
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
return Ok(());
}
Err(true_prior) => {
prior = true_prior;
}
Ok(_) => return Ok(()),
Err(true_prior) => prior = true_prior,
}
}
}
Expand Down Expand Up @@ -564,9 +556,7 @@ impl TimerEntry {
self.as_mut().reset(deadline, true);
}

let this = unsafe { self.get_unchecked_mut() };

this.inner().state.poll(cx.waker())
self.inner().state.poll(cx.waker())
}

pub(crate) fn driver(&self) -> &super::Handle {
Expand Down
22 changes: 7 additions & 15 deletions tokio/src/runtime/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use crate::loom::sync::Mutex;
use crate::runtime::driver::{self, IoHandle, IoStack};
use crate::time::error::Error;
use crate::time::{Clock, Duration};
use crate::util::WakeList;

use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
use std::{num::NonZeroU64, ptr::NonNull};

/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
///
Expand Down Expand Up @@ -253,8 +254,7 @@ impl Handle {
}

pub(self) fn process_at_time(&self, mut now: u64) {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;
let mut waker_list = WakeList::new();

let mut lock = self.inner.lock();

Expand All @@ -273,19 +273,13 @@ impl Handle {

// SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list[waker_idx] = Some(waker);
waker_list.push(waker);

waker_idx += 1;

if waker_idx == waker_list.len() {
if !waker_list.can_push() {
// Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
drop(lock);

for waker in waker_list.iter_mut() {
waker.take().unwrap().wake();
}

waker_idx = 0;
waker_list.wake_all();

lock = self.inner.lock();
}
Expand All @@ -299,9 +293,7 @@ impl Handle {

drop(lock);

for waker in &mut waker_list[0..waker_idx] {
waker.take().unwrap().wake();
}
waker_list.wake_all();
}

/// Removes a registered timer from the driver.
Expand Down
4 changes: 1 addition & 3 deletions tokio/src/runtime/time/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ impl TimeSource {

pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 {
// round up
let dur: Duration = t
.checked_duration_since(self.start_time)
.unwrap_or_else(|| Duration::from_secs(0));
let dur: Duration = t.saturating_duration_since(self.start_time);
let ms = dur.as_millis();

ms.try_into().unwrap_or(MAX_SAFE_MILLIS_DURATION)
Expand Down
90 changes: 4 additions & 86 deletions tokio/src/runtime/time/wheel/level.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::runtime::time::{EntryList, TimerHandle, TimerShared};

use std::{fmt, ptr::NonNull};
use std::{array, fmt, ptr::NonNull};

/// Wheel for a single level in the timer. This wheel contains 64 slots.
pub(crate) struct Level {
Expand Down Expand Up @@ -39,89 +39,10 @@ const LEVEL_MULT: usize = 64;

impl Level {
pub(crate) fn new(level: usize) -> Level {
// A value has to be Copy in order to use syntax like:
// let stack = Stack::default();
// ...
// slots: [stack; 64],
//
// Alternatively, since Stack is Default one can
// use syntax like:
// let slots: [Stack; 64] = Default::default();
//
// However, that is only supported for arrays of size
// 32 or fewer. So in our case we have to explicitly
// invoke the constructor for each array element.
let ctor = EntryList::default;

Level {
level,
occupied: 0,
slot: [
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
],
slot: array::from_fn(|_| EntryList::default()),
}
}

Expand All @@ -130,10 +51,7 @@ impl Level {
pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
// Use the `occupied` bit field to get the index of the next slot that
// needs to be processed.
let slot = match self.next_occupied_slot(now) {
Some(slot) => slot,
None => return None,
};
let slot = self.next_occupied_slot(now)?;

// From the slot index, calculate the `Instant` at which it needs to be
// processed. This value *must* be in the future with respect to `now`.
Expand Down Expand Up @@ -196,7 +114,7 @@ impl Level {
let now_slot = (now / slot_range(self.level)) as usize;
let occupied = self.occupied.rotate_right(now_slot as u32);
let zeros = occupied.trailing_zeros() as usize;
let slot = (zeros + now_slot) % 64;
let slot = (zeros + now_slot) % LEVEL_MULT;

Some(slot)
}
Expand Down
22 changes: 9 additions & 13 deletions tokio/src/runtime/time/wheel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod level;
pub(crate) use self::level::Expiration;
use self::level::Level;

use std::ptr::NonNull;
use std::{array, ptr::NonNull};

use super::EntryList;

Expand Down Expand Up @@ -35,7 +35,7 @@ pub(crate) struct Wheel {
/// * ~ 4 min slots / ~ 4 hr range
/// * ~ 4 hr slots / ~ 12 day range
/// * ~ 12 day slots / ~ 2 yr range
levels: Vec<Level>,
levels: Box<[Level; NUM_LEVELS]>,

/// Entries queued for firing
pending: EntryList,
Expand All @@ -52,11 +52,9 @@ pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
impl Wheel {
/// Creates a new timing wheel.
pub(crate) fn new() -> Wheel {
let levels = (0..NUM_LEVELS).map(Level::new).collect();

Wheel {
elapsed: 0,
levels,
levels: Box::new(array::from_fn(Level::new)),
pending: EntryList::new(),
}
}
Expand Down Expand Up @@ -130,7 +128,6 @@ impl Wheel {
);

let level = self.level_for(when);

self.levels[level].remove_entry(item);
}
}
Expand Down Expand Up @@ -180,11 +177,11 @@ impl Wheel {
}

// Check all levels
for level in 0..NUM_LEVELS {
if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
for (level_num, level) in self.levels.iter().enumerate() {
if let Some(expiration) = level.next_expiration(self.elapsed) {
// There cannot be any expirations at a higher level that happen
// before this one.
debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
debug_assert!(self.no_expirations_before(level_num + 1, expiration.deadline));

return Some(expiration);
}
Expand All @@ -203,8 +200,8 @@ impl Wheel {
fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
let mut res = true;

for l2 in start_level..NUM_LEVELS {
if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
for level in &self.levels[start_level..] {
if let Some(e2) = level.next_expiration(self.elapsed) {
if e2.deadline < before {
res = false;
}
Expand Down Expand Up @@ -267,7 +264,6 @@ impl Wheel {
}

/// Obtains the list of entries that need processing for the given expiration.
///
fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
self.levels[expiration.level].take_slot(expiration.slot)
}
Expand All @@ -292,7 +288,7 @@ fn level_for(elapsed: u64, when: u64) -> usize {
let leading_zeros = masked.leading_zeros() as usize;
let significant = 63 - leading_zeros;

significant / 6
significant / NUM_LEVELS
}

#[cfg(all(test, not(loom)))]
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub(crate) mod once_cell;
// rt and signal use `Notify`, which requires `WakeList`.
feature = "rt",
feature = "signal",
// time driver uses `WakeList` in `Handle::process_at_time`.
feature = "time",
))]
mod wake_list;
#[cfg(any(
Expand All @@ -28,6 +30,7 @@ mod wake_list;
feature = "fs",
feature = "rt",
feature = "signal",
feature = "time",
))]
pub(crate) use wake_list::WakeList;

Expand Down

0 comments on commit 28439e2

Please sign in to comment.