Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): set finalizer for PublisherClient #7109

Merged
merged 7 commits into from Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubsublite/doc.go
Expand Up @@ -187,7 +187,7 @@ client that may be able to handle messages.
See https://cloud.google.com/pubsub/lite/docs/subscribing for more information
about receiving messages.

gRPC Connection Pools
# gRPC Connection Pools

Pub/Sub Lite utilizes gRPC streams extensively. gRPC allows a maximum of 100
streams per connection. Internally, the library uses a default connection pool
Expand Down
27 changes: 27 additions & 0 deletions pubsublite/pscompat/integration_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"runtime"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -590,6 +591,32 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
}
})

// Verifies that publisher clients are not stopped while still in use.
t.Run("Finalizer", func(t *testing.T) {
publisher := publisherClient(context.Background(), t, DefaultPublishSettings, topicPath)
runtime.GC() // Publisher should not be stopped

result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("finalizer1")})
runtime.GC() // Publisher should not be stopped
if _, err := result.Get(ctx); err != nil {
t.Errorf("Publish() got err: %v", err)
}

result = publisher.Publish(ctx, &pubsub.Message{Data: []byte("finalizer2")})
// The finalizer runs during the next GC. Publish should still succeed
// because Stop flushes outstanding messages and waits for publish responses
// before closing connections.
runtime.GC()
if _, err := result.Get(ctx); err != nil {
tmdiep marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("Publish() got err: %v", err)
}

// Explicitly clear the publisher reference, but the finalizer should have
// already been triggered.
publisher = nil
runtime.GC()
})

// Verifies that cancelling the context passed to NewSubscriberClient can shut
// down the subscriber.
t.Run("CancelSubscriberContext", func(t *testing.T) {
Expand Down
20 changes: 19 additions & 1 deletion pubsublite/pscompat/publisher.go
Expand Up @@ -15,6 +15,7 @@ package pscompat

import (
"context"
"runtime"
"sync"

"cloud.google.com/go/pubsub"
Expand Down Expand Up @@ -73,6 +74,9 @@ type PublisherClient struct {
// NewPublisherClient creates a new Pub/Sub Lite publisher client to publish
// messages to a given topic, using DefaultPublishSettings. A valid topic path
// has the format: "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
//
// Stop must be called to release resources when a PublisherClient is no longer
// required.
func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error) {
return NewPublisherClientWithSettings(ctx, topic, DefaultPublishSettings, opts...)
}
Expand All @@ -81,6 +85,9 @@ func NewPublisherClient(ctx context.Context, topic string, opts ...option.Client
// publish messages to a given topic, using the specified PublishSettings. A
// valid topic path has the format:
// "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID".
//
// Stop must be called to release resources when a PublisherClient is no longer
// required.
func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error) {
topicPath, err := wire.ParseTopicPath(topic)
if err != nil {
Expand All @@ -99,7 +106,16 @@ func NewPublisherClientWithSettings(ctx context.Context, topic string, settings
if err := wirePub.WaitStarted(); err != nil {
return nil, err
}
return &PublisherClient{settings: settings, wirePub: wirePub}, nil
publisher := &PublisherClient{settings: settings, wirePub: wirePub}

// Mitigation for Stop not being called when the publisher client is no longer
// used. Users must still call Stop to promptly shut down the publisher, as
// finalizers run after an arbitrary amount of time.
runtime.SetFinalizer(publisher, func(p *PublisherClient) {
// TODO: Log a warning.
go p.wirePub.Stop()
})
return publisher, nil
}

// Publish publishes `msg` to the topic asynchronously. Messages are batched and
Expand Down Expand Up @@ -136,13 +152,15 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub
ipubsub.SetPublishResult(result, "", err)
}
})
runtime.KeepAlive(p) // Delay finalizers up to this point
return result
}

// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent. Stop should be called when the client is no longer required.
func (p *PublisherClient) Stop() {
runtime.SetFinalizer(p, nil)
p.wirePub.Stop()
p.wirePub.WaitStopped()
}
Expand Down