diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index c9393650c20..dbc34592e3a 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -115,6 +115,7 @@ impl Registration { // Uses the poll path, requiring the caller to ensure mutual exclusion for // correctness. Only the last task to call this function is notified. + #[cfg(not(all(target_arch = "wasm32", target_os = "wasi")))] pub(crate) fn poll_read_io( &self, cx: &mut Context<'_>, diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 25cece62764..c928a31fc57 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -153,16 +153,32 @@ feature! { { use std::io::Read; - let n = ready!(self.registration.poll_read_io(cx, || { + loop { + let evt = ready!(self.registration.poll_read_ready(cx))?; + let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]); - self.io.as_ref().unwrap().read(b) - }))?; - - // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the - // buffer. - buf.assume_init(n); - buf.advance(n); - Poll::Ready(Ok(())) + let len = b.len(); + + match self.io.as_ref().unwrap().read(b) { + Ok(n) => { + // if we read a partially full buffer, this is sufficient on unix to show + // that the socket buffer has been drained + if n > 0 || (!cfg!(windows) && n < len) { + self.registration.clear_readiness(evt); + } + + // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the + // buffer. + buf.assume_init(n); + buf.advance(n); + return Poll::Ready(Ok(())); + }, + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + self.registration.clear_readiness(evt); + } + Err(e) => return Poll::Ready(Err(e)), + } + } } pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll>