Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding grpc compression support #1000

Merged
merged 54 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
47e4d21
Adding gRPC compression support to the library
rajanya-google Feb 3, 2022
7190fca
Minor comment fix
rajanya-google Feb 3, 2022
8d05ba2
Formatting the code
rajanya-google Feb 3, 2022
13a071d
Adding unit test for compression
rajanya-google Feb 7, 2022
d7eb3dc
Adding integration test for compression
rajanya-google Feb 8, 2022
49896ac
Formatting
rajanya-google Feb 8, 2022
f9bea54
Refactoring integration tests to add support for overriding endpoint
rajanya-google Feb 8, 2022
be4f7b9
Adding sample for publish with compression; Updating README
rajanya-google Feb 9, 2022
525a738
Adding integration test for compression sample
rajanya-google Feb 9, 2022
b6d51d0
Adding parameter compressionBytesThreshold to Publisher; Adding loggi…
rajanya-google Feb 16, 2022
e0819e5
Addressing PR comments
rajanya-google Mar 2, 2022
6412cf2
Addressing checkstyle
rajanya-google Mar 2, 2022
014fe2a
Addressed PR comment
rajanya-google Mar 2, 2022
4bbe64a
Addressing PR comment to put a Precondition for compression and its t…
rajanya-google Mar 10, 2022
3a8c5f7
Addressing PR review
rajanya-google Mar 11, 2022
01c7e3f
Removing logging from example
rajanya-google Mar 11, 2022
22dee27
Adding logging properties
rajanya-google Mar 11, 2022
7832599
Making the publish call unified with context as per PR comments
rajanya-google Mar 15, 2022
9f3fcc0
Removing sample code as per tianzi@'s comments
rajanya-google Mar 21, 2022
9ae5944
Minor fixes
rajanya-google Apr 11, 2022
2fa8849
Adding gRPC compression support to the library
rajanya-google Feb 3, 2022
ab3fc37
Minor comment fix
rajanya-google Feb 3, 2022
d71a052
Formatting the code
rajanya-google Feb 3, 2022
99686ea
Adding unit test for compression
rajanya-google Feb 7, 2022
d8816ca
Adding integration test for compression
rajanya-google Feb 8, 2022
cfb85a0
Formatting
rajanya-google Feb 8, 2022
1d89073
Refactoring integration tests to add support for overriding endpoint
rajanya-google Feb 8, 2022
ed13022
Adding sample for publish with compression; Updating README
rajanya-google Feb 9, 2022
a133b99
Adding integration test for compression sample
rajanya-google Feb 9, 2022
9087f22
Adding parameter compressionBytesThreshold to Publisher; Adding loggi…
rajanya-google Feb 16, 2022
509193c
Addressing PR comments
rajanya-google Mar 2, 2022
061f679
Addressing checkstyle
rajanya-google Mar 2, 2022
86a8327
Addressed PR comment
rajanya-google Mar 2, 2022
1abc9f8
Addressing PR comment to put a Precondition for compression and its t…
rajanya-google Mar 10, 2022
68484e5
Addressing PR review
rajanya-google Mar 11, 2022
fc2378e
Removing logging from example
rajanya-google Mar 11, 2022
7c34ef6
Adding logging properties
rajanya-google Mar 11, 2022
92c2824
Making the publish call unified with context as per PR comments
rajanya-google Mar 15, 2022
265350a
Removing sample code as per tianzi@'s comments
rajanya-google Mar 21, 2022
2e9a738
Minor fixes
rajanya-google Apr 11, 2022
47e725f
Merge remote-tracking branch 'origin/grpc-compression' into grpc-comp…
rajanya-google Apr 28, 2022
cef13ea
Fixing IT
rajanya-google Apr 28, 2022
ed73695
Creating a class variable publishContext to remove the overhead of Gr…
rajanya-google May 5, 2022
db27b12
fixing lint format
rajanya-google May 5, 2022
4d44679
Addressed PR comments
rajanya-google May 6, 2022
b1a2d37
Removing test
rajanya-google May 6, 2022
a26562e
build(deps): update dependency com.google.cloud:google-cloud-shared-c…
renovate-bot Apr 29, 2022
d913048
fix: added exactly once delivery files to owlbot config (#1106)
mmicatka May 2, 2022
5c9778a
chore(bazel): update version of Protobuf to v3.20.1 (#1079)
gcf-owl-bot[bot] May 2, 2022
66d356f
chore(deps): upgrade gapic-generator-java to 2.7.0 and update gax-jav…
gcf-owl-bot[bot] May 5, 2022
53ebb0b
build(deps): update dependency org.apache.maven.plugins:maven-project…
renovate-bot May 5, 2022
1fb553c
Formatting
rajanya-google Feb 8, 2022
f83bcc7
Formatting
rajanya-google May 9, 2022
3903653
Merge remote-tracking branch 'upstream/main' into grpc-compression
rajanya-google May 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,6 +35,7 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.NoHeaderProvider;
Expand All @@ -50,6 +51,7 @@
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.TopicNames;
import io.grpc.CallOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -89,6 +91,8 @@
public class Publisher implements PublisherInterface {
private static final Logger logger = Logger.getLogger(Publisher.class.getName());

private static final String GZIP_COMPRESSION = "gzip";

private final String topicName;

private final BatchingSettings batchingSettings;
Expand All @@ -114,6 +118,12 @@ public class Publisher implements PublisherInterface {

private MessageFlowController flowController = null;

private final boolean enableCompression;
private final long compressionBytesThreshold;

private final GrpcCallContext publishContext;
private final GrpcCallContext publishContextWithCompression;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
return 1000L;
Expand All @@ -140,6 +150,8 @@ private Publisher(Builder builder) throws IOException {

this.enableMessageOrdering = builder.enableMessageOrdering;
this.messageTransform = builder.messageTransform;
this.enableCompression = builder.enableCompression;
this.compressionBytesThreshold = builder.compressionBytesThreshold;

messagesBatches = new HashMap<>();
messagesBatchLock = new ReentrantLock();
Expand Down Expand Up @@ -191,6 +203,10 @@ private Publisher(Builder builder) throws IOException {
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
shutdown = new AtomicBoolean(false);
messagesWaiter = new Waiter();
this.publishContext = GrpcCallContext.createDefault();
this.publishContextWithCompression =
GrpcCallContext.createDefault()
.withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION));
}

/** Topic which the publisher publishes to. */
Expand Down Expand Up @@ -431,13 +447,18 @@ private void publishAllWithoutInflightForKey(final String orderingKey) {
}

private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
GrpcCallContext context = publishContext;
if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) {
mmicatka marked this conversation as resolved.
Show resolved Hide resolved
context = publishContextWithCompression;
}
return publisherStub
.publishCallable()
.futureCall(
PublishRequest.newBuilder()
.setTopic(topicName)
.addAllMessages(outstandingBatch.getMessages())
.build());
.build(),
context);
}

private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
Expand Down Expand Up @@ -688,6 +709,8 @@ public static final class Builder {
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
.build();
static final boolean DEFAULT_ENABLE_COMPRESSION = false;
rajanya-google marked this conversation as resolved.
Show resolved Hide resolved
static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 240L;

String topicName;
private String endpoint = PublisherStubSettings.getDefaultEndpoint();
Expand Down Expand Up @@ -717,6 +740,9 @@ public PubsubMessage apply(PubsubMessage input) {
}
};

private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION;
private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD;

private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}
Expand Down Expand Up @@ -827,6 +853,21 @@ public Builder setEndpoint(String endpoint) {
return this;
}

/** Gives the ability to enable transport compression. */
public Builder setEnableCompression(boolean enableCompression) {
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
this.enableCompression = enableCompression;
return this;
}

/**
* Sets the threshold (in bytes) above which messages are compressed for transport. Only takes
* effect if setEnableCompression(true) is also called."
*/
public Builder setCompressionBytesThreshold(long compressionBytesThreshold) {
this.compressionBytesThreshold = compressionBytesThreshold;
mmicatka marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

/** Returns the default BatchingSettings used by the client if settings are not provided. */
public static BatchingSettings getDefaultBatchingSettings() {
return DEFAULT_BATCHING_SETTINGS;
Expand Down
Expand Up @@ -40,11 +40,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.*;
import org.junit.rules.Timeout;

public class ITPubSubTest {
Expand Down Expand Up @@ -403,6 +399,70 @@ public void failed(Subscriber.State from, Throwable failure) {
topicAdminClient.deleteTopic(topicName);
}

@Test
public void testPublishSubscribeWithCompression() throws Exception {
TopicName topicName =
TopicName.newBuilder()
.setProject(projectId)
.setTopic(formatForTest("testing-compression-topic"))
.build();
SubscriptionName subscriptionName =
SubscriptionName.of(projectId, formatForTest("testing-compression-subscription"));

topicAdminClient.createTopic(topicName);

subscriptionAdminClient.createSubscription(
getSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), 10, false));

final BlockingQueue<Object> receiveQueue = new LinkedBlockingQueue<>();
Subscriber subscriber =
Subscriber.newBuilder(
subscriptionName.toString(),
new MessageReceiver() {
@Override
public void receiveMessage(
final PubsubMessage message, final AckReplyConsumer consumer) {
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
}
})
.build();
subscriber.addListener(
new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
receiveQueue.offer(failure);
}
},
MoreExecutors.directExecutor());
subscriber.startAsync();

Publisher publisher = Publisher.newBuilder(topicName).setEnableCompression(true).build();

String msg1 = generateMessage("msg1", 1000);
String msg2 = generateMessage("msg2", 1500);
publisher
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(msg1)).build())
.get();
publisher
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(msg2)).build())
.get();
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);

// Ack the first message.
MessageAndConsumer toAck1 = pollQueueMessageAndConsumer(receiveQueue);
toAck1.consumer().ack();

// Ack the second message.
MessageAndConsumer toAck2 = pollQueueMessageAndConsumer(receiveQueue);
toAck2.consumer().ack();

assertNotEquals(toAck1.message().getData(), toAck2.message().getData());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajanya-google Can you help me understand the purpose of this test? I see two messages of different sizes getting published using a publisher with no batching configured, so each message is essentially published in its own batch of batch size 1, each message has a size greater than the compression threshold of 10 bytes, they will get compressed, then we read these messages and confirm that their data fields aren't equal (what's the point of that)?


subscriber.stopAsync().awaitTerminated();
subscriptionAdminClient.deleteSubscription(subscriptionName);
topicAdminClient.deleteTopic(topicName);
}

private MessageAndConsumer pollQueueMessageAndConsumer(BlockingQueue<Object> queue)
throws InterruptedException {
Object obj = pollQueue(queue);
Expand Down Expand Up @@ -434,4 +494,14 @@ private Object pollQueue(BlockingQueue<Object> queue) throws InterruptedExceptio

return obj;
}

/** Generates message of given bytes by repeatedly concatenating a token. */
private String generateMessage(String token, int bytes) {
String result = "";
int tokenBytes = token.length();
for (int i = 0; i < Math.floor(bytes / tokenBytes) + 1; i++) {
rajanya-google marked this conversation as resolved.
Show resolved Hide resolved
result = result.concat(token);
}
return result;
}
}
Expand Up @@ -282,6 +282,31 @@ public void testPublishMixedSizeAndDuration() throws Exception {
shutdownTestPublisher(publisher);
}

@Test
public void testPublishWithCompression() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(100))
.build())
.setEnableCompression(true)
.setCompressionBytesThreshold(100)
.build();

testPublisherServiceImpl.addPublishResponse(
PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2"));
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
ApiFuture<String> publishFuture2 = sendTestMessage(publisher, "B");
assertEquals("1", publishFuture1.get());
assertEquals("2", publishFuture2.get());

fakeExecutor.advanceTime(Duration.ofSeconds(100));
shutdownTestPublisher(publisher);
}

private ApiFuture<String> sendTestMessage(Publisher publisher, String data) {
return publisher.publish(
PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build());
Expand Down