Skip to content

Commit 18a2b30

Browse files
authoredFeb 20, 2024··
feat(build): Custom codecs for generated code (#1599)
* feat(tonic): Custom codecs for generated code Broadly, this change does 2 things: 1. Allow the built-in Prost codec to have its buffer sizes customized 2. Allow users to specify custom codecs on the tonic_build::prost::Builder The Prost codec is convenient, and handles any normal use case. However, the buffer sizes today are too large in some cases - and they may grow too aggressively. By exposing BufferSettings, users can make a small custom codec with their own BufferSettings to control their memory usage - or give enormous buffers to rpc's, as their use case requires. While one can define a custom service and methods with a custom codec today explicitly in Rust, the code generator does not have a means to supply a custom codec. I've reached for .codec... on the tonic_build::prost::Builder many times and keep forgetting it's not there. This change adds .codec_path to the Builder, so people can simply add their custom buffer codec or even their own full top level codec without reaching for manual service definition. * replace threadlocal with service wrapper * pull back ProstEn/Decoder, clean up other comments * clippy and fmt * feedback, clean up straggler changes
1 parent 408f46d commit 18a2b30

File tree

13 files changed

+458
-53
lines changed

13 files changed

+458
-53
lines changed
 

‎examples/Cargo.toml

+8
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,14 @@ required-features = ["cancellation"]
276276
name = "cancellation-client"
277277
path = "src/cancellation/client.rs"
278278

279+
[[bin]]
280+
name = "codec-buffers-server"
281+
path = "src/codec_buffers/server.rs"
282+
283+
[[bin]]
284+
name = "codec-buffers-client"
285+
path = "src/codec_buffers/client.rs"
286+
279287

280288
[features]
281289
gcp = ["dep:prost-types", "tonic/tls"]

‎examples/build.rs

+8
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ fn main() {
3333
.unwrap();
3434

3535
build_json_codec_service();
36+
37+
let smallbuff_copy = out_dir.join("smallbuf");
38+
let _ = std::fs::create_dir(smallbuff_copy.clone()); // This will panic below if the directory failed to create
39+
tonic_build::configure()
40+
.out_dir(smallbuff_copy)
41+
.codec_path("crate::common::SmallBufferCodec")
42+
.compile(&["proto/helloworld/helloworld.proto"], &["proto"])
43+
.unwrap();
3644
}
3745

3846
// Manually define the json.helloworld.Greeter service which used a custom JsonCodec to use json

‎examples/src/codec_buffers/client.rs

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
//! A HelloWorld example that uses a custom codec instead of the default Prost codec.
2+
//!
3+
//! Generated code is the output of codegen as defined in the `examples/build.rs` file.
4+
//! The generation is the one with .codec_path("crate::common::SmallBufferCodec")
5+
//! The generated code assumes that a module `crate::common` exists which defines
6+
//! `SmallBufferCodec`, and `SmallBufferCodec` must have a Default implementation.
7+
8+
pub mod common;
9+
10+
pub mod small_buf {
11+
include!(concat!(env!("OUT_DIR"), "/smallbuf/helloworld.rs"));
12+
}
13+
use small_buf::greeter_client::GreeterClient;
14+
15+
use crate::small_buf::HelloRequest;
16+
17+
#[tokio::main]
18+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
19+
let mut client = GreeterClient::connect("http://[::1]:50051").await?;
20+
21+
let request = tonic::Request::new(HelloRequest {
22+
name: "Tonic".into(),
23+
});
24+
25+
let response = client.say_hello(request).await?;
26+
27+
println!("RESPONSE={:?}", response);
28+
29+
Ok(())
30+
}

‎examples/src/codec_buffers/common.rs

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
//! This module defines a common encoder with small buffers. This is useful
2+
//! when you have many concurrent RPC's, and not a huge volume of data per
3+
//! rpc normally.
4+
//!
5+
//! Note that you can customize your codecs per call to the code generator's
6+
//! compile function. This lets you group services by their codec needs.
7+
//!
8+
//! While this codec demonstrates customizing the built-in Prost codec, you
9+
//! can use this to implement other codecs as well, as long as they have a
10+
//! Default implementation.
11+
12+
use std::marker::PhantomData;
13+
14+
use prost::Message;
15+
use tonic::codec::{BufferSettings, Codec, ProstCodec};
16+
17+
#[derive(Debug, Clone, Copy, Default)]
18+
pub struct SmallBufferCodec<T, U>(PhantomData<(T, U)>);
19+
20+
impl<T, U> Codec for SmallBufferCodec<T, U>
21+
where
22+
T: Message + Send + 'static,
23+
U: Message + Default + Send + 'static,
24+
{
25+
type Encode = T;
26+
type Decode = U;
27+
28+
type Encoder = <ProstCodec<T, U> as Codec>::Encoder;
29+
type Decoder = <ProstCodec<T, U> as Codec>::Decoder;
30+
31+
fn encoder(&mut self) -> Self::Encoder {
32+
// Here, we will just customize the prost codec's internal buffer settings.
33+
// You can of course implement a complete Codec, Encoder, and Decoder if
34+
// you wish!
35+
ProstCodec::<T, U>::raw_encoder(BufferSettings::new(512, 4096))
36+
}
37+
38+
fn decoder(&mut self) -> Self::Decoder {
39+
ProstCodec::<T, U>::raw_decoder(BufferSettings::new(512, 4096))
40+
}
41+
}

‎examples/src/codec_buffers/server.rs

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
//! A HelloWorld example that uses a custom codec instead of the default Prost codec.
2+
//!
3+
//! Generated code is the output of codegen as defined in the `examples/build.rs` file.
4+
//! The generation is the one with .codec_path("crate::common::SmallBufferCodec")
5+
//! The generated code assumes that a module `crate::common` exists which defines
6+
//! `SmallBufferCodec`, and `SmallBufferCodec` must have a Default implementation.
7+
8+
use tonic::{transport::Server, Request, Response, Status};
9+
10+
pub mod common;
11+
12+
pub mod small_buf {
13+
include!(concat!(env!("OUT_DIR"), "/smallbuf/helloworld.rs"));
14+
}
15+
use small_buf::{
16+
greeter_server::{Greeter, GreeterServer},
17+
HelloReply, HelloRequest,
18+
};
19+
20+
#[derive(Default)]
21+
pub struct MyGreeter {}
22+
23+
#[tonic::async_trait]
24+
impl Greeter for MyGreeter {
25+
async fn say_hello(
26+
&self,
27+
request: Request<HelloRequest>,
28+
) -> Result<Response<HelloReply>, Status> {
29+
println!("Got a request from {:?}", request.remote_addr());
30+
31+
let reply = HelloReply {
32+
message: format!("Hello {}!", request.into_inner().name),
33+
};
34+
Ok(Response::new(reply))
35+
}
36+
}
37+
38+
#[tokio::main]
39+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
40+
let addr = "[::1]:50051".parse().unwrap();
41+
let greeter = MyGreeter::default();
42+
43+
println!("GreeterServer listening on {}", addr);
44+
45+
Server::builder()
46+
.add_service(GreeterServer::new(greeter))
47+
.serve(addr)
48+
.await?;
49+
50+
Ok(())
51+
}

‎tonic-build/src/compile_settings.rs

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#[derive(Debug, Clone)]
2+
pub(crate) struct CompileSettings {
3+
#[cfg(feature = "prost")]
4+
pub(crate) codec_path: String,
5+
}
6+
7+
impl Default for CompileSettings {
8+
fn default() -> Self {
9+
Self {
10+
#[cfg(feature = "prost")]
11+
codec_path: "tonic::codec::ProstCodec".to_string(),
12+
}
13+
}
14+
}

‎tonic-build/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ pub mod server;
9797
mod code_gen;
9898
pub use code_gen::CodeGenBuilder;
9999

100+
mod compile_settings;
101+
100102
/// Service generation trait.
101103
///
102104
/// This trait can be implemented and consumed

‎tonic-build/src/prost.rs

+82-21
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::code_gen::CodeGenBuilder;
1+
use crate::{code_gen::CodeGenBuilder, compile_settings::CompileSettings};
22

33
use super::Attributes;
44
use proc_macro2::TokenStream;
@@ -41,6 +41,7 @@ pub fn configure() -> Builder {
4141
disable_comments: HashSet::default(),
4242
use_arc_self: false,
4343
generate_default_stubs: false,
44+
compile_settings: CompileSettings::default(),
4445
}
4546
}
4647

@@ -61,61 +62,98 @@ pub fn compile_protos(proto: impl AsRef<Path>) -> io::Result<()> {
6162
Ok(())
6263
}
6364

64-
const PROST_CODEC_PATH: &str = "tonic::codec::ProstCodec";
65-
6665
/// Non-path Rust types allowed for request/response types.
6766
const NON_PATH_TYPE_ALLOWLIST: &[&str] = &["()"];
6867

69-
impl crate::Service for Service {
70-
type Method = Method;
68+
/// Newtype wrapper for prost to add tonic-specific extensions
69+
struct TonicBuildService {
70+
prost_service: Service,
71+
methods: Vec<TonicBuildMethod>,
72+
}
73+
74+
impl TonicBuildService {
75+
fn new(prost_service: Service, settings: CompileSettings) -> Self {
76+
Self {
77+
// CompileSettings are currently only consumed method-by-method but if you need them in the Service, here's your spot.
78+
// The tonic_build::Service trait specifies that methods are borrowed, so they have to reified up front.
79+
methods: prost_service
80+
.methods
81+
.iter()
82+
.map(|prost_method| TonicBuildMethod {
83+
prost_method: prost_method.clone(),
84+
settings: settings.clone(),
85+
})
86+
.collect(),
87+
prost_service,
88+
}
89+
}
90+
}
91+
92+
/// Newtype wrapper for prost to add tonic-specific extensions
93+
struct TonicBuildMethod {
94+
prost_method: Method,
95+
settings: CompileSettings,
96+
}
97+
98+
impl crate::Service for TonicBuildService {
99+
type Method = TonicBuildMethod;
71100
type Comment = String;
72101

73102
fn name(&self) -> &str {
74-
&self.name
103+
&self.prost_service.name
75104
}
76105

77106
fn package(&self) -> &str {
78-
&self.package
107+
&self.prost_service.package
79108
}
80109

81110
fn identifier(&self) -> &str {
82-
&self.proto_name
111+
&self.prost_service.proto_name
83112
}
84113

85114
fn comment(&self) -> &[Self::Comment] {
86-
&self.comments.leading[..]
115+
&self.prost_service.comments.leading[..]
87116
}
88117

89118
fn methods(&self) -> &[Self::Method] {
90-
&self.methods[..]
119+
&self.methods
91120
}
92121
}
93122

94-
impl crate::Method for Method {
123+
impl crate::Method for TonicBuildMethod {
95124
type Comment = String;
96125

97126
fn name(&self) -> &str {
98-
&self.name
127+
&self.prost_method.name
99128
}
100129

101130
fn identifier(&self) -> &str {
102-
&self.proto_name
131+
&self.prost_method.proto_name
103132
}
104133

134+
/// For code generation, you can override the codec.
135+
///
136+
/// You should set the codec path to an import path that has a free
137+
/// function like `fn default()`. The default value is tonic::codec::ProstCodec,
138+
/// which returns a default-configured ProstCodec. You may wish to configure
139+
/// the codec, e.g., with a buffer configuration.
140+
///
141+
/// Though ProstCodec implements Default, it is currently only required that
142+
/// the function match the Default trait's function spec.
105143
fn codec_path(&self) -> &str {
106-
PROST_CODEC_PATH
144+
&self.settings.codec_path
107145
}
108146

109147
fn client_streaming(&self) -> bool {
110-
self.client_streaming
148+
self.prost_method.client_streaming
111149
}
112150

113151
fn server_streaming(&self) -> bool {
114-
self.server_streaming
152+
self.prost_method.server_streaming
115153
}
116154

117155
fn comment(&self) -> &[Self::Comment] {
118-
&self.comments.leading[..]
156+
&self.prost_method.comments.leading[..]
119157
}
120158

121159
fn request_response_name(
@@ -140,8 +178,14 @@ impl crate::Method for Method {
140178
}
141179
};
142180

143-
let request = convert_type(&self.input_proto_type, &self.input_type);
144-
let response = convert_type(&self.output_proto_type, &self.output_type);
181+
let request = convert_type(
182+
&self.prost_method.input_proto_type,
183+
&self.prost_method.input_type,
184+
);
185+
let response = convert_type(
186+
&self.prost_method.output_proto_type,
187+
&self.prost_method.output_type,
188+
);
145189
(request, response)
146190
}
147191
}
@@ -176,7 +220,10 @@ impl prost_build::ServiceGenerator for ServiceGenerator {
176220
.disable_comments(self.builder.disable_comments.clone())
177221
.use_arc_self(self.builder.use_arc_self)
178222
.generate_default_stubs(self.builder.generate_default_stubs)
179-
.generate_server(&service, &self.builder.proto_path);
223+
.generate_server(
224+
&TonicBuildService::new(service.clone(), self.builder.compile_settings.clone()),
225+
&self.builder.proto_path,
226+
);
180227

181228
self.servers.extend(server);
182229
}
@@ -188,7 +235,10 @@ impl prost_build::ServiceGenerator for ServiceGenerator {
188235
.attributes(self.builder.client_attributes.clone())
189236
.disable_comments(self.builder.disable_comments.clone())
190237
.build_transport(self.builder.build_transport)
191-
.generate_client(&service, &self.builder.proto_path);
238+
.generate_client(
239+
&TonicBuildService::new(service, self.builder.compile_settings.clone()),
240+
&self.builder.proto_path,
241+
);
192242

193243
self.clients.extend(client);
194244
}
@@ -252,6 +302,7 @@ pub struct Builder {
252302
pub(crate) disable_comments: HashSet<String>,
253303
pub(crate) use_arc_self: bool,
254304
pub(crate) generate_default_stubs: bool,
305+
pub(crate) compile_settings: CompileSettings,
255306

256307
out_dir: Option<PathBuf>,
257308
}
@@ -524,6 +575,16 @@ impl Builder {
524575
self
525576
}
526577

578+
/// Override the default codec.
579+
///
580+
/// If set, writes `{codec_path}::default()` in generated code wherever a codec is created.
581+
///
582+
/// This defaults to `"tonic::codec::ProstCodec"`
583+
pub fn codec_path(mut self, codec_path: impl Into<String>) -> Self {
584+
self.compile_settings.codec_path = codec_path.into();
585+
self
586+
}
587+
527588
/// Compile the .proto files and execute code generation.
528589
pub fn compile(
529590
self,

‎tonic/src/codec/compression.rs

+18-7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use super::encode::BUFFER_SIZE;
21
use crate::{metadata::MetadataValue, Status};
32
use bytes::{Buf, BytesMut};
43
#[cfg(feature = "gzip")]
@@ -70,6 +69,14 @@ impl EnabledCompressionEncodings {
7069
}
7170
}
7271

72+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
73+
pub(crate) struct CompressionSettings {
74+
pub(crate) encoding: CompressionEncoding,
75+
/// buffer_growth_interval controls memory growth for internal buffers to balance resizing cost against memory waste.
76+
/// The default buffer growth interval is 8 kilobytes.
77+
pub(crate) buffer_growth_interval: usize,
78+
}
79+
7380
/// The compression encodings Tonic supports.
7481
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
7582
#[non_exhaustive]
@@ -195,20 +202,22 @@ fn split_by_comma(s: &str) -> impl Iterator<Item = &str> {
195202
}
196203

197204
/// Compress `len` bytes from `decompressed_buf` into `out_buf`.
205+
/// buffer_size_increment is a hint to control the growth of out_buf versus the cost of resizing it.
198206
#[allow(unused_variables, unreachable_code)]
199207
pub(crate) fn compress(
200-
encoding: CompressionEncoding,
208+
settings: CompressionSettings,
201209
decompressed_buf: &mut BytesMut,
202210
out_buf: &mut BytesMut,
203211
len: usize,
204212
) -> Result<(), std::io::Error> {
205-
let capacity = ((len / BUFFER_SIZE) + 1) * BUFFER_SIZE;
213+
let buffer_growth_interval = settings.buffer_growth_interval;
214+
let capacity = ((len / buffer_growth_interval) + 1) * buffer_growth_interval;
206215
out_buf.reserve(capacity);
207216

208217
#[cfg(any(feature = "gzip", feature = "zstd"))]
209218
let mut out_writer = bytes::BufMut::writer(out_buf);
210219

211-
match encoding {
220+
match settings.encoding {
212221
#[cfg(feature = "gzip")]
213222
CompressionEncoding::Gzip => {
214223
let mut gzip_encoder = GzEncoder::new(
@@ -237,19 +246,21 @@ pub(crate) fn compress(
237246
/// Decompress `len` bytes from `compressed_buf` into `out_buf`.
238247
#[allow(unused_variables, unreachable_code)]
239248
pub(crate) fn decompress(
240-
encoding: CompressionEncoding,
249+
settings: CompressionSettings,
241250
compressed_buf: &mut BytesMut,
242251
out_buf: &mut BytesMut,
243252
len: usize,
244253
) -> Result<(), std::io::Error> {
254+
let buffer_growth_interval = settings.buffer_growth_interval;
245255
let estimate_decompressed_len = len * 2;
246-
let capacity = ((estimate_decompressed_len / BUFFER_SIZE) + 1) * BUFFER_SIZE;
256+
let capacity =
257+
((estimate_decompressed_len / buffer_growth_interval) + 1) * buffer_growth_interval;
247258
out_buf.reserve(capacity);
248259

249260
#[cfg(any(feature = "gzip", feature = "zstd"))]
250261
let mut out_writer = bytes::BufMut::writer(out_buf);
251262

252-
match encoding {
263+
match settings.encoding {
253264
#[cfg(feature = "gzip")]
254265
CompressionEncoding::Gzip => {
255266
let mut gzip_decoder = GzDecoder::new(&compressed_buf[0..len]);

‎tonic/src/codec/decode.rs

+18-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use super::compression::{decompress, CompressionEncoding};
2-
use super::{DecodeBuf, Decoder, DEFAULT_MAX_RECV_MESSAGE_SIZE, HEADER_SIZE};
1+
use super::compression::{decompress, CompressionEncoding, CompressionSettings};
2+
use super::{BufferSettings, DecodeBuf, Decoder, DEFAULT_MAX_RECV_MESSAGE_SIZE, HEADER_SIZE};
33
use crate::{body::BoxBody, metadata::MetadataMap, Code, Status};
44
use bytes::{Buf, BufMut, BytesMut};
55
use http::StatusCode;
@@ -13,8 +13,6 @@ use std::{
1313
use tokio_stream::Stream;
1414
use tracing::{debug, trace};
1515

16-
const BUFFER_SIZE: usize = 8 * 1024;
17-
1816
/// Streaming requests and responses.
1917
///
2018
/// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface
@@ -118,6 +116,7 @@ impl<T> Streaming<T> {
118116
B::Error: Into<crate::Error>,
119117
D: Decoder<Item = T, Error = Status> + Send + 'static,
120118
{
119+
let buffer_size = decoder.buffer_settings().buffer_size;
121120
Self {
122121
decoder: Box::new(decoder),
123122
inner: StreamingInner {
@@ -127,7 +126,7 @@ impl<T> Streaming<T> {
127126
.boxed_unsync(),
128127
state: State::ReadHeader,
129128
direction,
130-
buf: BytesMut::with_capacity(BUFFER_SIZE),
129+
buf: BytesMut::with_capacity(buffer_size),
131130
trailers: None,
132131
decompress_buf: BytesMut::new(),
133132
encoding,
@@ -138,7 +137,10 @@ impl<T> Streaming<T> {
138137
}
139138

140139
impl StreamingInner {
141-
fn decode_chunk(&mut self) -> Result<Option<DecodeBuf<'_>>, Status> {
140+
fn decode_chunk(
141+
&mut self,
142+
buffer_settings: BufferSettings,
143+
) -> Result<Option<DecodeBuf<'_>>, Status> {
142144
if let State::ReadHeader = self.state {
143145
if self.buf.remaining() < HEADER_SIZE {
144146
return Ok(None);
@@ -205,8 +207,15 @@ impl StreamingInner {
205207
let decode_buf = if let Some(encoding) = compression {
206208
self.decompress_buf.clear();
207209

208-
if let Err(err) = decompress(encoding, &mut self.buf, &mut self.decompress_buf, len)
209-
{
210+
if let Err(err) = decompress(
211+
CompressionSettings {
212+
encoding,
213+
buffer_growth_interval: buffer_settings.buffer_size,
214+
},
215+
&mut self.buf,
216+
&mut self.decompress_buf,
217+
len,
218+
) {
210219
let message = if let Direction::Response(status) = self.direction {
211220
format!(
212221
"Error decompressing: {}, while receiving response with status: {}",
@@ -364,7 +373,7 @@ impl<T> Streaming<T> {
364373
}
365374

366375
fn decode_chunk(&mut self) -> Result<Option<T>, Status> {
367-
match self.inner.decode_chunk()? {
376+
match self.inner.decode_chunk(self.decoder.buffer_settings())? {
368377
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
369378
Some(msg) => {
370379
self.inner.state = State::ReadHeader;

‎tonic/src/codec/encode.rs

+21-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
use super::compression::{compress, CompressionEncoding, SingleMessageCompressionOverride};
2-
use super::{EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE};
1+
use super::compression::{
2+
compress, CompressionEncoding, CompressionSettings, SingleMessageCompressionOverride,
3+
};
4+
use super::{BufferSettings, EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE};
35
use crate::{Code, Status};
46
use bytes::{BufMut, Bytes, BytesMut};
57
use http::HeaderMap;
@@ -11,9 +13,6 @@ use std::{
1113
};
1214
use tokio_stream::{Stream, StreamExt};
1315

14-
pub(super) const BUFFER_SIZE: usize = 8 * 1024;
15-
const YIELD_THRESHOLD: usize = 32 * 1024;
16-
1716
pub(crate) fn encode_server<T, U>(
1817
encoder: T,
1918
source: U,
@@ -90,7 +89,8 @@ where
9089
compression_override: SingleMessageCompressionOverride,
9190
max_message_size: Option<usize>,
9291
) -> Self {
93-
let buf = BytesMut::with_capacity(BUFFER_SIZE);
92+
let buffer_settings = encoder.buffer_settings();
93+
let buf = BytesMut::with_capacity(buffer_settings.buffer_size);
9494

9595
let compression_encoding =
9696
if compression_override == SingleMessageCompressionOverride::Disable {
@@ -100,7 +100,7 @@ where
100100
};
101101

102102
let uncompression_buf = if compression_encoding.is_some() {
103-
BytesMut::with_capacity(BUFFER_SIZE)
103+
BytesMut::with_capacity(buffer_settings.buffer_size)
104104
} else {
105105
BytesMut::new()
106106
};
@@ -132,6 +132,7 @@ where
132132
buf,
133133
uncompression_buf,
134134
} = self.project();
135+
let buffer_settings = encoder.buffer_settings();
135136

136137
loop {
137138
match source.as_mut().poll_next(cx) {
@@ -151,12 +152,13 @@ where
151152
uncompression_buf,
152153
*compression_encoding,
153154
*max_message_size,
155+
buffer_settings,
154156
item,
155157
) {
156158
return Poll::Ready(Some(Err(status)));
157159
}
158160

159-
if buf.len() >= YIELD_THRESHOLD {
161+
if buf.len() >= buffer_settings.yield_threshold {
160162
return Poll::Ready(Some(Ok(buf.split_to(buf.len()).freeze())));
161163
}
162164
}
@@ -174,6 +176,7 @@ fn encode_item<T>(
174176
uncompression_buf: &mut BytesMut,
175177
compression_encoding: Option<CompressionEncoding>,
176178
max_message_size: Option<usize>,
179+
buffer_settings: BufferSettings,
177180
item: T::Item,
178181
) -> Result<(), Status>
179182
where
@@ -195,8 +198,16 @@ where
195198

196199
let uncompressed_len = uncompression_buf.len();
197200

198-
compress(encoding, uncompression_buf, buf, uncompressed_len)
199-
.map_err(|err| Status::internal(format!("Error compressing: {}", err)))?;
201+
compress(
202+
CompressionSettings {
203+
encoding,
204+
buffer_growth_interval: buffer_settings.buffer_size,
205+
},
206+
uncompression_buf,
207+
buf,
208+
uncompressed_len,
209+
)
210+
.map_err(|err| Status::internal(format!("Error compressing: {}", err)))?;
200211
} else {
201212
encoder
202213
.encode(item, &mut EncodeBuf::new(buf))

‎tonic/src/codec/mod.rs

+79
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,75 @@ pub use self::decode::Streaming;
2222
#[cfg_attr(docsrs, doc(cfg(feature = "prost")))]
2323
pub use self::prost::ProstCodec;
2424

25+
/// Unless overridden, this is the buffer size used for encoding requests.
26+
/// This is spent per-rpc, so you may wish to adjust it. The default is
27+
/// pretty good for most uses, but if you have a ton of concurrent rpcs
28+
/// you may find it too expensive.
29+
const DEFAULT_CODEC_BUFFER_SIZE: usize = 8 * 1024;
30+
const DEFAULT_YIELD_THRESHOLD: usize = 32 * 1024;
31+
32+
/// Settings for how tonic allocates and grows buffers.
33+
///
34+
/// Tonic eagerly allocates the buffer_size per RPC, and grows
35+
/// the buffer by buffer_size increments to handle larger messages.
36+
/// Buffer size defaults to 8KiB.
37+
///
38+
/// Example:
39+
/// ```ignore
40+
/// Buffer start: | 8kb |
41+
/// Message received: | 24612 bytes |
42+
/// Buffer grows: | 8kb | 8kb | 8kb | 8kb |
43+
/// ```
44+
///
45+
/// The buffer grows to the next largest buffer_size increment of
46+
/// 32768 to hold 24612 bytes, which is just slightly too large for
47+
/// the previous buffer increment of 24576.
48+
///
49+
/// If you use a smaller buffer size you will waste less memory, but
50+
/// you will allocate more frequently. If one way or the other matters
51+
/// more to you, you may wish to customize your tonic Codec (see
52+
/// codec_buffers example).
53+
///
54+
/// Yield threshold is an optimization for streaming rpcs. Sometimes
55+
/// you may have many small messages ready to send. When they are ready,
56+
/// it is a much more efficient use of system resources to batch them
57+
/// together into one larger send(). The yield threshold controls how
58+
/// much you want to bulk up such a batch of ready-to-send messages.
59+
/// The larger your yield threshold the more you will batch - and
60+
/// consequentially allocate contiguous memory, which might be relevant
61+
/// if you're considering large numbers here.
62+
/// If your server streaming rpc does not reach the yield threshold
63+
/// before it reaches Poll::Pending (meaning, it's waiting for more
64+
/// data from wherever you're streaming from) then Tonic will just send
65+
/// along a smaller batch. Yield threshold is an upper-bound, it will
66+
/// not affect the responsiveness of your streaming rpc (for reasonable
67+
/// sizes of yield threshold).
68+
/// Yield threshold defaults to 32 KiB.
69+
#[derive(Clone, Copy, Debug)]
70+
pub struct BufferSettings {
71+
buffer_size: usize,
72+
yield_threshold: usize,
73+
}
74+
75+
impl BufferSettings {
76+
/// Create a new `BufferSettings`
77+
pub fn new(buffer_size: usize, yield_threshold: usize) -> Self {
78+
Self {
79+
buffer_size,
80+
yield_threshold,
81+
}
82+
}
83+
}
84+
85+
impl Default for BufferSettings {
86+
fn default() -> Self {
87+
Self {
88+
buffer_size: DEFAULT_CODEC_BUFFER_SIZE,
89+
yield_threshold: DEFAULT_YIELD_THRESHOLD,
90+
}
91+
}
92+
}
93+
2594
// 5 bytes
2695
const HEADER_SIZE: usize =
2796
// compression flag
@@ -63,6 +132,11 @@ pub trait Encoder {
63132

64133
/// Encodes a message into the provided buffer.
65134
fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error>;
135+
136+
/// Controls how tonic creates and expands encode buffers.
137+
fn buffer_settings(&self) -> BufferSettings {
138+
BufferSettings::default()
139+
}
66140
}
67141

68142
/// Decodes gRPC message types
@@ -79,4 +153,9 @@ pub trait Decoder {
79153
/// is no need to get the length from the bytes, gRPC framing is handled
80154
/// for you.
81155
fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error>;
156+
157+
/// Controls how tonic creates and expands decode buffers.
158+
fn buffer_settings(&self) -> BufferSettings {
159+
BufferSettings::default()
160+
}
82161
}

‎tonic/src/codec/prost.rs

+86-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{Codec, DecodeBuf, Decoder, Encoder};
1+
use super::{BufferSettings, Codec, DecodeBuf, Decoder, Encoder};
22
use crate::codec::EncodeBuf;
33
use crate::{Code, Status};
44
use prost::Message;
@@ -10,9 +10,41 @@ pub struct ProstCodec<T, U> {
1010
_pd: PhantomData<(T, U)>,
1111
}
1212

13+
impl<T, U> ProstCodec<T, U> {
14+
/// Configure a ProstCodec with encoder/decoder buffer settings. This is used to control
15+
/// how memory is allocated and grows per RPC.
16+
pub fn new() -> Self {
17+
Self { _pd: PhantomData }
18+
}
19+
}
20+
1321
impl<T, U> Default for ProstCodec<T, U> {
1422
fn default() -> Self {
15-
Self { _pd: PhantomData }
23+
Self::new()
24+
}
25+
}
26+
27+
impl<T, U> ProstCodec<T, U>
28+
where
29+
T: Message + Send + 'static,
30+
U: Message + Default + Send + 'static,
31+
{
32+
/// A tool for building custom codecs based on prost encoding and decoding.
33+
/// See the codec_buffers example for one possible way to use this.
34+
pub fn raw_encoder(buffer_settings: BufferSettings) -> <Self as Codec>::Encoder {
35+
ProstEncoder {
36+
_pd: PhantomData,
37+
buffer_settings,
38+
}
39+
}
40+
41+
/// A tool for building custom codecs based on prost encoding and decoding.
42+
/// See the codec_buffers example for one possible way to use this.
43+
pub fn raw_decoder(buffer_settings: BufferSettings) -> <Self as Codec>::Decoder {
44+
ProstDecoder {
45+
_pd: PhantomData,
46+
buffer_settings,
47+
}
1648
}
1749
}
1850

@@ -28,17 +60,36 @@ where
2860
type Decoder = ProstDecoder<U>;
2961

3062
fn encoder(&mut self) -> Self::Encoder {
31-
ProstEncoder(PhantomData)
63+
ProstEncoder {
64+
_pd: PhantomData,
65+
buffer_settings: BufferSettings::default(),
66+
}
3267
}
3368

3469
fn decoder(&mut self) -> Self::Decoder {
35-
ProstDecoder(PhantomData)
70+
ProstDecoder {
71+
_pd: PhantomData,
72+
buffer_settings: BufferSettings::default(),
73+
}
3674
}
3775
}
3876

3977
/// A [`Encoder`] that knows how to encode `T`.
4078
#[derive(Debug, Clone, Default)]
41-
pub struct ProstEncoder<T>(PhantomData<T>);
79+
pub struct ProstEncoder<T> {
80+
_pd: PhantomData<T>,
81+
buffer_settings: BufferSettings,
82+
}
83+
84+
impl<T> ProstEncoder<T> {
85+
/// Get a new encoder with explicit buffer settings
86+
pub fn new(buffer_settings: BufferSettings) -> Self {
87+
Self {
88+
_pd: PhantomData,
89+
buffer_settings,
90+
}
91+
}
92+
}
4293

4394
impl<T: Message> Encoder for ProstEncoder<T> {
4495
type Item = T;
@@ -50,11 +101,28 @@ impl<T: Message> Encoder for ProstEncoder<T> {
50101

51102
Ok(())
52103
}
104+
105+
fn buffer_settings(&self) -> BufferSettings {
106+
self.buffer_settings
107+
}
53108
}
54109

55110
/// A [`Decoder`] that knows how to decode `U`.
56111
#[derive(Debug, Clone, Default)]
57-
pub struct ProstDecoder<U>(PhantomData<U>);
112+
pub struct ProstDecoder<U> {
113+
_pd: PhantomData<U>,
114+
buffer_settings: BufferSettings,
115+
}
116+
117+
impl<U> ProstDecoder<U> {
118+
/// Get a new decoder with explicit buffer settings
119+
pub fn new(buffer_settings: BufferSettings) -> Self {
120+
Self {
121+
_pd: PhantomData,
122+
buffer_settings,
123+
}
124+
}
125+
}
58126

59127
impl<U: Message + Default> Decoder for ProstDecoder<U> {
60128
type Item = U;
@@ -67,6 +135,10 @@ impl<U: Message + Default> Decoder for ProstDecoder<U> {
67135

68136
Ok(item)
69137
}
138+
139+
fn buffer_settings(&self) -> BufferSettings {
140+
self.buffer_settings
141+
}
70142
}
71143

72144
fn from_decode_error(error: prost::DecodeError) -> crate::Status {
@@ -244,6 +316,10 @@ mod tests {
244316
buf.put(&item[..]);
245317
Ok(())
246318
}
319+
320+
fn buffer_settings(&self) -> crate::codec::BufferSettings {
321+
Default::default()
322+
}
247323
}
248324

249325
#[derive(Debug, Clone, Default)]
@@ -258,6 +334,10 @@ mod tests {
258334
buf.advance(LEN);
259335
Ok(Some(out))
260336
}
337+
338+
fn buffer_settings(&self) -> crate::codec::BufferSettings {
339+
Default::default()
340+
}
261341
}
262342

263343
mod body {

0 commit comments

Comments
 (0)
Please sign in to comment.