Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of notify_last method #6520

Merged
merged 19 commits into from May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 88 additions & 14 deletions tokio/src/sync/notify.rs
Expand Up @@ -223,7 +223,9 @@ struct Waiter {
/// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
waker: UnsafeCell<Option<Waker>>,

/// Notification for this waiter.
/// Notification for this waiter. Uses 2 bits to store if and how was
/// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and
/// the rest of it is unused.
/// * if it's `None`, then `waker` is protected by the `waiters` lock.
/// * if it's `Some`, then `waker` is exclusively owned by the
/// enclosing `Waiter` and can be accessed without locking.
Expand Down Expand Up @@ -261,6 +263,16 @@ const NOTIFICATION_ONE: usize = 1;
// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 2;

const NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT: usize = 2;
const NOTIFICATION_TYPE_MASK: usize = (1 << NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT) - 1;
const NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK: usize = !NOTIFICATION_TYPE_MASK;

// Fifo (default) wakeup order
const NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO: usize = 0;

// Lifo wakeup order
const NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO: usize = 1;

/// Notification for a `Waiter`.
/// This struct is equivalent to `Option<Notification>`, but uses
/// `AtomicUsize` inside for atomic operations.
Expand All @@ -275,13 +287,41 @@ impl AtomicNotification {
/// Store-release a notification.
/// This method should be called exactly once.
fn store_release(&self, notification: Notification) {
self.0.store(notification as usize, Release);
let data: usize = match notification {
Notification::All => NOTIFICATION_ALL & NOTIFICATION_TYPE_MASK,
Notification::One(NotifyOneStrategy::Fifo) => {
(((NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO)
<< NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT)
& NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK)
| (NOTIFICATION_ONE & NOTIFICATION_TYPE_MASK)
}
Notification::One(NotifyOneStrategy::Lifo) => {
(((NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO)
<< NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT)
& NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK)
| (NOTIFICATION_ONE & NOTIFICATION_TYPE_MASK)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty hard to read. Can we just make three constants equal to 0b00, 0b01, 0b10 or whatever the values are?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, Ive changed for using just constants which makes definitely more readable the code

}
};
self.0.store(data, Release);
}

fn load(&self, ordering: Ordering) -> Option<Notification> {
match self.0.load(ordering) {
let data = self.0.load(ordering);
match data & NOTIFICATION_TYPE_MASK {
NOTIFICATION_NONE => None,
NOTIFICATION_ONE => Some(Notification::One),
NOTIFICATION_ONE => {
match (data & NOTIFICATION_NOTIFY_ONE_STRATEGY_MASK)
>> NOTIFICATION_NOTIFY_ONE_STRATEGY_SHIFT
{
NOTIFICATION_NOTIFY_ONE_STRATEGY_FIFO => {
Some(Notification::One(NotifyOneStrategy::Fifo))
}
NOTIFICATION_NOTIFY_ONE_STRATEGY_LIFO => {
Some(Notification::One(NotifyOneStrategy::Lifo))
}
_ => unreachable!(),
}
}
NOTIFICATION_ALL => Some(Notification::All),
_ => unreachable!(),
}
Expand All @@ -296,11 +336,18 @@ impl AtomicNotification {
}
}

#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum NotifyOneStrategy {
Fifo,
Lifo,
}

#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum Notification {
One = NOTIFICATION_ONE,
All = NOTIFICATION_ALL,
One(NotifyOneStrategy),
All,
}

/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
Expand Down Expand Up @@ -521,7 +568,7 @@ impl Notify {
}
}

/// Notifies a waiting task.
/// Notifies the first waiting task.
///
/// If a task is currently waiting, that task is notified. Otherwise, a
/// permit is stored in this `Notify` value and the **next** call to
Expand Down Expand Up @@ -558,6 +605,21 @@ impl Notify {
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "notify"))]
pub fn notify_one(&self) {
self.notify_with_strategy(NotifyOneStrategy::Fifo);
}

/// Notifies the last waiting task.
///
/// This function behaves identically to `notify_one` but using a
/// LIFO algorithm to notify waiters from the queue, if there are any.
pfreixes marked this conversation as resolved.
Show resolved Hide resolved
///
/// Check the `notify_one` documentation for more info and
pfreixes marked this conversation as resolved.
Show resolved Hide resolved
/// examples.
pub fn notify_one_last_in(&self) {
self.notify_with_strategy(NotifyOneStrategy::Lifo);
}

fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
// Load the current state
let mut curr = self.state.load(SeqCst);

Expand Down Expand Up @@ -585,7 +647,7 @@ impl Notify {
// transition out of WAITING while the lock is held.
curr = self.state.load(SeqCst);

if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
drop(waiters);
waker.wake();
}
Expand Down Expand Up @@ -708,7 +770,12 @@ impl Default for Notify {
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}

fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
fn notify_locked(
waiters: &mut WaitList,
state: &AtomicUsize,
curr: usize,
strategy: NotifyOneStrategy,
) -> Option<Waker> {
match get_state(curr) {
EMPTY | NOTIFIED => {
let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
Expand All @@ -728,8 +795,11 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
// concurrently change as holding the lock is required to
// transition **out** of `WAITING`.
//
// Get a pending waiter
let waiter = waiters.pop_back().unwrap();
// Get a pending waiter using one of the available dequeue strategies.
let waiter = match strategy {
NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
};

// Safety: we never make mutable references to waiters.
let waiter = unsafe { waiter.as_ref() };
Expand All @@ -738,7 +808,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

// This waiter is unlinked and will not be shared ever again, release it.
waiter.notification.store_release(Notification::One);
waiter
.notification
.store_release(Notification::One(strategy));

if waiters.is_empty() {
// As this the **final** waiter in the list, the state
Expand Down Expand Up @@ -1137,8 +1209,10 @@ impl Drop for Notified<'_> {
// See if the node was notified but not received. In this case, if
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
if notification == Some(Notification::One) {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
if let Some(Notification::One(strategy)) = notification {
if let Some(waker) =
notify_locked(&mut waiters, &notify.state, notify_state, strategy)
{
drop(waiters);
waker.wake();
}
Expand Down
20 changes: 20 additions & 0 deletions tokio/src/util/linked_list.rs
Expand Up @@ -144,6 +144,26 @@ impl<L: Link> LinkedList<L, L::Target> {
}
}

/// Removes the first element from a list and returns it, or None if it is
/// empty.
pub(crate) fn pop_front(&mut self) -> Option<L::Handle> {
unsafe {
let head = self.head?;
self.head = L::pointers(head).as_ref().get_next();

if let Some(next) = L::pointers(head).as_ref().get_next() {
L::pointers(next).as_mut().set_prev(None);
pfreixes marked this conversation as resolved.
Show resolved Hide resolved
} else {
self.tail = None;
}

L::pointers(head).as_mut().set_prev(None);
L::pointers(head).as_mut().set_next(None);

Some(L::from_raw(head))
}
}

/// Removes the last element from a list and returns it, or None if it is
/// empty.
pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
Expand Down
75 changes: 75 additions & 0 deletions tokio/tests/sync_notify.rs
Expand Up @@ -21,6 +21,38 @@ fn notify_notified_one() {
assert_ready!(notified.poll());
}

#[test]
fn notify_multi_notified_one_first_in() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

// add two waiters into the queue
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

// should wakeup the first one
notify.notify_one();
assert_ready!(notified1.poll());
assert_pending!(notified2.poll());
}

#[test]
fn notify_multi_notified_one_last_in() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

// add two waiters into the queue
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

// should wakeup the last one
notify.notify_one_last_in();
assert_pending!(notified1.poll());
assert_ready!(notified2.poll());
}

#[test]
fn notified_one_notify() {
let notify = Notify::new();
Expand Down Expand Up @@ -105,6 +137,49 @@ fn notified_multi_notify_drop_one() {
assert_ready!(notified2.poll());
}

#[test]
fn notified_multi_notify_drop_one_first_in() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
let mut notified3 = spawn(async { notify.notified().await });

// add waiters by order of poll execution
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
assert_pending!(notified3.poll());

// by default fifo
notify.notify_one();

drop(notified1);

// next waiter should be the one to be to woken up
assert_ready!(notified2.poll());
assert_pending!(notified3.poll());
}

#[test]
fn notified_multi_notify_drop_one_last_in() {
let notify = Notify::new();
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });
let mut notified3 = spawn(async { notify.notified().await });

// add waiters by order of poll execution
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());
assert_pending!(notified3.poll());

notify.notify_one_last_in();

drop(notified1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you drop notified3 to trigger the forwarding logic?

Suggested change
drop(notified1);
drop(notified3);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, fixed!


// latest waiter added should be the one to woken up
assert_ready!(notified3.poll());
assert_pending!(notified2.poll());
}

#[test]
fn notify_in_drop_after_wake() {
use futures::task::ArcWake;
Expand Down