
Comparing changes


base repository: hyperium/tonic
base: v0.8.0
head repository: hyperium/tonic
compare: v0.8.1
  • 9 commits
  • 19 files changed
  • 9 contributors

Commits on Aug 2, 2022

  1. chore: Update docs to reflect changes to compression feature (#1046)

    Co-authored-by: Lucio Franco <luciofranco14@gmail.com>
    Nick Ashley and LucioFranco authored Aug 2, 2022

    Verified (created on GitHub.com and signed with GitHub’s verified signature)
    SHA: 9be4e24

Commits on Aug 9, 2022

  1. Verified (signed with the committer’s verified signature: Darin Pope)
     SHA: fb84058
  2. SHA: 433c4eb

Commits on Aug 15, 2022

  1. Update helloworld-tutorial.md (#1057)

    The grpcurl command runs successfully with the modification above.
    bathtimefish authored Aug 15, 2022
    SHA: 2d66a0a

Commits on Aug 23, 2022

  1. tonic-health: commit generated code (#1065)

    * tonic-health: commit generated code rather than generating at build time
    
    This saves users from needing protoc available on their path if they're not
    generating other tonic code at build time, either.
    
    The generated modules can be regenerated by enabling the `gen-proto`
    feature, which will trigger the relevant part of the build script.
    
    * Check generated code for tonic-health matches in CI
    
    * Use bootstrap test to generate/check validity of generated code
    
    As suggested by @LucioFranco in https://github.com/hyperium/tonic/pull/1065#discussion_r951580189.
    This avoids the need for a feature which would show
    up in docs.rs, and achieves the same goals via CI.
    sd2k authored Aug 23, 2022
    SHA: 0a2a2f3
  2. SHA: 523a550

Commits on Aug 24, 2022

  1. SHA: 2a83f8b

Commits on Sep 6, 2022

  1. feat(transport): Expose hyper's H2 adaptive window on server (#1071)

    Co-authored-by: Adam Chalmers <adamschalmers@gmail.com>
    adamchalmers and Adam Chalmers authored Sep 6, 2022
    SHA: 919d28b

Commits on Sep 7, 2022

  1. SHA: 054776f
15 changes: 13 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
# [v0.8.0](https://github.com/hyperium/tonic/compare/v0.7.2...v0.8.0) (2022-07-29)
# [0.8.1](https://github.com/hyperium/tonic/compare/v0.8.0...v0.8.1) (2022-09-07)


### Features

* **transport:** Expose hyper's H2 adaptive window on server ([#1071](https://github.com/hyperium/tonic/issues/1071)) ([919d28b](https://github.com/hyperium/tonic/commit/919d28b2b96c7c803cec131a9e36e80d2b071701))
* Reduce the amount of monomorphized code.
* Expose `Extensions::into_http` and `Status::from_error`.
* **health:** Remove `build.rs` and commit generated code.

# [0.8.0](https://github.com/hyperium/tonic/compare/v0.7.2...v0.8.0) (2022-07-29)


### Features
@@ -14,8 +24,9 @@

* **build:** `CODEC_PATH` moved from const to fn
* **tonic** Remove codegen dependency on `compression` feature.
* **tonic** Remove `compression` feature in favor of `gzip` feature.

# [v0.7.2](https://github.com/hyperium/tonic/compare/v0.7.1...v0.7.2) (2022-05-04)
# [0.7.2](https://github.com/hyperium/tonic/compare/v0.7.1...v0.7.2) (2022-05-04)


### Bug Fixes
25 changes: 25 additions & 0 deletions README.md
@@ -52,6 +52,31 @@ $ rustup update
$ cargo build
```

### Dependencies

In order to build `tonic` >= 0.8.0, you need the `protoc` Protocol Buffers compiler, along with Protocol Buffers resource files.

#### Ubuntu

```bash
sudo apt update && sudo apt upgrade -y
sudo apt install -y protobuf-compiler libprotobuf-dev
```

#### Alpine Linux

```sh
sudo apk add protoc protobuf-dev
```

#### macOS

This assumes [Homebrew](https://brew.sh/) is already installed; if not, see the installation instructions on [the Homebrew website](https://brew.sh/).

```zsh
brew install protobuf
```

### Tutorials

- The [`helloworld`][helloworld-tutorial] tutorial provides a basic example of using `tonic`, perfect for first time users!
24 changes: 24 additions & 0 deletions examples/README.md
@@ -2,6 +2,30 @@

Set of examples that show off the features provided by `tonic`.

In order to build these examples, you must have the `protoc` Protocol Buffers compiler
installed, along with the Protocol Buffers resource files.

Ubuntu:

```bash
sudo apt update && sudo apt upgrade -y
sudo apt install -y protobuf-compiler libprotobuf-dev
```

Alpine Linux:

```sh
sudo apk add protoc protobuf-dev
```

macOS:

This assumes [Homebrew](https://brew.sh/) is already installed; if not, see the installation instructions on [the Homebrew website](https://brew.sh/).

```zsh
brew install protobuf
```

## Helloworld

### Client
2 changes: 1 addition & 1 deletion examples/helloworld-tutorial.md
@@ -243,7 +243,7 @@ If you have a gRPC GUI client such as [Bloom RPC] you should be able to send req

Or if you use [grpcurl], you can send requests like this:
```
$ grpcurl -plaintext -import-path ./proto -proto helloworld.proto -d '{"name": "Tonic"}' [::]:50051 helloworld.Greeter/SayHello
$ grpcurl -plaintext -import-path ./proto -proto helloworld.proto -d '{"name": "Tonic"}' '[::]:50051' helloworld.Greeter/SayHello
```
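The change above wraps `[::]:50051` in single quotes because square brackets are glob characters in common shells; zsh in particular aborts with "no matches found" when an unquoted `[::]:50051` matches no files, while quoting passes the address through literally. A minimal illustration:

```shell
# Quoting keeps the bracketed IPv6 address literal instead of letting the
# shell attempt glob expansion on it.
addr='[::]:50051'
echo "$addr"
```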
And receiving responses like this:
```
6 changes: 2 additions & 4 deletions tonic-health/Cargo.toml
@@ -12,11 +12,11 @@ license = "MIT"
name = "tonic-health"
readme = "README.md"
repository = "https://github.com/hyperium/tonic"
version = "0.7.0"
version = "0.7.1"

[features]
default = ["transport"]
transport = ["tonic/transport", "tonic-build/transport"]
transport = ["tonic/transport"]

[dependencies]
async-stream = "0.3"
@@ -28,6 +28,4 @@ tonic = {version = "0.8", path = "../tonic", features = ["codegen", "prost"]}

[dev-dependencies]
tokio = {version = "1.0", features = ["rt-multi-thread", "macros"]}

[build-dependencies]
tonic-build = {version = "0.8", path = "../tonic-build", features = ["prost"]}
14 changes: 0 additions & 14 deletions tonic-health/build.rs

This file was deleted.

379 changes: 379 additions & 0 deletions tonic-health/src/generated/grpc.health.v1.rs

Large diffs are not rendered by default.

Binary file added tonic-health/src/generated/grpc_health_v1.bin
Binary file not shown.
8 changes: 4 additions & 4 deletions tonic-health/src/lib.rs
@@ -16,21 +16,21 @@
html_logo_url = "https://raw.githubusercontent.com/tokio-rs/website/master/public/img/icons/tonic.svg"
)]
#![deny(rustdoc::broken_intra_doc_links)]
#![doc(html_root_url = "https://docs.rs/tonic-health/0.6.0")]
#![doc(html_root_url = "https://docs.rs/tonic-health/0.7.1")]
#![doc(issue_tracker_base_url = "https://github.com/hyperium/tonic/issues/")]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::fmt::{Display, Formatter};

/// Generated protobuf types from the `grpc.healthy.v1` package.
/// Generated protobuf types from the `grpc.health.v1` package.
pub mod proto {
#![allow(unreachable_pub)]
#![allow(missing_docs)]
tonic::include_proto!("grpc.health.v1");
include!("generated/grpc.health.v1.rs");

pub const GRPC_HEALTH_V1_FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("grpc_health_v1");
include_bytes!("generated/grpc_health_v1.bin");
}

pub mod server;
30 changes: 30 additions & 0 deletions tonic-health/tests/bootstrap.rs
@@ -0,0 +1,30 @@
use std::{path::PathBuf, process::Command};

#[test]
fn bootstrap() {
let iface_files = &["proto/health.proto"];
let dirs = &["proto"];

let out_dir = PathBuf::from(std::env!("CARGO_MANIFEST_DIR"))
.join("src")
.join("generated");

tonic_build::configure()
.build_client(true)
.build_server(true)
.out_dir(format!("{}", out_dir.display()))
.compile(iface_files, dirs)
.unwrap();

let status = Command::new("git")
.arg("diff")
.arg("--exit-code")
.arg("--")
.arg(format!("{}", out_dir.display()))
.status()
.unwrap();

if !status.success() {
panic!("You should commit the protobuf files");
}
}
6 changes: 3 additions & 3 deletions tonic/Cargo.toml
@@ -7,20 +7,20 @@ name = "tonic"
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.7.x" git tag.
# - Create "v0.8.x" git tag.
authors = ["Lucio Franco <luciofranco14@gmail.com>"]
categories = ["web-programming", "network-programming", "asynchronous"]
description = """
A gRPC over HTTP/2 implementation focused on high performance, interoperability, and flexibility.
"""
documentation = "https://docs.rs/tonic/0.8.0/tonic/"
documentation = "https://docs.rs/tonic/0.8.1/tonic/"
edition = "2018"
homepage = "https://github.com/hyperium/tonic"
keywords = ["rpc", "grpc", "async", "futures", "protobuf"]
license = "MIT"
readme = "../README.md"
repository = "https://github.com/hyperium/tonic"
version = "0.8.0"
version = "0.8.1"

[features]
codegen = ["async-trait"]
149 changes: 92 additions & 57 deletions tonic/src/client/grpc.rs
@@ -2,7 +2,7 @@ use crate::codec::compression::{CompressionEncoding, EnabledCompressionEncodings
use crate::{
body::BoxBody,
client::GrpcService,
codec::{encode_client, Codec, Streaming},
codec::{encode_client, Codec, Decoder, Streaming},
request::SanitizeHeaders,
Code, Request, Response, Status,
};
@@ -30,6 +30,10 @@ use std::fmt;
/// [gRPC protocol definition]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
pub struct Grpc<T> {
inner: T,
config: GrpcConfig,
}

struct GrpcConfig {
origin: Uri,
/// Which compression encodings does the client accept?
accept_compression_encodings: EnabledCompressionEncodings,
@@ -40,12 +44,7 @@ pub struct Grpc<T> {
impl<T> Grpc<T> {
/// Creates a new gRPC client with the provided [`GrpcService`].
pub fn new(inner: T) -> Self {
Self {
inner,
origin: Uri::default(),
send_compression_encodings: None,
accept_compression_encodings: EnabledCompressionEncodings::default(),
}
Self::with_origin(inner, Uri::default())
}

/// Creates a new gRPC client with the provided [`GrpcService`] and `Uri`.
@@ -55,9 +54,11 @@ impl<T> Grpc<T> {
pub fn with_origin(inner: T, origin: Uri) -> Self {
Self {
inner,
origin,
send_compression_encodings: None,
accept_compression_encodings: EnabledCompressionEncodings::default(),
config: GrpcConfig {
origin,
send_compression_encodings: None,
accept_compression_encodings: EnabledCompressionEncodings::default(),
},
}
}

@@ -88,7 +89,7 @@ impl<T> Grpc<T> {
/// # };
/// ```
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.send_compression_encodings = Some(encoding);
self.config.send_compression_encodings = Some(encoding);
self
}

@@ -119,7 +120,7 @@ impl<T> Grpc<T> {
/// # };
/// ```
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.accept_compression_encodings.enable(encoding);
self.config.accept_compression_encodings.enable(encoding);
self
}

@@ -226,6 +227,73 @@ impl<T> Grpc<T> {
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
{
let request = request
.map(|s| encode_client(codec.encoder(), s, self.config.send_compression_encodings))
.map(BoxBody::new);

let request = self.config.prepare_request(request, path);

let response = self
.inner
.call(request)
.await
.map_err(Status::from_error_generic)?;

let decoder = codec.decoder();

self.create_response(decoder, response)
}

// Keeping this code in a separate function from Self::streaming lets functions that return the
// same output share the generated binary code
fn create_response<M2>(
&self,
decoder: impl Decoder<Item = M2, Error = Status> + Send + 'static,
response: http::Response<T::ResponseBody>,
) -> Result<Response<Streaming<M2>>, Status>
where
T: GrpcService<BoxBody>,
T::ResponseBody: Body + Send + 'static,
<T::ResponseBody as Body>::Error: Into<crate::Error>,
{
let encoding = CompressionEncoding::from_encoding_header(
response.headers(),
self.config.accept_compression_encodings,
)?;

let status_code = response.status();
let trailers_only_status = Status::from_header_map(response.headers());

// We do not need to check for trailers if the `grpc-status` header is present
// with a valid code.
let expect_additional_trailers = if let Some(status) = trailers_only_status {
if status.code() != Code::Ok {
return Err(status);
}

false
} else {
true
};

let response = response.map(|body| {
if expect_additional_trailers {
Streaming::new_response(decoder, body, status_code, encoding)
} else {
Streaming::new_empty(decoder, body)
}
});

Ok(Response::from_http(response))
}
}

impl GrpcConfig {
fn prepare_request(
&self,
request: Request<http_body::combinators::UnsyncBoxBody<bytes::Bytes, Status>>,
path: PathAndQuery,
) -> http::Request<http_body::combinators::UnsyncBoxBody<bytes::Bytes, Status>> {
let scheme = self.origin.scheme().cloned();
let authority = self.origin.authority().cloned();

@@ -236,10 +304,6 @@ impl<T> Grpc<T> {

let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri");

let request = request
.map(|s| encode_client(codec.encoder(), s, self.send_compression_encodings))
.map(BoxBody::new);

let mut request = request.into_http(
uri,
http::Method::POST,
@@ -274,51 +338,19 @@ impl<T> Grpc<T> {
);
}

let response = self
.inner
.call(request)
.await
.map_err(|err| Status::from_error(err.into()))?;

let encoding = CompressionEncoding::from_encoding_header(
response.headers(),
self.accept_compression_encodings,
)?;

let status_code = response.status();
let trailers_only_status = Status::from_header_map(response.headers());

// We do not need to check for trailers if the `grpc-status` header is present
// with a valid code.
let expect_additional_trailers = if let Some(status) = trailers_only_status {
if status.code() != Code::Ok {
return Err(status);
}

false
} else {
true
};

let response = response.map(|body| {
if expect_additional_trailers {
Streaming::new_response(codec.decoder(), body, status_code, encoding)
} else {
Streaming::new_empty(codec.decoder(), body)
}
});

Ok(Response::from_http(response))
request
}
}

impl<T: Clone> Clone for Grpc<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
origin: self.origin.clone(),
send_compression_encodings: self.send_compression_encodings,
accept_compression_encodings: self.accept_compression_encodings,
config: GrpcConfig {
origin: self.config.origin.clone(),
send_compression_encodings: self.config.send_compression_encodings,
accept_compression_encodings: self.config.accept_compression_encodings,
},
}
}
}
@@ -329,13 +361,16 @@ impl<T: fmt::Debug> fmt::Debug for Grpc<T> {

f.field("inner", &self.inner);

f.field("origin", &self.origin);
f.field("origin", &self.config.origin);

f.field("compression_encoding", &self.send_compression_encodings);
f.field(
"compression_encoding",
&self.config.send_compression_encodings,
);

f.field(
"accept_compression_encodings",
&self.accept_compression_encodings,
&self.config.accept_compression_encodings,
);

f.finish()
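The refactor above implements the changelog's "Reduce the amount of monomorphized code" entry with a standard pattern: the generic client methods delegate as early as possible to non-generic helpers (`GrpcConfig::prepare_request`, and `create_response` parameterized only on the message type), so those bodies are compiled once instead of once per service/message combination. A self-contained sketch of the pattern, with illustrative names rather than tonic's API:

```rust
// A generic wrapper converts its argument up front and hands off to a
// non-generic inner function; `inner` is compiled exactly once, while only
// the thin `describe_len` shim is monomorphized per caller type.
fn describe_len(s: impl AsRef<str>) -> String {
    fn inner(s: &str) -> String {
        format!("{} bytes", s.len())
    }
    inner(s.as_ref())
}

fn main() {
    assert_eq!(describe_len("abc"), "3 bytes");
    assert_eq!(describe_len(String::from("hello")), "5 bytes");
}
```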
312 changes: 169 additions & 143 deletions tonic/src/codec/decode.rs
@@ -21,6 +21,10 @@ const BUFFER_SIZE: usize = 8 * 1024;
/// to fetch the message stream and trailing metadata
pub struct Streaming<T> {
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>,
inner: StreamingInner,
}

struct StreamingInner {
body: BoxBody,
state: State,
direction: Direction,
@@ -96,20 +100,157 @@ impl<T> Streaming<T> {
{
Self {
decoder: Box::new(decoder),
body: body
.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
direction,
buf: BytesMut::with_capacity(BUFFER_SIZE),
trailers: None,
decompress_buf: BytesMut::new(),
encoding,
inner: StreamingInner {
body: body
.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
direction,
buf: BytesMut::with_capacity(BUFFER_SIZE),
trailers: None,
decompress_buf: BytesMut::new(),
encoding,
},
}
}
}

impl StreamingInner {
fn decode_chunk(&mut self) -> Result<Option<DecodeBuf<'_>>, Status> {
if let State::ReadHeader = self.state {
if self.buf.remaining() < HEADER_SIZE {
return Ok(None);
}

let compression_encoding = match self.buf.get_u8() {
0 => None,
1 => {
{
if self.encoding.is_some() {
self.encoding
} else {
// https://grpc.github.io/grpc/core/md_doc_compression.html
// An ill-constructed message with its Compressed-Flag bit set but lacking a grpc-encoding
// entry different from identity in its metadata MUST fail with INTERNAL status,
// its associated description indicating the invalid Compressed-Flag condition.
return Err(Status::new(Code::Internal, "protocol error: received message with compressed-flag but no grpc-encoding was specified"));
}
}
}
f => {
trace!("unexpected compression flag");
let message = if let Direction::Response(status) = self.direction {
format!(
"protocol error: received message with invalid compression flag: {} (valid flags are 0 and 1) while receiving response with status: {}",
f, status
)
} else {
format!("protocol error: received message with invalid compression flag: {} (valid flags are 0 and 1), while sending request", f)
};
return Err(Status::new(Code::Internal, message));
}
};
let len = self.buf.get_u32() as usize;
self.buf.reserve(len);

self.state = State::ReadBody {
compression: compression_encoding,
len,
}
}

if let State::ReadBody { len, compression } = self.state {
// if we haven't read enough of the message then return and keep
// reading
if self.buf.remaining() < len || self.buf.len() < len {
return Ok(None);
}

let decode_buf = if let Some(encoding) = compression {
self.decompress_buf.clear();

if let Err(err) = decompress(encoding, &mut self.buf, &mut self.decompress_buf, len)
{
let message = if let Direction::Response(status) = self.direction {
format!(
"Error decompressing: {}, while receiving response with status: {}",
err, status
)
} else {
format!("Error decompressing: {}, while sending request", err)
};
return Err(Status::new(Code::Internal, message));
}
let decompressed_len = self.decompress_buf.len();
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
} else {
DecodeBuf::new(&mut self.buf, len)
};

return Ok(Some(decode_buf));
}

Ok(None)
}

// Returns Some(()) if data was found or None if the loop in `poll_next` should break
fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(e)) => {
let _ = std::mem::replace(&mut self.state, State::Error);
let err: crate::Error = e.into();
debug!("decoder inner stream error: {:?}", err);
let status = Status::from_error(err);
return Poll::Ready(Err(status));
}
None => None,
};

Poll::Ready(if let Some(data) = chunk {
self.buf.put(data);
Ok(Some(()))
} else {
// FIXME: improve buf usage.
if self.buf.has_remaining() {
trace!("unexpected EOF decoding stream");
Err(Status::new(
Code::Internal,
"Unexpected EOF decoding stream.".to_string(),
))
} else {
Ok(None)
}
})
}

fn poll_response(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Status>> {
if let Direction::Response(status) = self.direction {
match ready!(Pin::new(&mut self.body).poll_trailers(cx)) {
Ok(trailer) => {
if let Err(e) = crate::status::infer_grpc_status(trailer.as_ref(), status) {
if let Some(e) = e {
return Poll::Ready(Err(e));
} else {
return Poll::Ready(Ok(()));
}
} else {
self.trailers = trailer.map(MetadataMap::from_headers);
}
}
Err(e) => {
let err: crate::Error = e.into();
debug!("decoder inner trailers error: {:?}", err);
let status = Status::from_error(err);
return Poll::Ready(Err(status));
}
}
}
Poll::Ready(Ok(()))
}
}

impl<T> Streaming<T> {
/// Fetch the next message from this stream.
///
@@ -165,7 +306,7 @@ impl<T> Streaming<T> {
pub async fn trailers(&mut self) -> Result<Option<MetadataMap>, Status> {
// Shortcut to see if we already pulled the trailers in the stream step
// we need to do that so that the stream can error on trailing grpc-status
if let Some(trailers) = self.trailers.take() {
if let Some(trailers) = self.inner.trailers.take() {
return Ok(Some(trailers));
}

@@ -174,104 +315,30 @@ impl<T> Streaming<T> {

// Since we call poll_trailers internally on poll_next we need to
// check if it got cached again.
if let Some(trailers) = self.trailers.take() {
if let Some(trailers) = self.inner.trailers.take() {
return Ok(Some(trailers));
}

// Trailers were not caught during poll_next and thus lets poll for
// them manually.
let map = future::poll_fn(|cx| Pin::new(&mut self.body).poll_trailers(cx))
let map = future::poll_fn(|cx| Pin::new(&mut self.inner.body).poll_trailers(cx))
.await
.map_err(|e| Status::from_error(Box::new(e)));

map.map(|x| x.map(MetadataMap::from_headers))
}

fn decode_chunk(&mut self) -> Result<Option<T>, Status> {
if let State::ReadHeader = self.state {
if self.buf.remaining() < HEADER_SIZE {
return Ok(None);
}

let compression_encoding = match self.buf.get_u8() {
0 => None,
1 => {
{
if self.encoding.is_some() {
self.encoding
} else {
// https://grpc.github.io/grpc/core/md_doc_compression.html
// An ill-constructed message with its Compressed-Flag bit set but lacking a grpc-encoding
// entry different from identity in its metadata MUST fail with INTERNAL status,
// its associated description indicating the invalid Compressed-Flag condition.
return Err(Status::new(Code::Internal, "protocol error: received message with compressed-flag but no grpc-encoding was specified"));
}
}
}
f => {
trace!("unexpected compression flag");
let message = if let Direction::Response(status) = self.direction {
format!(
"protocol error: received message with invalid compression flag: {} (valid flags are 0 and 1) while receiving response with status: {}",
f, status
)
} else {
format!("protocol error: received message with invalid compression flag: {} (valid flags are 0 and 1), while sending request", f)
};
return Err(Status::new(Code::Internal, message));
}
};
let len = self.buf.get_u32() as usize;
self.buf.reserve(len);

self.state = State::ReadBody {
compression: compression_encoding,
len,
}
}

if let State::ReadBody { len, compression } = self.state {
// if we haven't read enough of the message then return and keep
// reading
if self.buf.remaining() < len || self.buf.len() < len {
return Ok(None);
}

let decoding_result = if let Some(encoding) = compression {
self.decompress_buf.clear();

if let Err(err) = decompress(encoding, &mut self.buf, &mut self.decompress_buf, len)
{
let message = if let Direction::Response(status) = self.direction {
format!(
"Error decompressing: {}, while receiving response with status: {}",
err, status
)
} else {
format!("Error decompressing: {}, while sending request", err)
};
return Err(Status::new(Code::Internal, message));
}
let decompressed_len = self.decompress_buf.len();
self.decoder.decode(&mut DecodeBuf::new(
&mut self.decompress_buf,
decompressed_len,
))
} else {
self.decoder.decode(&mut DecodeBuf::new(&mut self.buf, len))
};

return match decoding_result {
Ok(Some(msg)) => {
self.state = State::ReadHeader;
match self.inner.decode_chunk()? {
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
Some(msg) => {
self.inner.state = State::ReadHeader;
Ok(Some(msg))
}
Ok(None) => Ok(None),
Err(e) => Err(e),
};
None => Ok(None),
},
None => Ok(None),
}

Ok(None)
}
}

@@ -280,7 +347,7 @@ impl<T> Stream for Streaming<T> {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let State::Error = &self.state {
if let State::Error = &self.inner.state {
return Poll::Ready(None);
}

@@ -291,57 +358,16 @@ impl<T> Stream for Streaming<T> {
return Poll::Ready(Some(Ok(item)));
}

let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(e)) => {
let _ = std::mem::replace(&mut self.state, State::Error);
let err: crate::Error = e.into();
debug!("decoder inner stream error: {:?}", err);
let status = Status::from_error(err);
return Poll::Ready(Some(Err(status)));
}
None => None,
};

if let Some(data) = chunk {
self.buf.put(data);
} else {
// FIXME: improve buf usage.
if self.buf.has_remaining() {
trace!("unexpected EOF decoding stream");
return Poll::Ready(Some(Err(Status::new(
Code::Internal,
"Unexpected EOF decoding stream.".to_string(),
))));
} else {
break;
}
}
}

if let Direction::Response(status) = self.direction {
match ready!(Pin::new(&mut self.body).poll_trailers(cx)) {
Ok(trailer) => {
if let Err(e) = crate::status::infer_grpc_status(trailer.as_ref(), status) {
if let Some(e) = e {
return Some(Err(e)).into();
} else {
return Poll::Ready(None);
}
} else {
self.trailers = trailer.map(MetadataMap::from_headers);
}
}
Err(e) => {
let err: crate::Error = e.into();
debug!("decoder inner trailers error: {:?}", err);
let status = Status::from_error(err);
return Some(Err(status)).into();
}
match ready!(self.inner.poll_data(cx))? {
Some(()) => (),
None => break,
}
}

Poll::Ready(None)
Poll::Ready(match ready!(self.inner.poll_response(cx)) {
Ok(()) => None,
Err(err) => Some(Err(err)),
})
}
}
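The header handling in `decode_chunk` above reads one compressed-flag byte followed by a big-endian `u32` payload length, the standard 5-byte gRPC message prefix. A std-only sketch of that parse (simplified: a `String` error type and a plain slice instead of `BytesMut`):

```rust
// Parse the 5-byte gRPC message prefix: 1 compressed-flag byte followed by a
// 4-byte big-endian length. Flags other than 0 and 1 are a protocol error.
fn parse_frame_header(buf: &[u8]) -> Result<(bool, usize), String> {
    if buf.len() < 5 {
        return Err("need more data".to_string());
    }
    let compressed = match buf[0] {
        0 => false,
        1 => true,
        f => return Err(format!("invalid compression flag: {}", f)),
    };
    let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    Ok((compressed, len))
}

fn main() {
    assert_eq!(parse_frame_header(&[0, 0, 0, 0, 5]), Ok((false, 5)));
    assert_eq!(parse_frame_header(&[1, 0, 0, 1, 0]), Ok((true, 256)));
    assert!(parse_frame_header(&[2, 0, 0, 0, 0]).is_err());
}
```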

201 changes: 114 additions & 87 deletions tonic/src/codec/encode.rs
@@ -58,66 +58,80 @@ where
T: Encoder<Error = Status>,
U: Stream<Item = Result<T::Item, Status>>,
{
async_stream::stream! {
let mut buf = BytesMut::with_capacity(BUFFER_SIZE);

let compression_encoding = if compression_override == SingleMessageCompressionOverride::Disable {
None
} else {
compression_encoding
};

let mut uncompression_buf = if compression_encoding.is_some() {
BytesMut::with_capacity(BUFFER_SIZE)
} else {
BytesMut::new()
};

futures_util::pin_mut!(source);

loop {
match source.next().await {
Some(Ok(item)) => {
buf.reserve(HEADER_SIZE);
unsafe {
buf.advance_mut(HEADER_SIZE);
}

if let Some(encoding) = compression_encoding {
uncompression_buf.clear();

encoder.encode(item, &mut EncodeBuf::new(&mut uncompression_buf))
.map_err(|err| Status::internal(format!("Error encoding: {}", err)))?;

let uncompressed_len = uncompression_buf.len();

compress(
encoding,
&mut uncompression_buf,
&mut buf,
uncompressed_len,
).map_err(|err| Status::internal(format!("Error compressing: {}", err)))?;
} else {
encoder.encode(item, &mut EncodeBuf::new(&mut buf))
.map_err(|err| Status::internal(format!("Error encoding: {}", err)))?;
}

// now that we know length, we can write the header
let len = buf.len() - HEADER_SIZE;
assert!(len <= std::u32::MAX as usize);
{
let mut buf = &mut buf[..HEADER_SIZE];
buf.put_u8(compression_encoding.is_some() as u8);
buf.put_u32(len as u32);
}

yield Ok(buf.split_to(len + HEADER_SIZE).freeze());
},
Some(Err(status)) => yield Err(status),
None => break,
}
}
let mut buf = BytesMut::with_capacity(BUFFER_SIZE);

let compression_encoding = if compression_override == SingleMessageCompressionOverride::Disable
{
None
} else {
compression_encoding
};

let mut uncompression_buf = if compression_encoding.is_some() {
BytesMut::with_capacity(BUFFER_SIZE)
} else {
BytesMut::new()
};

source.map(move |result| {
let item = result?;

encode_item(
&mut encoder,
&mut buf,
&mut uncompression_buf,
compression_encoding,
item,
)
})
}

fn encode_item<T>(
encoder: &mut T,
buf: &mut BytesMut,
uncompression_buf: &mut BytesMut,
compression_encoding: Option<CompressionEncoding>,
item: T::Item,
) -> Result<Bytes, Status>
where
T: Encoder<Error = Status>,
{
buf.reserve(HEADER_SIZE);
unsafe {
buf.advance_mut(HEADER_SIZE);
}

if let Some(encoding) = compression_encoding {
uncompression_buf.clear();

encoder
.encode(item, &mut EncodeBuf::new(uncompression_buf))
.map_err(|err| Status::internal(format!("Error encoding: {}", err)))?;

let uncompressed_len = uncompression_buf.len();

compress(encoding, uncompression_buf, buf, uncompressed_len)
.map_err(|err| Status::internal(format!("Error compressing: {}", err)))?;
} else {
encoder
.encode(item, &mut EncodeBuf::new(buf))
.map_err(|err| Status::internal(format!("Error encoding: {}", err)))?;
}

// now that we know length, we can write the header
Ok(finish_encoding(compression_encoding, buf))
}

fn finish_encoding(compression_encoding: Option<CompressionEncoding>, buf: &mut BytesMut) -> Bytes {
let len = buf.len() - HEADER_SIZE;
assert!(len <= std::u32::MAX as usize);
{
let mut buf = &mut buf[..HEADER_SIZE];
buf.put_u8(compression_encoding.is_some() as u8);
buf.put_u32(len as u32);
}

buf.split_to(len + HEADER_SIZE).freeze()
}

#[derive(Debug)]
@@ -131,6 +145,11 @@ enum Role {
pub(crate) struct EncodeBody<S> {
#[pin]
inner: S,
state: EncodeState,
}

#[derive(Debug)]
struct EncodeState {
error: Option<Status>,
role: Role,
is_end_stream: bool,
@@ -143,18 +162,44 @@ where
pub(crate) fn new_client(inner: S) -> Self {
Self {
inner,
error: None,
role: Role::Client,
is_end_stream: false,
state: EncodeState {
error: None,
role: Role::Client,
is_end_stream: false,
},
}
}

pub(crate) fn new_server(inner: S) -> Self {
Self {
inner,
error: None,
role: Role::Server,
is_end_stream: false,
state: EncodeState {
error: None,
role: Role::Server,
is_end_stream: false,
},
}
}
}

impl EncodeState {
fn trailers(&mut self) -> Result<Option<HeaderMap>, Status> {
match self.role {
Role::Client => Ok(None),
Role::Server => {
if self.is_end_stream {
return Ok(None);
}

let status = if let Some(status) = self.error.take() {
self.is_end_stream = true;
status
} else {
Status::new(Code::Ok, "")
};

Ok(Some(status.to_header_map()?))
}
}
}
}
@@ -167,7 +212,7 @@ where
type Error = Status;

fn is_end_stream(&self) -> bool {
self.is_end_stream
self.state.is_end_stream
}

fn poll_data(
@@ -177,10 +222,10 @@ where
let mut self_proj = self.project();
match ready!(self_proj.inner.try_poll_next_unpin(cx)) {
Some(Ok(d)) => Some(Ok(d)).into(),
Some(Err(status)) => match self_proj.role {
Some(Err(status)) => match self_proj.state.role {
Role::Client => Some(Err(status)).into(),
Role::Server => {
*self_proj.error = Some(status);
self_proj.state.error = Some(status);
None.into()
}
},
@@ -192,24 +237,6 @@ where
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Status>> {
match self.role {
Role::Client => Poll::Ready(Ok(None)),
Role::Server => {
let self_proj = self.project();

if *self_proj.is_end_stream {
return Poll::Ready(Ok(None));
}

let status = if let Some(status) = self_proj.error.take() {
*self_proj.is_end_stream = true;
status
} else {
Status::new(Code::Ok, "")
};

Poll::Ready(Ok(Some(status.to_header_map()?)))
}
}
Poll::Ready(self.project().state.trailers())
}
}
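`finish_encoding` above writes the same 5-byte prefix from the sending side: the compressed flag, then the big-endian payload length, in front of the encoded message. A std-only sketch using `Vec<u8>` in place of `BytesMut` (illustrative, not tonic's buffer handling):

```rust
// Frame a payload with the gRPC length prefix: flag byte + u32 BE length,
// followed by the payload bytes themselves.
fn frame(payload: &[u8], compressed: bool) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + payload.len());
    out.push(compressed as u8);
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

fn main() {
    // "hello" is 5 bytes, so the prefix is flag 0 + length 0x00000005.
    let f = frame(b"hello", false);
    assert_eq!(f, vec![0, 0, 0, 0, 5, 104, 101, 108, 108, 111]);
}
```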
4 changes: 3 additions & 1 deletion tonic/src/extensions.rs
@@ -7,6 +7,7 @@ use std::fmt;
 ///
 /// [`Interceptor`]: crate::service::Interceptor
 /// [`Request`]: crate::Request
+#[derive(Default)]
 pub struct Extensions {
     inner: http::Extensions,
 }
@@ -58,8 +59,9 @@ impl Extensions {
        Self { inner: http }
    }

+   /// Convert to `http::Extensions` and consume self.
    #[inline]
-   pub(crate) fn into_http(self) -> http::Extensions {
+   pub fn into_http(self) -> http::Extensions {
        self.inner
    }
}
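Two small API changes land here: `Extensions` now derives `Default`, and `into_http` becomes public, so callers can unwrap a tonic `Extensions` back into the underlying `http::Extensions`. The shape of the pattern — a newtype with a private field plus a consuming conversion — can be sketched with std types only (`Wrapper` and `Vec<String>` are hypothetical stand-ins):

```rust
// Newtype wrapper over an inner type, mirroring how tonic's
// `Extensions` wraps `http::Extensions`.
#[derive(Default)]
struct Wrapper {
    inner: Vec<String>, // stand-in for `http::Extensions`
}

impl Wrapper {
    // Construct from the inner type (analogous to `Extensions::from_http`).
    fn from_inner(inner: Vec<String>) -> Self {
        Self { inner }
    }

    // Convert back to the inner type, consuming self
    // (analogous to the now-public `Extensions::into_http`).
    fn into_inner(self) -> Vec<String> {
        self.inner
    }
}
```

Taking `self` by value in `into_inner` means the wrapper cannot be used after conversion, which is exactly the contract `into_http` documents with "consume self".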
8 changes: 4 additions & 4 deletions tonic/src/lib.rs
@@ -30,9 +30,9 @@
 //! - `tls-webpki-roots`: Add the standard trust roots from the `webpki-roots` crate to
 //!   `rustls`-based gRPC clients. Not enabled by default.
 //! - `prost`: Enables the [`prost`] based gRPC [`Codec`] implementation.
-//! - `compression`: Enables compressing requests, responses, and streams. Note
-//!   that you must enable the `compression` feature on both `tonic` and
-//!   `tonic-build` to use it. Depends on [flate2]. Not enabled by default.
+//! - `gzip`: Enables compressing requests, responses, and streams.
+//!   Depends on [flate2]. Not enabled by default.
+//!   Replaces the `compression` flag from earlier versions of `tonic` (<= 0.7).
 //!
 //! # Structure
 //!
@@ -81,7 +81,7 @@
#![doc(
html_logo_url = "https://raw.githubusercontent.com/tokio-rs/website/master/public/img/icons/tonic.svg"
)]
#![doc(html_root_url = "https://docs.rs/tonic/0.8.0")]
#![doc(html_root_url = "https://docs.rs/tonic/0.8.1")]
#![doc(issue_tracker_base_url = "https://github.com/hyperium/tonic/issues/")]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
#![cfg_attr(docsrs, feature(doc_cfg))]
13 changes: 12 additions & 1 deletion tonic/src/status.rs
@@ -304,7 +304,18 @@ impl Status {
    }

    #[cfg_attr(not(feature = "transport"), allow(dead_code))]
-   pub(crate) fn from_error(err: Box<dyn Error + Send + Sync + 'static>) -> Status {
+   pub(crate) fn from_error_generic(
+       err: impl Into<Box<dyn Error + Send + Sync + 'static>>,
+   ) -> Status {
+       Self::from_error(err.into())
+   }
+
+   /// Create a `Status` from various types of `Error`.
+   ///
+   /// Inspects the error source chain for recognizable errors, including statuses, HTTP2, and
+   /// hyper, and attempts to map them to a `Status`, or else returns an Unknown `Status`.
+   #[cfg_attr(not(feature = "transport"), allow(dead_code))]
+   pub fn from_error(err: Box<dyn Error + Send + Sync + 'static>) -> Status {
        Status::try_from_error(err).unwrap_or_else(|err| {
            let mut status = Status::new(Code::Unknown, err.to_string());
            status.source = Some(err);
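The new `from_error_generic` is a thin convenience wrapper: it accepts anything convertible into a boxed error, converts it once, and delegates to the monomorphic `from_error`, which keeps the generic portion tiny so the real work is compiled only once. The shape of that pattern, with std only (`MyStatus` is a hypothetical stand-in for tonic's `Status`):

```rust
use std::error::Error;

// Hypothetical stand-in for tonic's `Status`.
struct MyStatus {
    message: String,
}

impl MyStatus {
    // Generic entry point: convert the argument into a boxed error,
    // then delegate so the generic function stays minimal.
    fn from_error_generic(err: impl Into<Box<dyn Error + Send + Sync + 'static>>) -> MyStatus {
        Self::from_error(err.into())
    }

    // Monomorphic worker: inspects the boxed error and builds a status
    // (here trivially, from its Display output).
    fn from_error(err: Box<dyn Error + Send + Sync + 'static>) -> MyStatus {
        MyStatus {
            message: err.to_string(),
        }
    }
}
```

Because `Box<dyn Error + Send + Sync>` has `From` impls for both concrete error types and plain strings, the generic entry point accepts either without an explicit boxing step at the call site.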
8 changes: 1 addition & 7 deletions tonic/src/transport/server/incoming.rs
@@ -23,13 +23,7 @@ where
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
IE: Into<crate::Error>,
{
async_stream::try_stream! {
futures_util::pin_mut!(incoming);

while let Some(stream) = incoming.try_next().await? {
yield ServerIo::new_io(stream);
}
}
incoming.err_into().map_ok(ServerIo::new_io)
}

#[cfg(feature = "tls")]
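The hand-rolled `async_stream` loop is replaced with two `TryStream` combinators: `err_into()` converts the error type via `Into`, and `map_ok` wraps each success value, with no macro or manual pinning. The same shape can be shown on std's synchronous `Iterator` of `Result`s (a rough analogue only — the real code operates on async streams, and these names are hypothetical):

```rust
// Synchronous analogue of `incoming.err_into().map_ok(ServerIo::new_io)`:
// convert the error type with `Into` and wrap every Ok item.
#[derive(Debug, PartialEq)]
struct ServerIo(u32); // stand-in for the wrapped connection type

fn wrap_incoming<I, E>(incoming: I) -> impl Iterator<Item = Result<ServerIo, String>>
where
    I: Iterator<Item = Result<u32, E>>,
    E: Into<String>,
{
    // `map_err(Into::into)` plays the role of `err_into()`,
    // and `map(ServerIo)` the role of `map_ok(ServerIo::new_io)`.
    incoming.map(|item| item.map_err(Into::into).map(ServerIo))
}
```

Errors pass through converted, successes come out wrapped, and the stream (here, iterator) stays lazy throughout.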
16 changes: 16 additions & 0 deletions tonic/src/transport/server/mod.rs
@@ -90,6 +90,7 @@ pub struct Server<L = Identity> {
    tcp_nodelay: bool,
    http2_keepalive_interval: Option<Duration>,
    http2_keepalive_timeout: Option<Duration>,
+   http2_adaptive_window: Option<bool>,
    max_frame_size: Option<u32>,
    accept_http1: bool,
    service_builder: ServiceBuilder<L>,
@@ -110,6 +111,7 @@ impl Default for Server<Identity> {
            tcp_nodelay: false,
            http2_keepalive_interval: None,
            http2_keepalive_timeout: None,
+           http2_adaptive_window: None,
            max_frame_size: None,
            accept_http1: false,
            service_builder: Default::default(),

+   /// Sets whether to use adaptive flow control. Defaults to `false`.
+   /// Enabling this will override the limits set in `http2_initial_stream_window_size`
+   /// and `http2_initial_connection_window_size`.
+   #[must_use]
+   pub fn http2_adaptive_window(self, enabled: Option<bool>) -> Self {
+       Server {
+           http2_adaptive_window: enabled,
+           ..self
+       }
+   }

    /// Set whether TCP keepalive messages are enabled on accepted connections.
    ///
    /// If `None` is specified, keepalive is disabled, otherwise the duration
@@ -439,6 +452,7 @@ impl<L> Server<L> {
            tcp_nodelay: self.tcp_nodelay,
            http2_keepalive_interval: self.http2_keepalive_interval,
            http2_keepalive_timeout: self.http2_keepalive_timeout,
+           http2_adaptive_window: self.http2_adaptive_window,
            max_frame_size: self.max_frame_size,
            accept_http1: self.accept_http1,
        }
@@ -476,6 +490,7 @@ impl<L> Server<L> {
        let http2_keepalive_timeout = self
            .http2_keepalive_timeout
            .unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
+       let http2_adaptive_window = self.http2_adaptive_window;

        let svc = self.service_builder.service(svc);

@@ -497,6 +512,7 @@ impl<L> Server<L> {
            .http2_max_concurrent_streams(max_concurrent_streams)
            .http2_keep_alive_interval(http2_keepalive_interval)
            .http2_keep_alive_timeout(http2_keepalive_timeout)
+           .http2_adaptive_window(http2_adaptive_window.unwrap_or_default())
            .http2_max_frame_size(max_frame_size);

        if let Some(signal) = signal {
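The new `http2_adaptive_window` setter follows the crate's consuming-builder convention: take `self` by value, replace one field with struct-update syntax (`..self`), and fall back to the protocol default at serve time (`unwrap_or_default()` on `Option<bool>` yields `false`). A minimal sketch of that convention with a hypothetical two-field builder:

```rust
// Consuming builder in the style of tonic's `Server`: each setter moves
// `self` and returns a copy with one field replaced via `..self`.
#[derive(Default)]
struct ServerBuilder {
    http2_adaptive_window: Option<bool>,
    tcp_nodelay: bool,
}

impl ServerBuilder {
    #[must_use]
    fn http2_adaptive_window(self, enabled: Option<bool>) -> Self {
        ServerBuilder {
            http2_adaptive_window: enabled,
            ..self
        }
    }

    // At serve time the unset case falls back to the protocol default:
    // `Option::<bool>::unwrap_or_default()` is `false`.
    fn effective_adaptive_window(&self) -> bool {
        self.http2_adaptive_window.unwrap_or_default()
    }
}
```

Keeping the field an `Option<bool>` rather than a `bool` lets the builder distinguish "never configured" from "explicitly disabled", which is why the resolution to a concrete `bool` happens only when the value is handed to the underlying HTTP/2 connection builder.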