From e648bd95ff5b33383440e18245106741292ac97a Mon Sep 17 00:00:00 2001 From: tmdiep Date: Fri, 2 Dec 2022 08:02:27 +1100 Subject: [PATCH] feat(pubsublite): set finalizer for PublisherClient (#7109) Mitigates `PublisherClient.Stop` not being called when the publisher is no longer used. Users must still call `Stop` to promptly shut down the publisher, as finalizers run after an arbitrary amount of time. --- pubsublite/doc.go | 2 +- pubsublite/pscompat/integration_test.go | 27 +++++++++++++++++++++++++ pubsublite/pscompat/publisher.go | 20 +++++++++++++++++- 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/pubsublite/doc.go b/pubsublite/doc.go index 007f04e3cde..4fa83f6edac 100644 --- a/pubsublite/doc.go +++ b/pubsublite/doc.go @@ -187,7 +187,7 @@ client that may be able to handle messages. See https://cloud.google.com/pubsub/lite/docs/subscribing for more information about receiving messages. -gRPC Connection Pools +# gRPC Connection Pools Pub/Sub Lite utilizes gRPC streams extensively. gRPC allows a maximum of 100 streams per connection. Internally, the library uses a default connection pool diff --git a/pubsublite/pscompat/integration_test.go b/pubsublite/pscompat/integration_test.go index 573ad84edcc..b91ccb7decb 100644 --- a/pubsublite/pscompat/integration_test.go +++ b/pubsublite/pscompat/integration_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "runtime" "sort" "strings" "sync" @@ -590,6 +591,32 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) { } }) + // Verifies that publisher clients are not stopped while still in use. + t.Run("Finalizer", func(t *testing.T) { + publisher := publisherClient(context.Background(), t, DefaultPublishSettings, topicPath) + runtime.GC() // Publisher should not be stopped + + result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("finalizer1")}) + runtime.GC() // Publisher should not be stopped + if _, err := result.Get(ctx); err != nil { + t.Errorf("Publish() got err: %v", err) + } + + result = publisher.Publish(ctx, &pubsub.Message{Data: []byte("finalizer2")}) + // The finalizer runs during the next GC. Publish should still succeed + // because Stop flushes outstanding messages and waits for publish responses + // before closing connections. + runtime.GC() + if _, err := result.Get(ctx); err != nil { + t.Errorf("Publish() got err: %v", err) + } + + // Explicitly clear the publisher reference, but the finalizer should have + // already been triggered. + publisher = nil + runtime.GC() + }) + // Verifies that cancelling the context passed to NewSubscriberClient can shut // down the subscriber. t.Run("CancelSubscriberContext", func(t *testing.T) { diff --git a/pubsublite/pscompat/publisher.go b/pubsublite/pscompat/publisher.go index 88cbc74c76c..136d45f4849 100644 --- a/pubsublite/pscompat/publisher.go +++ b/pubsublite/pscompat/publisher.go @@ -15,6 +15,7 @@ package pscompat import ( "context" + "runtime" "sync" "cloud.google.com/go/pubsub" @@ -73,6 +74,9 @@ type PublisherClient struct { // NewPublisherClient creates a new Pub/Sub Lite publisher client to publish // messages to a given topic, using DefaultPublishSettings. A valid topic path // has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID". +// +// Stop must be called to release resources when a PublisherClient is no longer +// required. func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error) { return NewPublisherClientWithSettings(ctx, topic, DefaultPublishSettings, opts...) } @@ -81,6 +85,9 @@ func NewPublisherClient(ctx context.Context, topic string, opts ...option.Client // publish messages to a given topic, using the specified PublishSettings. A // valid topic path has the format: // "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID". +// +// Stop must be called to release resources when a PublisherClient is no longer +// required. func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error) { topicPath, err := wire.ParseTopicPath(topic) if err != nil { @@ -99,7 +106,16 @@ func NewPublisherClientWithSettings(ctx context.Context, topic string, settings if err := wirePub.WaitStarted(); err != nil { return nil, err } - return &PublisherClient{settings: settings, wirePub: wirePub}, nil + publisher := &PublisherClient{settings: settings, wirePub: wirePub} + + // Mitigation for Stop not being called when the publisher client is no longer + // used. Users must still call Stop to promptly shut down the publisher, as + // finalizers run after an arbitrary amount of time. + runtime.SetFinalizer(publisher, func(p *PublisherClient) { + // TODO: Log a warning. + go p.wirePub.Stop() + }) + return publisher, nil } // Publish publishes `msg` to the topic asynchronously. Messages are batched and @@ -136,6 +152,7 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub ipubsub.SetPublishResult(result, "", err) } }) + runtime.KeepAlive(p) // Delay finalizers up to this point return result } @@ -143,6 +160,7 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub // Returns once all outstanding messages have been sent or have failed to be // sent. Stop should be called when the client is no longer required. func (p *PublisherClient) Stop() { + runtime.SetFinalizer(p, nil) p.wirePub.Stop() p.wirePub.WaitStopped() }