Skip to content

Commit

Permalink
Adding parameter compressionBytesThreshold to Publisher; Adding loggi…
Browse files Browse the repository at this point in the history
…ng support in the compression example
  • Loading branch information
rajanya-google committed Feb 16, 2022
1 parent 08c5d80 commit b59e331
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ public class Publisher implements PublisherInterface {
private MessageFlowController flowController = null;

private final boolean enableCompression;

/** The message is compressed when its size (in bytes) is above the threshold. */
private static final long MSG_COMPRESSION_THRESHOLD_BYTES = 1000L; // 1 kilobyte
private final long compressionBytesThreshold;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
Expand Down Expand Up @@ -148,6 +146,7 @@ private Publisher(Builder builder) throws IOException {
this.enableMessageOrdering = builder.enableMessageOrdering;
this.messageTransform = builder.messageTransform;
this.enableCompression = builder.enableCompression;
this.compressionBytesThreshold = builder.compressionBytesThreshold;

messagesBatches = new HashMap<>();
messagesBatchLock = new ReentrantLock();
Expand Down Expand Up @@ -445,7 +444,7 @@ private void publishAllWithoutInflightForKey(final String orderingKey) {
}

private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
if (enableCompression && outstandingBatch.batchSizeBytes >= MSG_COMPRESSION_THRESHOLD_BYTES) {
if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) {
GrpcCallContext context = GrpcCallContext.createDefault();
context = context.withCallOptions(CallOptions.DEFAULT.withCompression("gzip"));
return publisherStub
Expand Down Expand Up @@ -682,6 +681,7 @@ public static final class Builder {
// Meaningful defaults.
static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L;
static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 1000L; // 1 kB
static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 500L;
static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(1);
private static final Duration DEFAULT_INITIAL_RPC_TIMEOUT = Duration.ofSeconds(5);
private static final Duration DEFAULT_MAX_RPC_TIMEOUT = Duration.ofSeconds(60);
Expand Down Expand Up @@ -746,6 +746,7 @@ public PubsubMessage apply(PubsubMessage input) {
};

private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION;
private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD;

private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
Expand Down Expand Up @@ -864,6 +865,12 @@ public Builder setEnableCompression(boolean enableCompression) {
return this;
}

/** Gives the ability to set the threshold in bytes above which gRPC compression happens. */
public Builder setCompressionBytesThreshold(long compressionBytesThreshold) {
this.compressionBytesThreshold = compressionBytesThreshold;
return this;
}

/** Returns the default BatchingSettings used by the client if settings are not provided. */
public static BatchingSettings getDefaultBatchingSettings() {
return DEFAULT_BATCHING_SETTINGS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,24 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.LogManager;

public class PublishWithCompressionExample {

public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Choose an existing topic.
String topicId = "your-topic-id";
boolean allowLogging = false; // Set to true to get the stdout logs

if (allowLogging) {
setUpLogs();
}
publishWithCompressionExample(projectId, topicId);
}

Expand All @@ -42,12 +49,11 @@ public static void publishWithCompressionExample(String projectId, String topicI

Publisher publisher = null;
try {
// Create a publisher instance with default settings bound to the topic
// Create a publisher instance bound to the topic with compression enabled and other default
// settings
publisher = Publisher.newBuilder(topicName).setEnableCompression(true).build();

// Compression works only for messages of size >= 1KB
String message = generateMessage("Hello!", 2000);
ByteString data = ByteString.copyFromUtf8(message);
ByteString data = generateData("Hello!", 2000);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

// Once published, returns a server-assigned message id (unique within the topic)
Expand All @@ -63,14 +69,38 @@ public static void publishWithCompressionExample(String projectId, String topicI
}
}

/** Generate message of given bytes by repeatedly concatenating a token.* */
private static String generateMessage(String token, int bytes) {
String result = "";
// Generates data of given bytes by repeatedly concatenating a token.
// TODO(developer): Replace this method with your own data generation logic
private static ByteString generateData(String token, int bytes) {
String message = "";
int tokenBytes = token.length();
for (int i = 0; i < Math.floor(bytes / tokenBytes) + 1; i++) {
result = result.concat(token);
message = message.concat(token);
}
return result;
return ByteString.copyFromUtf8(message);
}

private static void setUpLogs() throws IOException {
String handlers = "handlers = java.util.logging.ConsoleHandler";
String handlerLevelProp = "java.util.logging.ConsoleHandler.level = ALL";
String fineProp = ".level = FINE";
String handlerFormatterProp =
"java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter";
String format = "java.util.logging.SimpleFormatter.format=[%1$tF %1$tT] %4$-5s %5$s %n";

LogManager.getLogManager()
.readConfiguration(
new ByteArrayInputStream(
(handlers
+ "\n"
+ handlerLevelProp
+ "\n"
+ fineProp
+ "\n"
+ handlerFormatterProp
+ "\n"
+ format)
.getBytes(StandardCharsets.UTF_8)));
}
}
// [END pubsub_publish_with_compression]

0 comments on commit b59e331

Please sign in to comment.