Skip to content

Commit

Permalink
feat: enable client to server compression (#2117)
Browse files Browse the repository at this point in the history
Enable compression of network traffic from client to server.

See the below screenshots from Wireshark for tests that show that the compression works both ways. Note: The rate of compression will depend on the type of data that is being sent. The below example for client->server communication is for sending 1,000 very similar mutations to Spanner. That compresses very well with gzip. More random data will compress less.

### Server -> Client - No Compression
![server-_client identity](https://user-images.githubusercontent.com/1196707/196627944-4d6f34fa-78a9-4023-a848-51a0ffe5228e.png)

### Server -> Client - Gzip Compression
![server-_client gzip](https://user-images.githubusercontent.com/1196707/196627948-d2d574ed-3f69-4b3c-8613-c162e21e207d.png)

### Client -> Server - No Compression
![client-_server identity](https://user-images.githubusercontent.com/1196707/196627950-b01ecb88-f6e8-4fd9-b7f8-257ad4211ac9.png)

### Client -> Server - Gzip Compression
![client-_server gzip](https://user-images.githubusercontent.com/1196707/196627952-ecf8b4bd-cd44-4941-af55-6aecaf03f69e.png)
  • Loading branch information
olavloite committed Oct 20, 2022
1 parent 44f27fc commit 50f8425
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 1 deletion.
Expand Up @@ -1060,7 +1060,8 @@ public Builder setCallCredentialsProvider(CallCredentialsProvider callCredential

/**
* Sets the compression to use for all gRPC calls. The compressor must be a valid name known in
* the {@link CompressorRegistry}.
* the {@link CompressorRegistry}. This will enable compression both from the client to the
* server and from the server to the client.
*
* <p>Supported values are:
*
Expand Down
Expand Up @@ -389,6 +389,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
// This sets the response compressor (Server -> Client).
.withEncoding(compressorName))
.setHeaderProvider(headerProviderWithUserAgent)
// Attempts direct access to spanner service over gRPC to improve throughput,
Expand Down Expand Up @@ -1901,6 +1902,10 @@ <ReqT, RespT> GrpcCallContext newCallContext(
if (options != null) {
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
}
if (compressorName != null) {
// This sets the compressor for Client -> Server.
context = context.withCallOptions(context.getCallOptions().withCompression(compressorName));
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
if (callCredentialsProvider != null) {
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();
Expand Down
Expand Up @@ -130,6 +130,12 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Verify that the compressor name header is set.
assertEquals(
"gzip",
headers.get(
Metadata.Key.of(
"x-response-encoding", Metadata.ASCII_STRING_MARSHALLER)));
Attributes attributes = call.getAttributes();
@SuppressWarnings({"unchecked", "deprecation"})
Attributes.Key<InetSocketAddress> key =
Expand Down Expand Up @@ -179,6 +185,7 @@ private SpannerOptions createSpannerOptions() {
return input;
})
.setNumChannels(numChannels)
.setCompressorName("gzip")
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setMinSessions(numChannels * 2)
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assume.assumeTrue;

Expand Down Expand Up @@ -259,6 +260,58 @@ public void testNoCallCredentials() {
rpc.shutdown();
}

@Test
public void testClientCompressorGzip() {
SpannerOptions options =
SpannerOptions.newBuilder().setProjectId("some-project").setCompressorName("gzip").build();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, false);
assertEquals(
"gzip",
rpc.newCallContext(
optionsMap,
"/some/resource",
GetSessionRequest.getDefaultInstance(),
SpannerGrpc.getGetSessionMethod())
.getCallOptions()
.getCompressor());
rpc.shutdown();
}

@Test
public void testClientCompressorIdentity() {
SpannerOptions options =
SpannerOptions.newBuilder()
.setProjectId("some-project")
.setCompressorName("identity")
.build();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, false);
assertEquals(
"identity",
rpc.newCallContext(
optionsMap,
"/some/resource",
GetSessionRequest.getDefaultInstance(),
SpannerGrpc.getGetSessionMethod())
.getCallOptions()
.getCompressor());
rpc.shutdown();
}

@Test
public void testClientCompressorDefault() {
SpannerOptions options = SpannerOptions.newBuilder().setProjectId("some-project").build();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, false);
assertNull(
rpc.newCallContext(
optionsMap,
"/some/resource",
GetSessionRequest.getDefaultInstance(),
SpannerGrpc.getGetSessionMethod())
.getCallOptions()
.getCompressor());
rpc.shutdown();
}

private static final class TimeoutHolder {

private Duration timeout;
Expand Down

0 comments on commit 50f8425

Please sign in to comment.