Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: make EPOLLERR awake AsyncFd::readable #4444

Closed
wants to merge 9 commits into from
86 changes: 40 additions & 46 deletions tokio/src/io/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,61 +319,55 @@ impl ScheduledIo {

let ready = direction.mask() & ready;

if ready.is_empty() {
// Update the task info
let mut waiters = self.waiters.lock();
let slot = match direction {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
};
if !ready.is_empty() {
return Poll::Ready(Ok(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready,
}));
}

// Avoid cloning the waker if one is already stored that matches the
// current task.
match slot {
Some(existing) => {
if !existing.will_wake(cx.waker()) {
*existing = cx.waker().clone();
}
}
None => {
*slot = Some(cx.waker().clone());
// Update the task info
let mut waiters = self.waiters.lock();
let slot = match direction {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
};

// Avoid cloning the waker if one is already stored that matches the
// current task.
match slot {
Some(existing) => {
if !existing.will_wake(cx.waker()) {
*existing = cx.waker().clone();
}
}
None => {
*slot = Some(cx.waker().clone());
}
}

// Try again, in case the readiness was changed while we were
// taking the waiters lock
let curr = self.readiness.load(Acquire);
// Try again, in case the readiness was changed while we were
// taking the waiters lock
let curr = self.readiness.load(Acquire);

let ready = Ready::from_usize(READINESS.unpack(curr));
if direction != Direction::Write && ready.is_error() {
return Poll::Ready(Err(std::io::Error::new(
io::ErrorKind::Other,
"Polling error",
)));
}
let ready = Ready::from_usize(READINESS.unpack(curr));
if direction != Direction::Write && ready.is_error() {
return Poll::Ready(Err(std::io::Error::new(
io::ErrorKind::Other,
"Polling error",
)));
}

let ready = direction.mask() & ready;
if waiters.is_shutdown {
// TODO: why does this return a `ReadyEvent`? Why
// not make it return an error? It would have to be a
// custom error though...
Poll::Ready(Ok(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: direction.mask(),
}))
} else if ready.is_empty() {
Poll::Pending
} else {
Poll::Ready(Ok(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready,
}))
}
} else {
let ready = direction.mask() & ready;
if waiters.is_shutdown || !ready.is_empty() {
// TODO: why return a `ReadyEvent` in shutdown? Why not make it
// return an error? It would have to be a custom error though...
Poll::Ready(Ok(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready,
ready: direction.mask(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah this change is not correct. I thought I was simplifying the if/else if/else but it's not a correct transformation. Will fix

}))
} else {
Poll::Pending
}
}

Expand Down