Skip to content

Commit

Permalink
feat(pubsub): support publisher compression (googleapis#9711)
Browse files Browse the repository at this point in the history
* feat(pubsub): support publisher compression

* address review comments
  • Loading branch information
hongalex authored and ma-g-22 committed Apr 29, 2024
1 parent eef9673 commit 9e400b9
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 2 deletions.
26 changes: 26 additions & 0 deletions pubsub/integration_test.go
Expand Up @@ -2177,6 +2177,32 @@ func TestIntegration_DetectProjectID(t *testing.T) {
}
}

func TestIntegration_PublishCompression(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t)
defer client.Close()

topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()

topic.PublishSettings.EnableCompression = true
topic.PublishSettings.CompressionBytesThreshold = 50

const messageSizeBytes = 1000

msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))}
res := topic.Publish(ctx, msg)

_, err = res.Get(ctx)
if err != nil {
t.Errorf("publish result got err: %v", err)
}
}

// createTopicWithRetry creates a topic, wrapped with testutil.Retry and returns the created topic or an error.
func createTopicWithRetry(ctx context.Context, t *testing.T, c *Client, topicID string, cfg *TopicConfig) (*Topic, error) {
var topic *Topic
Expand Down
28 changes: 26 additions & 2 deletions pubsub/topic.go
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/api/support/bundler"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -117,6 +118,17 @@ type PublishSettings struct {

// FlowControlSettings defines publisher flow control settings.
FlowControlSettings FlowControlSettings

// EnableCompression enables transport compression for Publish operations
EnableCompression bool

// CompressionBytesThreshold defines the threshold (in bytes) above which messages
// are compressed for transport. Only takes effect if EnableCompression is true.
CompressionBytesThreshold int
}

func (ps *PublishSettings) shouldCompress(batchSize int) bool {
return ps.EnableCompression && batchSize > ps.CompressionBytesThreshold
}

// DefaultPublishSettings holds the default values for topics' PublishSettings.
Expand All @@ -134,6 +146,10 @@ var DefaultPublishSettings = PublishSettings{
MaxOutstandingBytes: -1,
LimitExceededBehavior: FlowControlIgnore,
},
// Publisher compression defaults matches Java's defaults
// https://github.com/googleapis/java-pubsub/blob/7d33e7891db1b2e32fd523d7655b6c11ea140a8b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L717-L718
EnableCompression: false,
CompressionBytesThreshold: 240,
}

// CreateTopic creates a new topic.
Expand Down Expand Up @@ -875,13 +891,15 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
}
pbMsgs := make([]*pb.PubsubMessage, len(bms))
var orderingKey string
batchSize := 0
for i, bm := range bms {
orderingKey = bm.msg.OrderingKey
pbMsgs[i] = &pb.PubsubMessage{
Data: bm.msg.Data,
Attributes: bm.msg.Attributes,
OrderingKey: bm.msg.OrderingKey,
}
batchSize = batchSize + proto.Size(pbMsgs[i])
bm.msg = nil // release bm.msg for GC
}
var res *pb.PublishResponse
Expand All @@ -897,11 +915,17 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
opt.Resolve(&settings)
}
r := &publishRetryer{defaultRetryer: settings.Retry()}
gaxOpts := []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
gax.WithRetry(func() gax.Retryer { return r }),
}
if t.PublishSettings.shouldCompress(batchSize) {
gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
}
res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
Topic: t.name,
Messages: pbMsgs,
}, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
gax.WithRetry(func() gax.Retryer { return r }))
}, gaxOpts...)
}
end := time.Now()
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions pubsub/topic_test.go
Expand Up @@ -747,3 +747,26 @@ func TestPublishOrderingNotEnabled(t *testing.T) {
t.Errorf("got %v, want errTopicOrderingNotEnabled", err)
}
}

func TestPublishCompression(t *testing.T) {
ctx := context.Background()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "topic-compression")
defer topic.Stop()

topic.PublishSettings.EnableCompression = true
topic.PublishSettings.CompressionBytesThreshold = 50

const messageSizeBytes = 1000

msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))}
res := topic.Publish(ctx, msg)

_, err := res.Get(ctx)
if err != nil {
t.Errorf("publish result got err: %v", err)
}
}

0 comments on commit 9e400b9

Please sign in to comment.