Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time: small implementation cleanups #6517

Merged
merged 11 commits into from
May 1, 2024
Merged
24 changes: 7 additions & 17 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
/// The largest safe integer to use for ticks.
///
/// This value should be updated if any other signal values are added above.
pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = u64::MAX - 2;
pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = STATE_MIN_VALUE - 1;

/// This structure holds the current shared state of the timer - its scheduled
/// time (if registered), or otherwise the result of the timer completing, as
Expand Down Expand Up @@ -187,18 +187,14 @@ impl StateCell {
break Err(cur_state);
}

match self.state.compare_exchange(
match self.state.compare_exchange_weak(
cur_state,
STATE_PENDING_FIRE,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
break Ok(());
}
Err(actual_state) => {
cur_state = actual_state;
}
Ok(_) => break Ok(()),
Err(actual_state) => cur_state = actual_state,
}
}
}
Expand Down Expand Up @@ -266,12 +262,8 @@ impl StateCell {
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
return Ok(());
}
Err(true_prior) => {
prior = true_prior;
}
Ok(_) => return Ok(()),
Err(true_prior) => prior = true_prior,
}
}
}
Expand Down Expand Up @@ -564,9 +556,7 @@ impl TimerEntry {
self.as_mut().reset(deadline, true);
}

let this = unsafe { self.get_unchecked_mut() };

this.inner().state.poll(cx.waker())
self.inner().state.poll(cx.waker())
}

pub(crate) fn driver(&self) -> &super::Handle {
Expand Down
22 changes: 7 additions & 15 deletions tokio/src/runtime/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use crate::loom::sync::Mutex;
use crate::runtime::driver::{self, IoHandle, IoStack};
use crate::time::error::Error;
use crate::time::{Clock, Duration};
use crate::util::WakeList;

use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
use std::{num::NonZeroU64, ptr::NonNull};

/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
///
Expand Down Expand Up @@ -253,8 +254,7 @@ impl Handle {
}

pub(self) fn process_at_time(&self, mut now: u64) {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;
let mut waker_list = WakeList::new();

let mut lock = self.inner.lock();

Expand All @@ -273,19 +273,13 @@ impl Handle {

// SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list[waker_idx] = Some(waker);
waker_list.push(waker);

waker_idx += 1;

if waker_idx == waker_list.len() {
if !waker_list.can_push() {
// Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
drop(lock);

for waker in waker_list.iter_mut() {
waker.take().unwrap().wake();
}

waker_idx = 0;
waker_list.wake_all();

lock = self.inner.lock();
}
Expand All @@ -299,9 +293,7 @@ impl Handle {

drop(lock);

for waker in &mut waker_list[0..waker_idx] {
waker.take().unwrap().wake();
}
waker_list.wake_all();
}

/// Removes a registered timer from the driver.
Expand Down
4 changes: 1 addition & 3 deletions tokio/src/runtime/time/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ impl TimeSource {

pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 {
// round up
let dur: Duration = t
.checked_duration_since(self.start_time)
.unwrap_or_else(|| Duration::from_secs(0));
let dur: Duration = t.saturating_duration_since(self.start_time);
let ms = dur.as_millis();

ms.try_into().unwrap_or(MAX_SAFE_MILLIS_DURATION)
Expand Down
90 changes: 4 additions & 86 deletions tokio/src/runtime/time/wheel/level.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::runtime::time::{EntryList, TimerHandle, TimerShared};

use std::{fmt, ptr::NonNull};
use std::{array, fmt, ptr::NonNull};

/// Wheel for a single level in the timer. This wheel contains 64 slots.
pub(crate) struct Level {
Expand Down Expand Up @@ -39,89 +39,10 @@ const LEVEL_MULT: usize = 64;

impl Level {
pub(crate) fn new(level: usize) -> Level {
// A value has to be Copy in order to use syntax like:
// let stack = Stack::default();
// ...
// slots: [stack; 64],
//
// Alternatively, since Stack is Default one can
// use syntax like:
// let slots: [Stack; 64] = Default::default();
//
// However, that is only supported for arrays of size
// 32 or fewer. So in our case we have to explicitly
// invoke the constructor for each array element.
let ctor = EntryList::default;

Level {
level,
occupied: 0,
slot: [
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
ctor(),
],
slot: array::from_fn(|_| EntryList::default()),
}
}

Expand All @@ -130,10 +51,7 @@ impl Level {
pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
// Use the `occupied` bit field to get the index of the next slot that
// needs to be processed.
let slot = match self.next_occupied_slot(now) {
Some(slot) => slot,
None => return None,
};
let slot = self.next_occupied_slot(now)?;

// From the slot index, calculate the `Instant` at which it needs to be
// processed. This value *must* be in the future with respect to `now`.
Expand Down Expand Up @@ -196,7 +114,7 @@ impl Level {
let now_slot = (now / slot_range(self.level)) as usize;
let occupied = self.occupied.rotate_right(now_slot as u32);
let zeros = occupied.trailing_zeros() as usize;
let slot = (zeros + now_slot) % 64;
let slot = (zeros + now_slot) % LEVEL_MULT;

Some(slot)
}
Expand Down
22 changes: 9 additions & 13 deletions tokio/src/runtime/time/wheel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod level;
pub(crate) use self::level::Expiration;
use self::level::Level;

use std::ptr::NonNull;
use std::{array, ptr::NonNull};

use super::EntryList;

Expand Down Expand Up @@ -35,7 +35,7 @@ pub(crate) struct Wheel {
/// * ~ 4 min slots / ~ 4 hr range
/// * ~ 4 hr slots / ~ 12 day range
/// * ~ 12 day slots / ~ 2 yr range
levels: Vec<Level>,
levels: Box<[Level; NUM_LEVELS]>,

/// Entries queued for firing
pending: EntryList,
Expand All @@ -52,11 +52,9 @@ pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
impl Wheel {
/// Creates a new timing wheel.
pub(crate) fn new() -> Wheel {
let levels = (0..NUM_LEVELS).map(Level::new).collect();

Wheel {
elapsed: 0,
levels,
levels: Box::new(array::from_fn(Level::new)),
pending: EntryList::new(),
}
}
Expand Down Expand Up @@ -130,7 +128,6 @@ impl Wheel {
);

let level = self.level_for(when);

self.levels[level].remove_entry(item);
}
}
Expand Down Expand Up @@ -180,11 +177,11 @@ impl Wheel {
}

// Check all levels
for level in 0..NUM_LEVELS {
if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
for (level_num, level) in self.levels.iter().enumerate() {
if let Some(expiration) = level.next_expiration(self.elapsed) {
// There cannot be any expirations at a higher level that happen
// before this one.
debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
debug_assert!(self.no_expirations_before(level_num + 1, expiration.deadline));

return Some(expiration);
}
Expand All @@ -203,8 +200,8 @@ impl Wheel {
fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
let mut res = true;

for l2 in start_level..NUM_LEVELS {
if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
for level in &self.levels[start_level..] {
if let Some(e2) = level.next_expiration(self.elapsed) {
if e2.deadline < before {
res = false;
}
Expand Down Expand Up @@ -267,7 +264,6 @@ impl Wheel {
}

/// Obtains the list of entries that need processing for the given expiration.
///
fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
self.levels[expiration.level].take_slot(expiration.slot)
}
Expand All @@ -292,7 +288,7 @@ fn level_for(elapsed: u64, when: u64) -> usize {
let leading_zeros = masked.leading_zeros() as usize;
let significant = 63 - leading_zeros;

significant / 6
significant / NUM_LEVELS
}

#[cfg(all(test, not(loom)))]
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub(crate) mod once_cell;
// rt and signal use `Notify`, which requires `WakeList`.
feature = "rt",
feature = "signal",
// time driver uses `WakeList` in `Handle::process_at_time`.
feature = "time",
))]
mod wake_list;
#[cfg(any(
Expand All @@ -28,6 +30,7 @@ mod wake_list;
feature = "fs",
feature = "rt",
feature = "signal",
feature = "time",
))]
pub(crate) use wake_list::WakeList;

Expand Down