From 06c50259b5bab7904ab935d85ccc2a328da5d904 Mon Sep 17 00:00:00 2001 From: Timmy Xiao <34635512+tzx@users.noreply.github.com> Date: Fri, 7 Apr 2023 21:54:50 -0400 Subject: [PATCH 1/2] io: make read_to_end not grow unnecessarily Fixes: #5594 --- tokio/src/io/util/read_to_end.rs | 22 ++++++++++++++++------ tokio/src/io/util/vec_with_initialized.rs | 14 ++++++++++++-- tokio/tests/io_read_to_end.rs | 12 ++++++++++++ 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/tokio/src/io/util/read_to_end.rs b/tokio/src/io/util/read_to_end.rs index f4a564d7dd4..c43a060175b 100644 --- a/tokio/src/io/util/read_to_end.rs +++ b/tokio/src/io/util/read_to_end.rs @@ -1,5 +1,5 @@ use crate::io::util::vec_with_initialized::{into_read_buf_parts, VecU8, VecWithInitialized}; -use crate::io::AsyncRead; +use crate::io::{AsyncRead, ReadBuf}; use pin_project_lite::pin_project; use std::future::Future; @@ -68,19 +68,29 @@ fn poll_read_to_end( // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every // time is 4,500 times (!) slower than this if the reader has a very small // amount of data to return. - buf.reserve(32); + let try_small_read = buf.reserve(32); // Get a ReadBuf into the vector. let mut read_buf = buf.get_read_buf(); + let poll_result; let filled_before = read_buf.filled().len(); - let poll_result = read.poll_read(cx, &mut read_buf); - let filled_after = read_buf.filled().len(); - let n = filled_after - filled_before; - + let filled_after; + if try_small_read { + let mut small_buf = Vec::with_capacity(32); + let mut small_read_buf = ReadBuf::new(&mut small_buf); + poll_result = read.poll_read(cx, &mut small_read_buf); + let filled = small_read_buf.filled().len(); + read_buf.put_slice(&small_buf[..filled]); + filled_after = filled_before + filled; + } else { + poll_result = read.poll_read(cx, &mut read_buf); + filled_after = read_buf.filled().len(); + }; // Update the length of the vector using the result of poll_read. let read_buf_parts = into_read_buf_parts(read_buf); buf.apply_read_buf(read_buf_parts); + let n = filled_after - filled_before; match poll_result { Poll::Pending => { diff --git a/tokio/src/io/util/vec_with_initialized.rs b/tokio/src/io/util/vec_with_initialized.rs index a9b94e39d57..952915c4c7c 100644 --- a/tokio/src/io/util/vec_with_initialized.rs +++ b/tokio/src/io/util/vec_with_initialized.rs @@ -28,6 +28,7 @@ pub(crate) struct VecWithInitialized { // The number of initialized bytes in the vector. // Always between `vec.len()` and `vec.capacity()`. num_initialized: usize, + starting_capacity: usize, } impl VecWithInitialized> { @@ -47,19 +48,28 @@ where // to its length are initialized. Self { num_initialized: vec.as_mut().len(), + starting_capacity: vec.as_ref().capacity(), vec, } } - pub(crate) fn reserve(&mut self, num_bytes: usize) { + // Returns a boolean telling the caller to try reading into a small local buffer first if true. + // Doing so would avoid overallocating when vec is filled to capacity and we reached EOF. + pub(crate) fn reserve(&mut self, num_bytes: usize) -> bool { let vec = self.vec.as_mut(); if vec.capacity() - vec.len() >= num_bytes { - return; + return false; } + + if self.starting_capacity == vec.capacity() && self.starting_capacity >= num_bytes { + return true; + } + // SAFETY: Setting num_initialized to `vec.len()` is correct as // `reserve` does not change the length of the vector. self.num_initialized = vec.len(); vec.reserve(num_bytes); + false } #[cfg(feature = "io-util")] diff --git a/tokio/tests/io_read_to_end.rs b/tokio/tests/io_read_to_end.rs index 171e6d6480e..e78adbe79b4 100644 --- a/tokio/tests/io_read_to_end.rs +++ b/tokio/tests/io_read_to_end.rs @@ -5,6 +5,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio_test::assert_ok; +use tokio_test::io::Builder; #[tokio::test] async fn read_to_end() { @@ -76,3 +77,14 @@ async fn read_to_end_uninit() { test.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf.len(), 33); } + +#[tokio::test] +async fn read_to_end_doesnt_grow_with_capacity() { + let bytes = b"imlargerthan32bytessoIcanhelpwiththetest"; + let mut mock = Builder::new().read(bytes).build(); + let mut buf = Vec::with_capacity(bytes.len()); + AsyncReadExt::read_to_end(&mut mock, &mut buf) + .await + .unwrap(); + assert_eq!(bytes.len(), buf.capacity()); +} From be2747997de4273c13dd1579663a410420df0eb7 Mon Sep 17 00:00:00 2001 From: Timmy Xiao <34635512+tzx@users.noreply.github.com> Date: Sat, 8 Apr 2023 16:58:51 -0400 Subject: [PATCH 2/2] Don't use bool in reserve and use stack allocated buffer --- tokio/src/io/util/read_to_end.rs | 55 ++++++++++++++++------- tokio/src/io/util/vec_with_initialized.rs | 21 ++++----- tokio/tests/io_read_to_end.rs | 40 +++++++++++++++-- 3 files changed, 86 insertions(+), 30 deletions(-) diff --git a/tokio/src/io/util/read_to_end.rs b/tokio/src/io/util/read_to_end.rs index c43a060175b..8edba2a1711 100644 --- a/tokio/src/io/util/read_to_end.rs +++ b/tokio/src/io/util/read_to_end.rs @@ -5,7 +5,7 @@ use pin_project_lite::pin_project; use std::future::Future; use std::io; use std::marker::PhantomPinned; -use std::mem; +use std::mem::{self, MaybeUninit}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -67,41 +67,62 @@ fn poll_read_to_end( // has 4 bytes while still making large reads if the reader does have a ton // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every // time is 4,500 times (!) slower than this if the reader has a very small - // amount of data to return. - let try_small_read = buf.reserve(32); + // amount of data to return. When the vector is full with its starting + // capacity, we first try to read into a small buffer to see if we reached + // an EOF. This only happens when the starting capacity is >= NUM_BYTES, since + // we allocate at least NUM_BYTES each time. This avoids the unnecessary + // allocation that we attempt before reading into the vector. - // Get a ReadBuf into the vector. - let mut read_buf = buf.get_read_buf(); + const NUM_BYTES: usize = 32; + let try_small_read = buf.try_small_read_first(NUM_BYTES); + // Get a ReadBuf into the vector. + let mut read_buf; let poll_result; - let filled_before = read_buf.filled().len(); - let filled_after; - if try_small_read { - let mut small_buf = Vec::with_capacity(32); - let mut small_read_buf = ReadBuf::new(&mut small_buf); + + let n = if try_small_read { + // Read some bytes using a small read. + let mut small_buf: [MaybeUninit; NUM_BYTES] = [MaybeUninit::uninit(); NUM_BYTES]; + let mut small_read_buf = ReadBuf::uninit(&mut small_buf); poll_result = read.poll_read(cx, &mut small_read_buf); - let filled = small_read_buf.filled().len(); - read_buf.put_slice(&small_buf[..filled]); - filled_after = filled_before + filled; + let to_write = small_read_buf.filled(); + + // Ensure we have enough space to fill our vector with what we read. + read_buf = buf.get_read_buf(); + if to_write.len() > read_buf.remaining() { + buf.reserve(NUM_BYTES); + read_buf = buf.get_read_buf(); + } + read_buf.put_slice(to_write); + + to_write.len() } else { + // Ensure we have enough space for reading. + buf.reserve(NUM_BYTES); + read_buf = buf.get_read_buf(); + + // Read data directly into vector. + let filled_before = read_buf.filled().len(); poll_result = read.poll_read(cx, &mut read_buf); - filled_after = read_buf.filled().len(); + + // Compute the number of bytes read. + read_buf.filled().len() - filled_before }; + // Update the length of the vector using the result of poll_read. let read_buf_parts = into_read_buf_parts(read_buf); buf.apply_read_buf(read_buf_parts); - let n = filled_after - filled_before; match poll_result { Poll::Pending => { // In this case, nothing should have been read. However we still // update the vector in case the poll_read call initialized parts of // the vector's unused capacity. - debug_assert_eq!(filled_before, filled_after); + debug_assert_eq!(n, 0); Poll::Pending } Poll::Ready(Err(err)) => { - debug_assert_eq!(filled_before, filled_after); + debug_assert_eq!(n, 0); Poll::Ready(Err(err)) } Poll::Ready(Ok(())) => Poll::Ready(Ok(n)), diff --git a/tokio/src/io/util/vec_with_initialized.rs b/tokio/src/io/util/vec_with_initialized.rs index 952915c4c7c..f527492dcd9 100644 --- a/tokio/src/io/util/vec_with_initialized.rs +++ b/tokio/src/io/util/vec_with_initialized.rs @@ -53,23 +53,15 @@ where } } - // Returns a boolean telling the caller to try reading into a small local buffer first if true. - // Doing so would avoid overallocating when vec is filled to capacity and we reached EOF. - pub(crate) fn reserve(&mut self, num_bytes: usize) -> bool { + pub(crate) fn reserve(&mut self, num_bytes: usize) { let vec = self.vec.as_mut(); if vec.capacity() - vec.len() >= num_bytes { - return false; + return; } - - if self.starting_capacity == vec.capacity() && self.starting_capacity >= num_bytes { - return true; - } - // SAFETY: Setting num_initialized to `vec.len()` is correct as // `reserve` does not change the length of the vector. self.num_initialized = vec.len(); vec.reserve(num_bytes); - false } #[cfg(feature = "io-util")] @@ -121,6 +113,15 @@ where vec.set_len(parts.len); } } + + // Returns a boolean telling the caller to try reading into a small local buffer first if true. + // Doing so would avoid overallocating when vec is filled to capacity and we reached EOF. + pub(crate) fn try_small_read_first(&self, num_bytes: usize) -> bool { + let vec = self.vec.as_ref(); + vec.capacity() - vec.len() < num_bytes + && self.starting_capacity == vec.capacity() + && self.starting_capacity >= num_bytes + } } pub(crate) struct ReadBufParts { diff --git a/tokio/tests/io_read_to_end.rs b/tokio/tests/io_read_to_end.rs index e78adbe79b4..29b60becec4 100644 --- a/tokio/tests/io_read_to_end.rs +++ b/tokio/tests/io_read_to_end.rs @@ -80,11 +80,45 @@ async fn read_to_end_uninit() { #[tokio::test] async fn read_to_end_doesnt_grow_with_capacity() { - let bytes = b"imlargerthan32bytessoIcanhelpwiththetest"; + let arr: Vec = (0..100).collect(); + + // We only test from 32 since we allocate at least 32 bytes each time + for len in 32..100 { + let bytes = &arr[..len]; + for split in 0..len { + for cap in 0..101 { + let mut mock = if split == 0 { + Builder::new().read(bytes).build() + } else { + Builder::new() + .read(&bytes[..split]) + .read(&bytes[split..]) + .build() + }; + let mut buf = Vec::with_capacity(cap); + AsyncReadExt::read_to_end(&mut mock, &mut buf) + .await + .unwrap(); + // It has the right data. + assert_eq!(buf.as_slice(), bytes); + // Unless cap was smaller than length, then we did not reallocate. + if cap >= len { + assert_eq!(buf.capacity(), cap); + } + } + } + } +} + +#[tokio::test] +async fn read_to_end_grows_capacity_if_unfit() { + let bytes = b"the_vector_startingcap_will_be_smaller"; let mut mock = Builder::new().read(bytes).build(); - let mut buf = Vec::with_capacity(bytes.len()); + let initial_capacity = bytes.len() - 4; + let mut buf = Vec::with_capacity(initial_capacity); AsyncReadExt::read_to_end(&mut mock, &mut buf) .await .unwrap(); - assert_eq!(bytes.len(), buf.capacity()); + // *4 since it doubles when it doesn't fit and again when reaching EOF + assert_eq!(buf.capacity(), initial_capacity * 4); }