Skip to content

Commit d312dcc

Browse files
authoredJun 25, 2024··
fix(tonic): flush accumulated ready messages when status received (#1756)
#1423 introduced logic to buffer multiple ready messages in order to amortize the cost of sends to the underlying transport. This also introduced a change in behavior for tonic in the following scenario: A stream of ready messages less than the yield threshold is trailed by a status. Previously the ready messages would all have been yielded from the stream and sent, followed by the status. After the change was introduced the status is yielded from the stream and sent but the accumulated ready messages in the buffer are never sent out. This change adjusts the logic to restore the previous behavior while still retaining the amoritization benefits. Namely it flushes the accumulated ready messages prior to yielding the status ensuring they are sent out from the stream in the order they are read.
1 parent f679dcf commit d312dcc

File tree

2 files changed

+61
-1
lines changed

2 files changed

+61
-1
lines changed
 

‎tests/integration_tests/tests/status.rs

+49
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,52 @@ async fn status_from_server_stream_with_source() {
194194
let source = error.source().unwrap();
195195
source.downcast_ref::<tonic::transport::Error>().unwrap();
196196
}
197+
198+
#[tokio::test]
199+
async fn message_and_then_status_from_server_stream() {
200+
integration_tests::trace_init();
201+
202+
struct Svc;
203+
204+
#[tonic::async_trait]
205+
impl test_stream_server::TestStream for Svc {
206+
type StreamCallStream = Stream<OutputStream>;
207+
208+
async fn stream_call(
209+
&self,
210+
_: Request<InputStream>,
211+
) -> Result<Response<Self::StreamCallStream>, Status> {
212+
let s = tokio_stream::iter(vec![
213+
Ok(OutputStream {}),
214+
Err::<OutputStream, _>(Status::unavailable("foo")),
215+
]);
216+
Ok(Response::new(Box::pin(s) as Self::StreamCallStream))
217+
}
218+
}
219+
220+
let svc = test_stream_server::TestStreamServer::new(Svc);
221+
222+
tokio::spawn(async move {
223+
Server::builder()
224+
.add_service(svc)
225+
.serve("127.0.0.1:1340".parse().unwrap())
226+
.await
227+
.unwrap();
228+
});
229+
230+
tokio::time::sleep(Duration::from_millis(100)).await;
231+
232+
let mut client = test_stream_client::TestStreamClient::connect("http://127.0.0.1:1340")
233+
.await
234+
.unwrap();
235+
236+
let mut stream = client
237+
.stream_call(InputStream {})
238+
.await
239+
.unwrap()
240+
.into_inner();
241+
242+
assert_eq!(stream.message().await.unwrap(), Some(OutputStream {}));
243+
assert_eq!(stream.message().await.unwrap_err().message(), "foo");
244+
assert_eq!(stream.message().await.unwrap(), None);
245+
}

‎tonic/src/codec/encode.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ where
7474
max_message_size: Option<usize>,
7575
buf: BytesMut,
7676
uncompression_buf: BytesMut,
77+
error: Option<Status>,
7778
}
7879

7980
impl<T, U> EncodedBytes<T, U>
@@ -112,6 +113,7 @@ where
112113
max_message_size,
113114
buf,
114115
uncompression_buf,
116+
error: None,
115117
}
116118
}
117119
}
@@ -131,9 +133,14 @@ where
131133
max_message_size,
132134
buf,
133135
uncompression_buf,
136+
error,
134137
} = self.project();
135138
let buffer_settings = encoder.buffer_settings();
136139

140+
if let Some(status) = error.take() {
141+
return Poll::Ready(Some(Err(status)));
142+
}
143+
137144
loop {
138145
match source.as_mut().poll_next(cx) {
139146
Poll::Pending if buf.is_empty() => {
@@ -163,7 +170,11 @@ where
163170
}
164171
}
165172
Poll::Ready(Some(Err(status))) => {
166-
return Poll::Ready(Some(Err(status)));
173+
if buf.is_empty() {
174+
return Poll::Ready(Some(Err(status)));
175+
}
176+
*error = Some(status);
177+
return Poll::Ready(Some(Ok(buf.split_to(buf.len()).freeze())));
167178
}
168179
}
169180
}

0 commit comments

Comments
 (0)
Please sign in to comment.