Skip to content

Commit

Permalink
Format code
Browse files Browse the repository at this point in the history
  • Loading branch information
pfreixes committed Apr 27, 2024
1 parent c3bbef6 commit 0fd084f
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions tokio/src/sync/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ struct Waiter {
waker: UnsafeCell<Option<Waker>>,

/// Notification for this waiter. Uses 2 bits to store if and how was
/// notified, 2 bit for storing if was woken up using FIFO or LIFO and
/// the rest of it are unused.
/// notified, 2 bits for storing if it was woken up using FIFO or LIFO, and
/// the rest of it is unused.
/// * if it's `None`, then `waker` is protected by the `waiters` lock.
/// * if it's `Some`, then `waker` is exclusively owned by the
/// enclosing `Waiter` and can be accessed without locking.
Expand Down Expand Up @@ -292,7 +292,11 @@ impl AtomicNotification {
fn store_release(&self, notification: Notification, strategy: Option<NotifyOneStrategy>) {
let data: usize = match strategy {
None => notification as usize & NOTIFICATION_TYPE_MASK,
Some(strategy) => (((strategy as usize) << NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT) & NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK) | (notification as usize & NOTIFICATION_TYPE_MASK)
Some(strategy) => {
(((strategy as usize) << NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT)
& NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK)
| (notification as usize & NOTIFICATION_TYPE_MASK)
}
};
self.0.store(data, Release);
}
Expand All @@ -305,7 +309,9 @@ impl AtomicNotification {
NOTIFICATION_ALL => Some(Notification::All),
_ => unreachable!(),
};
let strategy = match (data & NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK) >> NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT {
let strategy = match (data & NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK)
>> NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT
{
NOTIFICATION_NOTIFY_ONE_STRATEGY_NONE => None,
NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO => Some(NotifyOneStrategy::Fifo),
NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO => Some(NotifyOneStrategy::Lifo),
Expand Down Expand Up @@ -489,7 +495,7 @@ impl Notify {
pub const fn const_new() -> Notify {
Notify {
state: AtomicUsize::new(0),
waiters: Mutex::const_new(LinkedList::new())
waiters: Mutex::const_new(LinkedList::new()),
}
}

Expand Down Expand Up @@ -597,12 +603,11 @@ impl Notify {

/// Notifies the last waiting task.
///
/// This function behaves identically as `notify_one` but using a
/// LIFO algorithm for dequeing the waiters, if there are.
/// This function behaves identically to `notify_one` but using a
/// LIFO algorithm for dequeuing the waiters, if there are any.
///
/// Check the `notify_one` documentation for more info and
/// examples.
///
pub fn notify_one_lifo(&self) {
self.notify_with_strategy(NotifyOneStrategy::Lifo);
}
Expand All @@ -611,7 +616,6 @@ impl Notify {
// Load the current state
let mut curr = self.state.load(SeqCst);


// If the state is `EMPTY`, transition to `NOTIFIED` and return.
while let EMPTY | NOTIFIED = get_state(curr) {
// The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
Expand All @@ -622,9 +626,7 @@ impl Notify {

match res {
// No waiters, no further work to do
Ok(_) => {
return
}
Ok(_) => return,
Err(actual) => {
curr = actual;
}
Expand All @@ -637,6 +639,7 @@ impl Notify {
// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
curr = self.state.load(SeqCst);

if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
drop(waiters);
waker.wake();
Expand Down Expand Up @@ -760,7 +763,12 @@ impl Default for Notify {
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}

fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize, strategy: NotifyOneStrategy) -> Option<Waker> {
fn notify_locked(
waiters: &mut WaitList,
state: &AtomicUsize,
curr: usize,
strategy: NotifyOneStrategy,
) -> Option<Waker> {
match get_state(curr) {
EMPTY | NOTIFIED => {
let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
Expand All @@ -780,10 +788,10 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize, strat
// concurrently change as holding the lock is required to
// transition **out** of `WAITING`.
//
// Get a pending waiter using one or the other available strategies.
// Get a pending waiter using one of the available dequeue strategies.
let waiter = match strategy {
NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
};

// Safety: we never make mutable references to waiters.
Expand All @@ -793,7 +801,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize, strat
let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

// This waiter is unlinked and will not be shared ever again, release it.
waiter.notification.store_release(Notification::One, Some(strategy));
waiter
.notification
.store_release(Notification::One, Some(strategy));

if waiters.is_empty() {
// As this the **final** waiter in the list, the state
Expand Down Expand Up @@ -1195,7 +1205,9 @@ impl Drop for Notified<'_> {
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
if notification == Some(Notification::One) {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state, strategy.unwrap()) {
if let Some(waker) =
notify_locked(&mut waiters, &notify.state, notify_state, strategy.unwrap())
{
drop(waiters);
waker.wake();
}
Expand Down

0 comments on commit 0fd084f

Please sign in to comment.