Skip to content

Commit

Permalink
codec: expose backpressure_boundary in Framed API (#5124)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusader-mike committed Oct 29, 2022
1 parent 1ab80ba commit cbbf81b
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 12 deletions.
10 changes: 10 additions & 0 deletions tokio-util/src/codec/framed.rs
Expand Up @@ -253,6 +253,16 @@ impl<T, U> Framed<T, U> {
&mut self.inner.state.write.buffer
}

/// Returns backpressure boundary
pub fn backpressure_boundary(&self) -> usize {
self.inner.state.write.backpressure_boundary
}

/// Updates backpressure boundary
pub fn set_backpressure_boundary(&mut self, boundary: usize) {
self.inner.state.write.backpressure_boundary = boundary;
}

/// Consumes the `Framed`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
Expand Down
12 changes: 8 additions & 4 deletions tokio-util/src/codec/framed_impl.rs
Expand Up @@ -25,7 +25,6 @@ pin_project! {
}

const INITIAL_CAPACITY: usize = 8 * 1024;
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;

#[derive(Debug)]
pub(crate) struct ReadFrame {
Expand All @@ -37,6 +36,7 @@ pub(crate) struct ReadFrame {

pub(crate) struct WriteFrame {
pub(crate) buffer: BytesMut,
pub(crate) backpressure_boundary: usize,
}

#[derive(Default)]
Expand All @@ -60,6 +60,7 @@ impl Default for WriteFrame {
fn default() -> Self {
Self {
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
backpressure_boundary: INITIAL_CAPACITY,
}
}
}
Expand Down Expand Up @@ -87,7 +88,10 @@ impl From<BytesMut> for WriteFrame {
buffer.reserve(INITIAL_CAPACITY - size);
}

Self { buffer }
Self {
buffer,
backpressure_boundary: INITIAL_CAPACITY,
}
}
}

Expand Down Expand Up @@ -256,7 +260,7 @@ where
type Error = U::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.state.borrow().buffer.len() >= BACKPRESSURE_BOUNDARY {
if self.state.borrow().buffer.len() >= self.state.borrow().backpressure_boundary {
self.as_mut().poll_flush(cx)
} else {
Poll::Ready(Ok(()))
Expand All @@ -277,7 +281,7 @@ where
let mut pinned = self.project();

while !pinned.state.borrow_mut().buffer.is_empty() {
let WriteFrame { buffer } = pinned.state.borrow_mut();
let WriteFrame { buffer, .. } = pinned.state.borrow_mut();
trace!(remaining = buffer.len(), "writing;");

let n = ready!(poll_write_buf(pinned.inner.as_mut(), cx, buffer))?;
Expand Down
10 changes: 10 additions & 0 deletions tokio-util/src/codec/framed_write.rs
Expand Up @@ -123,6 +123,16 @@ impl<T, E> FramedWrite<T, E> {
pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
&mut self.inner.state.buffer
}

/// Returns backpressure boundary
pub fn backpressure_boundary(&self) -> usize {
self.inner.state.backpressure_boundary
}

/// Updates backpressure boundary
pub fn set_backpressure_boundary(&mut self, boundary: usize) {
self.inner.state.backpressure_boundary = boundary;
}
}

// This impl just defers to the underlying FramedImpl
Expand Down
17 changes: 9 additions & 8 deletions tokio-util/tests/framed_write.rs
Expand Up @@ -109,12 +109,12 @@ fn write_hits_backpressure() {
const ITER: usize = 2 * 1024;

let mut mock = mock! {
// Block the `ITER`th write
// Block the `ITER*2`th write
Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")),
Ok(b"".to_vec()),
};

for i in 0..=ITER {
for i in 0..=ITER * 2 {
let mut b = BytesMut::with_capacity(4);
b.put_u32(i as u32);

Expand All @@ -133,14 +133,15 @@ fn write_hits_backpressure() {
// Push a new chunk
mock.calls.push_back(Ok(b[..].to_vec()));
}
// 1 'wouldblock', 4 * 2KB buffers, 1 b-byte buffer
assert_eq!(mock.calls.len(), 6);
// 1 'wouldblock', 8 * 2KB buffers, 1 b-byte buffer
assert_eq!(mock.calls.len(), 10);

let mut task = task::spawn(());
let mut framed = FramedWrite::new(mock, U32Encoder);
framed.set_backpressure_boundary(ITER * 8);
task.enter(|cx, _| {
// Send 8KB. This fills up FramedWrite2 buffer
for i in 0..ITER {
// Send 16KB. This fills up FramedWrite buffer
for i in 0..ITER * 2 {
assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
assert!(pin!(framed).start_send(i as u32).is_ok());
}
Expand All @@ -150,11 +151,11 @@ fn write_hits_backpressure() {
assert!(pin!(framed).poll_ready(cx).is_pending());

// We poll again, forcing another flush, which this time succeeds
// The whole 8KB buffer is flushed
// The whole 16KB buffer is flushed
assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());

// Send more data. This matches the final message expected by the mock
assert!(pin!(framed).start_send(ITER as u32).is_ok());
assert!(pin!(framed).start_send((ITER * 2) as u32).is_ok());

// Flush the rest of the buffer
assert!(assert_ready!(pin!(framed).poll_flush(cx)).is_ok());
Expand Down

0 comments on commit cbbf81b

Please sign in to comment.