Fairness problem with semaphore acquire #4612
Thanks for reporting this. This is because …

Fixing it would involve adding this line:

```rust
let coop = ready!(crate::coop::poll_proceed(cx));
```

Then, if the acquire succeeds, you should call this:

```rust
coop.made_progress();
```
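To make the suggested pattern concrete, here is a std-only model of it. Everything in it is hypothetical: `BUDGET`, `reset_budget`, `budget_left`, `poll_proceed`, `RestoreOnPending` and `poll_acquire` only imitate the shape of tokio's private `coop` module (the starting budget of 128 is an assumption), and the permits are a plain `Cell<usize>` rather than a real semaphore. It shows the two steps described above: check the budget before trying to acquire, and only keep the spent unit when the acquire succeeds (the restore-on-drop behaviour is an assumption of the model).

```rust
use std::cell::Cell;
use std::task::{ready, Poll};

thread_local! {
    // Hypothetical stand-in for the per-task coop budget (assumed 128).
    static BUDGET: Cell<u8> = Cell::new(128);
}

/// Hypothetical: the runtime refills the budget each time it polls a task.
fn reset_budget(units: u8) {
    BUDGET.with(|b| b.set(units));
}

fn budget_left() -> u8 {
    BUDGET.with(|b| b.get())
}

/// Guard for one spent unit of budget. Unless `made_progress` is called,
/// dropping it gives the unit back.
struct RestoreOnPending {
    made_progress: Cell<bool>,
}

impl RestoreOnPending {
    fn made_progress(&self) {
        self.made_progress.set(true);
    }
}

impl Drop for RestoreOnPending {
    fn drop(&mut self) {
        if !self.made_progress.get() {
            BUDGET.with(|b| b.set(b.get() + 1));
        }
    }
}

/// Simplified `poll_proceed`: Pending once the budget is exhausted.
/// (A real implementation would take `cx` and wake the task first.)
fn poll_proceed() -> Poll<RestoreOnPending> {
    BUDGET.with(|b| match b.get() {
        0 => Poll::Pending,
        n => {
            b.set(n - 1);
            Poll::Ready(RestoreOnPending {
                made_progress: Cell::new(false),
            })
        }
    })
}

/// Hypothetical acquire following the suggested pattern.
fn poll_acquire(permits: &Cell<usize>) -> Poll<()> {
    // First, ensure the current task has enough budget to proceed.
    let coop = ready!(poll_proceed());
    match permits.get() {
        // No permit: `coop` is dropped without progress, so the unit is
        // restored. A real semaphore would also register the waker here.
        0 => Poll::Pending,
        n => {
            permits.set(n - 1);
            coop.made_progress(); // the acquire succeeded: keep the cost
            Poll::Ready(())
        }
    }
}
```

With a budget of 2, the third acquire returns `Pending` even though permits remain, which is what lets the other branch of a `join!` run.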
I'll take this one if you don't mind, @scullionw.
I'm stuck, so I thought I would get some opinions on this (hope I'm not too wrong).

What I understood from reading "Reducing tail latencies with automatic cooperative task yielding" is that each task has a budget. I was taking a look at `Semaphore::acquire_owned`:

```rust
pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    let inner = trace::async_op(
        || self.ll_sem.acquire(1),
        self.resource_span.clone(),
        "Semaphore::acquire_owned",
        "poll",
        true,
    );
    #[cfg(not(all(tokio_unstable, feature = "tracing")))]
    let inner = self.ll_sem.acquire(1);

    inner.await?;
    Ok(OwnedSemaphorePermit {
        sem: self,
        permits: 1,
    })
}
```

The `Acquire` future it awaits already checks the task budget at the start of `poll`:

```rust
impl Future for Acquire<'_> {
    type Output = Result<(), AcquireError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // First, ensure the current task has enough budget to proceed.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let coop = ready!(trace_poll_op!(
            "poll_acquire",
            crate::coop::poll_proceed(cx),
        ));
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let coop = ready!(crate::coop::poll_proceed(cx));
        ...
    }
}
```

The `tokio::join!` docs say that the async expressions are evaluated in the same task. (Does that mean they share the task budget?)

What I think is happening

Since both async expressions run in the same task and share the task budget, I changed the order of the futures passed to `tokio::join!`:

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(10));
    // HERE \/
    tokio::join!(poor_little_task(), non_cooperative_task(permits));
}

async fn non_cooperative_task(permits: Arc<Semaphore>) {
    loop {
        let permit = permits.clone().acquire_owned().await.unwrap();
        // uncommenting the following makes it work
        // tokio::time::sleep(Duration::from_millis(1)).await;
    }
}

async fn poor_little_task() {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        panic!("Hello!")
    }
}
```

I think this makes sense after looking at the expanded macro output:

```rust
#![feature(prelude_import)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;
fn main() {
    let body = async {
        let permits = Arc::new(Semaphore::new(10));
        {
            use ::tokio::macros::support::{maybe_done, poll_fn, Future, Pin};
            use ::tokio::macros::support::Poll::{Ready, Pending};
            let mut futures = (
                maybe_done(poor_little_task()),
                maybe_done(non_cooperative_task(permits)),
            );
            poll_fn(move |cx| {
                let mut is_pending = false;
                let (fut, ..) = &mut futures;
                let mut fut = unsafe { Pin::new_unchecked(fut) };
                if fut.poll(cx).is_pending() {
                    is_pending = true;
                }
                let (_, fut, ..) = &mut futures;
                let mut fut = unsafe { Pin::new_unchecked(fut) };
                if fut.poll(cx).is_pending() {
                    is_pending = true;
                }
                if is_pending {
                    Pending
                } else {
                    Ready((
                        {
                            let (fut, ..) = &mut futures;
                            let mut fut = unsafe { Pin::new_unchecked(fut) };
                            fut.take_output().expect("expected completed future")
                        },
                        {
                            let (_, fut, ..) = &mut futures;
                            let mut fut = unsafe { Pin::new_unchecked(fut) };
                            fut.take_output().expect("expected completed future")
                        },
                    ))
                }
            })
            .await
        };
    };
    #[allow(clippy::expect_used)]
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed building the Runtime")
        .block_on(body);
}
async fn non_cooperative_task(permits: Arc<Semaphore>) {
    loop {
        let permit = permits.clone().acquire_owned().await.unwrap();
    }
}
async fn poor_little_task() {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        ::core::panicking::panic_fmt(::core::fmt::Arguments::new_v1(&["Hello!"], &[]))
    }
}
```

If we go back to the original order in which the futures were passed to `tokio::join!`:

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(10));
    tokio::join!(non_cooperative_task(permits), poor_little_task());
}

async fn non_cooperative_task(permits: Arc<Semaphore>) {
    loop {
        let permit = permits.clone().acquire_owned().await.unwrap();
        // uncommenting the following makes it work
        // tokio::time::sleep(Duration::from_millis(1)).await;
    }
}

async fn poor_little_task() {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        panic!("Hello!")
    }
}
```
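The reasoning in this comment can be checked with a small std-only model (hypothetical names throughout; it does not use tokio). `Child::Greedy` stands for `non_cooperative_task`, whose `acquire_owned().await` always succeeds and spends one unit of the shared task budget, and `Child::Polite` stands for `poor_little_task`, assuming its `sleep` also needs one unit of budget before it can make progress. `run_fixed_order` polls the children in the same order on every task poll, like the expanded `poll_fn`, and refills the budget (an assumed 128) between task polls, as if the task had been woken and rescheduled after returning `Pending`.

```rust
#[derive(Clone, Copy)]
enum Child {
    /// Models `non_cooperative_task`: every acquire is immediately ready,
    /// so it keeps going until the shared budget is gone.
    Greedy,
    /// Models `poor_little_task`: needs one unit of budget to progress.
    Polite,
}

impl Child {
    /// One poll of this child. Spends from the shared `budget` and returns
    /// the units of work done before the child returned Pending.
    fn poll(self, budget: &mut u32) -> u32 {
        match self {
            Child::Greedy => std::mem::take(budget),
            Child::Polite if *budget > 0 => {
                *budget -= 1;
                1
            }
            Child::Polite => 0, // out of budget: Pending, no progress
        }
    }
}

/// Polls `children` in a fixed order `task_polls` times and returns the
/// work done by each child.
fn run_fixed_order(children: &[Child], task_polls: u32, budget_per_poll: u32) -> Vec<u32> {
    let mut progress = vec![0; children.len()];
    for _ in 0..task_polls {
        // The runtime refills the budget each time it polls the task.
        let mut budget = budget_per_poll;
        // Same order every time, as in the expanded `join!`.
        for (i, child) in children.iter().enumerate() {
            progress[i] += child.poll(&mut budget);
        }
    }
    progress
}
```

With `Greedy` first, `Polite` never receives a unit of budget; with the order swapped, both make progress, which is consistent with the order dependence described in this comment.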
Ah yes, you are right. So the issue is really with …
I got it working: #4624. What do you think?
Well, I am thinking that it would be better to just have the …
I don't get it 🤔 What do you mean?

Just out of curiosity, does this create any problems?
I mean, the second time the …

As for whether it creates any issues: as far as I can tell, with your implementation this loop

```rust
loop {
    tokio::join!(sem.acquire());
}
```

would never run out of budget because the …
Doesn't that mean that if I have two futures that consume resources that are always ready, the futures would stop making progress concurrently? Or don't we care about this case?

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn always_ready(permits: Arc<Semaphore>) {
    loop {
        let _permit = permits.clone().acquire_owned().await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(10));
    tokio::join!(/* call it A */ always_ready(Arc::clone(&permits)), /* call it B */ always_ready(permits));
}
```
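One way to reason about this question is the same kind of std-only model: two always-ready children (A and B) that each spend the whole shared budget once polled. The hypothetical `trace` function records which child made progress on each task poll, either with a fixed start or with the first child alternating between polls; the budget of 128 is an assumption. It does not settle whether the behaviour is acceptable, but it shows what "progress" looks like in each case.

```rust
/// Hypothetical model: on each task poll, visit children A and B starting
/// at a fixed index or at an alternating one. An always-ready child spends
/// the whole remaining budget. Returns (child, units of work) per burst.
fn trace(task_polls: u32, budget_per_poll: u32, rotate: bool) -> Vec<(char, u32)> {
    let names = ['A', 'B'];
    let mut log = Vec::new();
    for poll in 0..task_polls {
        // Budget refilled each time the runtime polls the task.
        let mut budget = budget_per_poll;
        let start = if rotate { (poll % 2) as usize } else { 0 };
        for k in 0..2 {
            let idx = (start + k) % 2;
            // Keeps acquiring until the budget is exhausted.
            let done = std::mem::take(&mut budget);
            if done > 0 {
                log.push((names[idx], done));
            }
        }
    }
    log
}
```

In this model, a fixed start means B never appears in the log, while alternating the start lets A and B progress in turns, one budget's worth at a time: still concurrent, just in bursts rather than acquire by acquire.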
Your dual …
Updated my PR (#4624). I added 4 bytes to the size of the future generated by …

If everything is ok, maybe we can use …
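The comment above only says that 4 bytes were added. As an illustration, here is a minimal sketch of one way a join could rotate which child it polls first using a single `u32` of state; `next_poll_order` and the `rotation` counter are hypothetical and are not taken from #4624.

```rust
/// Hypothetical helper: returns the order in which a join of `n` children
/// would poll them on this call, then advances `rotation` so the next call
/// starts one child later. `rotation` is the only extra state (a `u32`).
fn next_poll_order(n: u32, rotation: &mut u32) -> Vec<u32> {
    let start = *rotation;
    // Wrap around explicitly instead of letting the counter grow.
    *rotation = if start + 1 == n { 0 } else { start + 1 };
    (0..n).map(|i| (start + i) % n).collect()
}
```

Each call starts one child later than the previous call, so no child is permanently polled last after the others have used up the budget.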
Version

```text
└── tokio v1.17.0
    └── tokio-macros v1.7.0 (proc-macro)
```

Platform

M1 MBP

```text
21.4.0 Darwin Kernel Version 21.4.0: Fri Mar 18 00:46:32 PDT 2022; root:xnu-8020.101.4~15/RELEASE_ARM64_T6000 arm64
```
Description
I am having fairness problems in tokio. In the following code, the second task never runs.

Both tasks `await`, but only the first task runs. I would understand if the first task were a hot loop or had blocking code, since I am using `tokio::join!` (which runs its futures concurrently, not in parallel) and not `tokio::spawn`. But, like I said, I am awaiting in both tasks.
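Awaiting is not the same as yielding: `.await` only hands control back to the executor (and, inside `join!`, to the other branches) when the awaited future returns `Poll::Pending`. A minimal std-only sketch of that, in which `std::future::ready(())` stands in for a permit acquire that is always immediately available and does not check any budget (an assumption for illustration); `NoopWake`, `busy_but_awaiting` and `poll_once` are hypothetical helpers.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Loops `iterations` times, awaiting a future that is always ready.
async fn busy_but_awaiting(iterations: u32) -> u32 {
    let mut count = 0;
    for _ in 0..iterations {
        // Ready immediately, so this `.await` never returns to the executor.
        std::future::ready(()).await;
        count += 1;
    }
    count
}

/// Polls a future exactly once, as an executor would on one wake-up.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(fut);
    fut.poll(&mut cx)
}
```

A single `poll` of `busy_but_awaiting(1_000_000)` runs all one million iterations and returns `Ready`, whereas `std::future::pending()` returns `Pending` on its first poll; only the latter gives a sibling branch a turn.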