Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of notify_one_last_in method #6520

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
109 changes: 89 additions & 20 deletions tokio/src/sync/notify.rs
Expand Up @@ -223,7 +223,9 @@ struct Waiter {
/// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
waker: UnsafeCell<Option<Waker>>,

/// Notification for this waiter.
/// Notification for this waiter. Uses 2 bits to store if and how was
/// notified, 2 bits for storing if it was woken up using FIFO or LIFO, and
/// the rest of it is unused.
/// * if it's `None`, then `waker` is protected by the `waiters` lock.
/// * if it's `Some`, then `waker` is exclusively owned by the
/// enclosing `Waiter` and can be accessed without locking.
Expand Down Expand Up @@ -261,6 +263,19 @@ const NOTIFICATION_ONE: usize = 1;
// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 2;

const NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT: usize = 2;
const NOTIFICATION_TYPE_MASK: usize = (1 << NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT) - 1;
const NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK: usize = !NOTIFICATION_TYPE_MASK;

// Unspecified wakeup order
const NOTIFICATION_NOTIFY_ONE_STRATEGY_NONE: usize = 0;

// Fifo (default) wakeup order
const NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO: usize = 1;

// Lifo wakeup order
const NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO: usize = 2;

/// Notification for a `Waiter`.
/// This struct is equivalent to `Option<Notification>`, but uses
/// `AtomicUsize` inside for atomic operations.
Expand All @@ -269,30 +284,48 @@ struct AtomicNotification(AtomicUsize);

impl AtomicNotification {
fn none() -> Self {
AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
AtomicNotification(AtomicUsize::new(0))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this isn't a const anymore?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there was since bot attributes, if there were a notification and the strategy, but once Ive changed the interfaces we can go back to what was originally, so this should be "fixed" now.

}

/// Store-release a notification.
/// This method should be called exactly once.
fn store_release(&self, notification: Notification) {
self.0.store(notification as usize, Release);
fn store_release(&self, notification: Notification, strategy: Option<NotifyOneStrategy>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need Option here or could callers default to a strategy?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should no longer apply after the latest refactor.

let data: usize = match strategy {
None => notification as usize & NOTIFICATION_TYPE_MASK,
Some(strategy) => {
(((strategy as usize) << NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT)
& NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK)
| (notification as usize & NOTIFICATION_TYPE_MASK)
}
};
self.0.store(data, Release);
}

fn load(&self, ordering: Ordering) -> Option<Notification> {
match self.0.load(ordering) {
fn load(&self, ordering: Ordering) -> (Option<Notification>, Option<NotifyOneStrategy>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there ever a time when this will return None for Notification but will return Some for NotifyOneStrategy? I don't think there is. Perhaps, Notification the enum should take the strategy as an argument:

enum Notification {
    One(Strategy),
    All,
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, let me change this towards your suggestion.

let data = self.0.load(ordering);
let notification = match data & NOTIFICATION_TYPE_MASK {
NOTIFICATION_NONE => None,
NOTIFICATION_ONE => Some(Notification::One),
NOTIFICATION_ALL => Some(Notification::All),
_ => unreachable!(),
}
};
let strategy = match (data & NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK)
>> NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT
{
NOTIFICATION_NOTIFY_ONE_STRATEGY_NONE => None,
NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO => Some(NotifyOneStrategy::Fifo),
NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO => Some(NotifyOneStrategy::Lifo),
_ => unreachable!(),
};
(notification, strategy)
}

/// Clears the notification.
/// This method is used by a `Notified` future to consume the
/// notification. It uses relaxed ordering and should be only
/// used once the atomic notification is no longer shared.
fn clear(&self) {
self.0.store(NOTIFICATION_NONE, Relaxed);
self.0.store(0, Relaxed);
}
}

Expand All @@ -303,6 +336,13 @@ enum Notification {
All = NOTIFICATION_ALL,
}

#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum NotifyOneStrategy {
Fifo = NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO,
Lifo = NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO,
}

/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
/// and gates the access to it on `notify.waiters` mutex. It also empties
/// the list on drop.
Expand Down Expand Up @@ -349,7 +389,7 @@ impl Drop for NotifyWaitersList<'_> {
while let Some(waiter) = self.list.pop_back() {
// Safety: we never make mutable references to waiters.
let waiter = unsafe { waiter.as_ref() };
waiter.notification.store_release(Notification::All);
waiter.notification.store_release(Notification::All, None);
}
}
}
Expand Down Expand Up @@ -521,7 +561,7 @@ impl Notify {
}
}

/// Notifies a waiting task.
/// Notifies the first waiting task.
///
/// If a task is currently waiting, that task is notified. Otherwise, a
/// permit is stored in this `Notify` value and the **next** call to
Expand Down Expand Up @@ -558,6 +598,21 @@ impl Notify {
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "notify"))]
pub fn notify_one(&self) {
self.notify_with_strategy(NotifyOneStrategy::Fifo);
}

/// Notifies the last waiting task.
///
/// This function behaves identically to `notify_one` but using a
/// LIFO algorithm to notify waiters from the queue, if there are any.
pfreixes marked this conversation as resolved.
Show resolved Hide resolved
///
/// Check the `notify_one` documentation for more info and
pfreixes marked this conversation as resolved.
Show resolved Hide resolved
/// examples.
pub fn notify_one_lifo(&self) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not against calling this notify_one_lifo. As a consideration, do you think notify_last_in or notify_one_last_in could be better options?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notify_one_last_in sounds better, users will easily understand the semantics of the method. Ill change this one.

self.notify_with_strategy(NotifyOneStrategy::Lifo);
}

fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
// Load the current state
let mut curr = self.state.load(SeqCst);

Expand Down Expand Up @@ -585,7 +640,7 @@ impl Notify {
// transition out of WAITING while the lock is held.
curr = self.state.load(SeqCst);

if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
drop(waiters);
waker.wake();
}
Expand Down Expand Up @@ -673,7 +728,7 @@ impl Notify {
}

// This waiter is unlinked and will not be shared ever again, release it.
waiter.notification.store_release(Notification::All);
waiter.notification.store_release(Notification::All, None);
}
None => {
break 'outer;
Expand Down Expand Up @@ -708,7 +763,12 @@ impl Default for Notify {
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}

fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
fn notify_locked(
waiters: &mut WaitList,
state: &AtomicUsize,
curr: usize,
strategy: NotifyOneStrategy,
) -> Option<Waker> {
match get_state(curr) {
EMPTY | NOTIFIED => {
let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
Expand All @@ -728,8 +788,11 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
// concurrently change as holding the lock is required to
// transition **out** of `WAITING`.
//
// Get a pending waiter
let waiter = waiters.pop_back().unwrap();
// Get a pending waiter using one of the available dequeue strategies.
let waiter = match strategy {
NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
};

// Safety: we never make mutable references to waiters.
let waiter = unsafe { waiter.as_ref() };
Expand All @@ -738,7 +801,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

// This waiter is unlinked and will not be shared ever again, release it.
waiter.notification.store_release(Notification::One);
waiter
.notification
.store_release(Notification::One, Some(strategy));

if waiters.is_empty() {
// As this the **final** waiter in the list, the state
Expand Down Expand Up @@ -998,7 +1063,8 @@ impl Notified<'_> {
ready!(crate::trace::trace_leaf(&mut ctx));
}

if waiter.notification.load(Acquire).is_some() {
let (notification, _) = waiter.notification.load(Acquire);
if notification.is_some() {
// Safety: waiter is already unlinked and will not be shared again,
// so we have an exclusive access to `waker`.
drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
Expand All @@ -1018,7 +1084,8 @@ impl Notified<'_> {
// We hold the lock and notifications are set only with the lock held,
// so this can be relaxed, because the happens-before relationship is
// established through the mutex.
if waiter.notification.load(Relaxed).is_some() {
let (notification, _) = waiter.notification.load(Acquire);
if notification.is_some() {
// Safety: waiter is already unlinked and will not be shared again,
// so we have an exclusive access to `waker`.
old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
Expand Down Expand Up @@ -1120,7 +1187,7 @@ impl Drop for Notified<'_> {

// We hold the lock, so this field is not concurrently accessed by
// `notify_*` functions and we can use the relaxed ordering.
let notification = waiter.notification.load(Relaxed);
let (notification, strategy) = waiter.notification.load(Relaxed);

// remove the entry from the list (if not already removed)
//
Expand All @@ -1138,7 +1205,9 @@ impl Drop for Notified<'_> {
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
if notification == Some(Notification::One) {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
if let Some(waker) =
notify_locked(&mut waiters, &notify.state, notify_state, strategy.unwrap())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can drop the Option around the strategy. Then we don't need an unwrap here. We can ignore the strategy when we are waking everything.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you prefer to just pass or return the NOTIFICATION_NOTIFY_ONE_STRATEGY_NONE? If so we can also avoid this two indirection using the const and the enum and having somthing like

 enum NotifyOneStrategy {
    None = 0
    Fifo = 1,
    Lifo = 2,
}

And use the NotifyOneStrategy as a parameter for the store and as a return value for the load.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented above, but it looks like the strategy is an argument to Notification. Something like:

enum Notification {
    One(Strategy),
    All,
}

You can implement the conversion of the enum to size using a match statement, the compiler will optimize it anyway.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeps, changing a bit this one.

{
drop(waiters);
waker.wake();
}
Expand Down
20 changes: 20 additions & 0 deletions tokio/src/util/linked_list.rs
Expand Up @@ -144,6 +144,26 @@ impl<L: Link> LinkedList<L, L::Target> {
}
}

/// Removes the first element from a list and returns it, or None if it is
/// empty.
pub(crate) fn pop_front(&mut self) -> Option<L::Handle> {
unsafe {
let head = self.head?;
self.head = L::pointers(head).as_ref().get_next();

if let Some(next) = L::pointers(head).as_ref().get_next() {
L::pointers(next).as_mut().set_prev(None);
pfreixes marked this conversation as resolved.
Show resolved Hide resolved
} else {
self.tail = None;
}

L::pointers(head).as_mut().set_prev(None);
L::pointers(head).as_mut().set_next(None);

Some(L::from_raw(head))
}
}

/// Removes the last element from a list and returns it, or None if it is
/// empty.
pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
Expand Down
75 changes: 75 additions & 0 deletions tokio/tests/sync_notify.rs
Expand Up @@ -21,6 +21,38 @@ fn notify_notified_one() {
assert_ready!(notified.poll());
}

#[test]
fn notify_multi_notified_one_fifo() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

// add two waiters into the queue
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

// should wakeup the first one
notify.notify_one();
assert_ready!(notified1.poll());
assert_pending!(notified2.poll());
}

#[test]
fn notify_multi_notified_one_lifo() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

// add two waiters into the queue
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

// should wakeup the last one
notify.notify_one_lifo();
assert_pending!(notified1.poll());
assert_ready!(notified2.poll());
}

#[test]
fn notified_one_notify() {
let notify = Notify::new();
Expand Down Expand Up @@ -105,6 +137,49 @@ fn notified_multi_notify_drop_one() {
assert_ready!(notified2.poll());
}

#[test]
fn notified_multi_notify_drop_one_fifo() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
let mut notified3 = spawn(async { notify.notified().await });

// add waiters by order of poll execution
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
assert_pending!(notified3.poll());

// by default fifo
notify.notify_one();

drop(notified1);

// next waiter should be the one to be to woken up
assert_ready!(notified2.poll());
assert_pending!(notified3.poll());
}

#[test]
fn notified_multi_notify_drop_one_lifo() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
let mut notified3 = spawn(async { notify.notified().await });

// add waiters by order of poll execution
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
assert_pending!(notified3.poll());

notify.notify_one_lifo();

drop(notified1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you drop notified3 to trigger the forwarding logic?

Suggested change
drop(notified1);
drop(notified3);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, fixed!


// latest waiter added should be the one to woken up
assert_ready!(notified3.poll());
assert_pending!(notified2.poll());
}

#[test]
fn notify_in_drop_after_wake() {
use futures::task::ArcWake;
Expand Down