Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Apr 26, 2024
1 parent ed00367 commit c107b6e
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ type PublishSettings struct {
CompressionBytesThreshold int
}

func (ps *PublishSettings) shouldCompress(batchSize int) bool {
return ps.EnableCompression && batchSize > ps.CompressionBytesThreshold
}

// DefaultPublishSettings holds the default values for topics' PublishSettings.
var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
Expand All @@ -142,6 +146,8 @@ var DefaultPublishSettings = PublishSettings{
MaxOutstandingBytes: -1,
LimitExceededBehavior: FlowControlIgnore,
},
// Publisher compression defaults matches Java's defaults
// https://github.com/googleapis/java-pubsub/blob/7d33e7891db1b2e32fd523d7655b6c11ea140a8b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L717-L718
EnableCompression: false,
CompressionBytesThreshold: 240,
}
Expand Down Expand Up @@ -909,21 +915,17 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
opt.Resolve(&settings)
}
r := &publishRetryer{defaultRetryer: settings.Retry()}
if t.PublishSettings.EnableCompression && batchSize > t.PublishSettings.CompressionBytesThreshold {
res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
Topic: t.name,
Messages: pbMsgs,
}, gax.WithGRPCOptions(
grpc.UseCompressor(gzip.Name),
grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
gax.WithRetry(func() gax.Retryer { return r }))
} else {
res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
Topic: t.name,
Messages: pbMsgs,
}, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
gax.WithRetry(func() gax.Retryer { return r }))
gaxOpts := []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
gax.WithRetry(func() gax.Retryer { return r }),
}
if t.PublishSettings.shouldCompress(batchSize) {
gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
}
res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
Topic: t.name,
Messages: pbMsgs,
}, gaxOpts...)
}
end := time.Now()
if err != nil {
Expand Down

0 comments on commit c107b6e

Please sign in to comment.