Skip to content

Commit

Permalink
Don't use bool in reserve and use stack allocated buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
tzx committed Apr 8, 2023
1 parent 43001d2 commit 226737e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
37 changes: 24 additions & 13 deletions tokio/src/io/util/read_to_end.rs
Expand Up @@ -5,7 +5,7 @@ use pin_project_lite::pin_project;
use std::future::Future;
use std::io;
use std::marker::PhantomPinned;
use std::mem;
use std::mem::{self, MaybeUninit};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -67,26 +67,37 @@ fn poll_read_to_end<V: VecU8, R: AsyncRead + ?Sized>(
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
let try_small_read = buf.reserve(32);
// amount of data to return. When the vector is full with its starting
// capacity, we first try to read into a small buffer to see if we reached
// an EOF. This avoids the unnecessary allocation that we attempt before
// reading into the vector.

const NUM_BYTES: usize = 32;
let try_small_read = buf.try_small_read_first(NUM_BYTES);

// Get a ReadBuf into the vector.
let mut read_buf = buf.get_read_buf();

let poll_result;
let filled_before = read_buf.filled().len();
let filled_after;
if try_small_read {
let mut small_buf = Vec::with_capacity(32);
let mut small_read_buf = ReadBuf::new(&mut small_buf);
let poll_result;

let filled_after = if try_small_read {
let mut small_buf: [MaybeUninit<u8>; NUM_BYTES] = [MaybeUninit::uninit(); NUM_BYTES];
let mut small_read_buf = ReadBuf::uninit(&mut small_buf);
poll_result = read.poll_read(cx, &mut small_read_buf);
let filled = small_read_buf.filled().len();
read_buf.put_slice(&small_buf[..filled]);
filled_after = filled_before + filled;
let filled = small_read_buf.filled();
if filled.len() > 0 {
buf.reserve(NUM_BYTES);
read_buf = buf.get_read_buf();
read_buf.put_slice(filled);
}
filled_before + filled.len()
} else {
buf.reserve(NUM_BYTES);
read_buf = buf.get_read_buf();
poll_result = read.poll_read(cx, &mut read_buf);
filled_after = read_buf.filled().len();
read_buf.filled().len()
};

// Update the length of the vector using the result of poll_read.
let read_buf_parts = into_read_buf_parts(read_buf);
buf.apply_read_buf(read_buf_parts);
Expand Down
21 changes: 11 additions & 10 deletions tokio/src/io/util/vec_with_initialized.rs
Expand Up @@ -53,23 +53,15 @@ where
}
}

// Returns a boolean telling the caller to try reading into a small local buffer first if true.
// Doing so would avoid overallocating when vec is filled to capacity and we reached EOF.
pub(crate) fn reserve(&mut self, num_bytes: usize) -> bool {
pub(crate) fn reserve(&mut self, num_bytes: usize) {
let vec = self.vec.as_mut();
if vec.capacity() - vec.len() >= num_bytes {
return false;
return;
}

if self.starting_capacity == vec.capacity() && self.starting_capacity >= num_bytes {
return true;
}

// SAFETY: Setting num_initialized to `vec.len()` is correct as
// `reserve` does not change the length of the vector.
self.num_initialized = vec.len();
vec.reserve(num_bytes);
false
}

#[cfg(feature = "io-util")]
Expand Down Expand Up @@ -121,6 +113,15 @@ where
vec.set_len(parts.len);
}
}

// Returns a boolean telling the caller to try reading into a small local buffer first if true.
// Doing so would avoid overallocating when vec is filled to capacity and we reached EOF.
pub(crate) fn try_small_read_first(&self, num_bytes: usize) -> bool {
let vec = self.vec.as_ref();
vec.capacity() - vec.len() < num_bytes
&& self.starting_capacity == vec.capacity()
&& self.starting_capacity >= num_bytes
}
}

pub(crate) struct ReadBufParts {
Expand Down
13 changes: 13 additions & 0 deletions tokio/tests/io_read_to_end.rs
Expand Up @@ -88,3 +88,16 @@ async fn read_to_end_doesnt_grow_with_capacity() {
.unwrap();
assert_eq!(bytes.len(), buf.capacity());
}

#[tokio::test]
async fn read_to_end_grows_capacity_if_unfit() {
let bytes = b"the_vector_startingcap_will_be_smaller";
let mut mock = Builder::new().read(bytes).build();
let initial_capacity = bytes.len() - 4;
let mut buf = Vec::with_capacity(initial_capacity);
AsyncReadExt::read_to_end(&mut mock, &mut buf)
.await
.unwrap();
// *4 since it doubles when it doesn't fit and again when reaching EOF
assert_eq!(buf.capacity(), initial_capacity * 4);
}

0 comments on commit 226737e

Please sign in to comment.