Skip to content

Commit

Permalink
feat(pubsublite): set finalizer for PublisherClient (#7109)
Browse files Browse the repository at this point in the history
Mitigates `PublisherClient.Stop` not being called when the publisher is no longer used. Users must still call `Stop` to promptly shut down the publisher, as finalizers run after an arbitrary amount of time.
  • Loading branch information
tmdiep committed Dec 1, 2022
1 parent ce0c944 commit e648bd9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pubsublite/doc.go
Expand Up @@ -187,7 +187,7 @@ client that may be able to handle messages.
See https://cloud.google.com/pubsub/lite/docs/subscribing for more information
about receiving messages.
gRPC Connection Pools
# gRPC Connection Pools
Pub/Sub Lite utilizes gRPC streams extensively. gRPC allows a maximum of 100
streams per connection. Internally, the library uses a default connection pool
Expand Down
27 changes: 27 additions & 0 deletions pubsublite/pscompat/integration_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"runtime"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -590,6 +591,32 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
}
})

// Verifies that publisher clients are not stopped while still in use.
t.Run("Finalizer", func(t *testing.T) {
publisher := publisherClient(context.Background(), t, DefaultPublishSettings, topicPath)
runtime.GC() // Publisher should not be stopped

result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("finalizer1")})
runtime.GC() // Publisher should not be stopped
if _, err := result.Get(ctx); err != nil {
t.Errorf("Publish() got err: %v", err)
}

result = publisher.Publish(ctx, &pubsub.Message{Data: []byte("finalizer2")})
// The finalizer runs during the next GC. Publish should still succeed
// because Stop flushes outstanding messages and waits for publish responses
// before closing connections.
runtime.GC()
if _, err := result.Get(ctx); err != nil {
t.Errorf("Publish() got err: %v", err)
}

// Explicitly clear the publisher reference, but the finalizer should have
// already been triggered.
publisher = nil
runtime.GC()
})

// Verifies that cancelling the context passed to NewSubscriberClient can shut
// down the subscriber.
t.Run("CancelSubscriberContext", func(t *testing.T) {
Expand Down
20 changes: 19 additions & 1 deletion pubsublite/pscompat/publisher.go
Expand Up @@ -15,6 +15,7 @@ package pscompat

import (
"context"
"runtime"
"sync"

"cloud.google.com/go/pubsub"
Expand Down Expand Up @@ -73,6 +74,9 @@ type PublisherClient struct {
// NewPublisherClient creates a new Pub/Sub Lite publisher client to publish
// messages to a given topic, using DefaultPublishSettings. A valid topic path
// has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
//
// Stop must be called to release resources when a PublisherClient is no longer
// required.
func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error) {
return NewPublisherClientWithSettings(ctx, topic, DefaultPublishSettings, opts...)
}
Expand All @@ -81,6 +85,9 @@ func NewPublisherClient(ctx context.Context, topic string, opts ...option.Client
// publish messages to a given topic, using the specified PublishSettings. A
// valid topic path has the format:
// "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
//
// Stop must be called to release resources when a PublisherClient is no longer
// required.
func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error) {
topicPath, err := wire.ParseTopicPath(topic)
if err != nil {
Expand All @@ -99,7 +106,16 @@ func NewPublisherClientWithSettings(ctx context.Context, topic string, settings
if err := wirePub.WaitStarted(); err != nil {
return nil, err
}
return &PublisherClient{settings: settings, wirePub: wirePub}, nil
publisher := &PublisherClient{settings: settings, wirePub: wirePub}

// Mitigation for Stop not being called when the publisher client is no longer
// used. Users must still call Stop to promptly shut down the publisher, as
// finalizers run after an arbitrary amount of time.
runtime.SetFinalizer(publisher, func(p *PublisherClient) {
// TODO: Log a warning.
go p.wirePub.Stop()
})
return publisher, nil
}

// Publish publishes `msg` to the topic asynchronously. Messages are batched and
Expand Down Expand Up @@ -136,13 +152,15 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub
ipubsub.SetPublishResult(result, "", err)
}
})
runtime.KeepAlive(p) // Delay finalizers up to this point
return result
}

// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent. Stop should be called when the client is no longer required.
func (p *PublisherClient) Stop() {
runtime.SetFinalizer(p, nil)
p.wirePub.Stop()
p.wirePub.WaitStopped()
}
Expand Down

0 comments on commit e648bd9

Please sign in to comment.