Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable client to server compression #2117

Merged
merged 2 commits into from Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1056,7 +1056,8 @@ public Builder setCallCredentialsProvider(CallCredentialsProvider callCredential

/**
* Sets the compression to use for all gRPC calls. The compressor must be a valid name known in
* the {@link CompressorRegistry}.
* the {@link CompressorRegistry}. This will enable compression both from the client to the
* server and from the server to the client.
*
* <p>Supported values are:
*
Expand Down
Expand Up @@ -389,6 +389,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault()))
// This sets the response compressor (Server -> Client).
.withEncoding(compressorName))
.setHeaderProvider(headerProviderWithUserAgent)
// Attempts direct access to spanner service over gRPC to improve throughput,
Expand Down Expand Up @@ -1901,6 +1902,10 @@ <ReqT, RespT> GrpcCallContext newCallContext(
if (options != null) {
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
}
if (compressorName != null) {
// This sets the compressor for Client -> Server.
context = context.withCallOptions(context.getCallOptions().withCompression(compressorName));
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
if (callCredentialsProvider != null) {
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();
Expand Down
Expand Up @@ -130,6 +130,12 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Verify that the compressor name header is set.
assertEquals(
"gzip",
headers.get(
Metadata.Key.of(
"x-response-encoding", Metadata.ASCII_STRING_MARSHALLER)));
Attributes attributes = call.getAttributes();
@SuppressWarnings({"unchecked", "deprecation"})
Attributes.Key<InetSocketAddress> key =
Expand Down Expand Up @@ -179,6 +185,7 @@ private SpannerOptions createSpannerOptions() {
return input;
})
.setNumChannels(numChannels)
.setCompressorName("gzip")
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setMinSessions(numChannels * 2)
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assume.assumeTrue;

Expand Down Expand Up @@ -259,6 +260,58 @@ public void testNoCallCredentials() {
rpc.shutdown();
}

@Test
public void testClientCompressorGzip() {
SpannerOptions options =
SpannerOptions.newBuilder().setProjectId("some-project").setCompressorName("gzip").build();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, false);
assertEquals(
"gzip",
rpc.newCallContext(
optionsMap,
"/some/resource",
GetSessionRequest.getDefaultInstance(),
SpannerGrpc.getGetSessionMethod())
.getCallOptions()
.getCompressor());
rpc.shutdown();
}

@Test
public void testClientCompressorIdentity() {
SpannerOptions options =
SpannerOptions.newBuilder()
.setProjectId("some-project")
.setCompressorName("identity")
.build();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, false);
assertEquals(
"identity",
rpc.newCallContext(
optionsMap,
"/some/resource",
GetSessionRequest.getDefaultInstance(),
SpannerGrpc.getGetSessionMethod())
.getCallOptions()
.getCompressor());
rpc.shutdown();
}

@Test
public void testClientCompressorDefault() {
SpannerOptions options = SpannerOptions.newBuilder().setProjectId("some-project").build();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, false);
assertNull(
rpc.newCallContext(
optionsMap,
"/some/resource",
GetSessionRequest.getDefaultInstance(),
SpannerGrpc.getGetSessionMethod())
.getCallOptions()
.getCompressor());
rpc.shutdown();
}

private static final class TimeoutHolder {

private Duration timeout;
Expand Down