diff --git a/pubsublite/admin.go b/pubsublite/admin.go index 09964e4d88b..94666194316 100644 --- a/pubsublite/admin.go +++ b/pubsublite/admin.go @@ -37,6 +37,10 @@ var ( // regions and zones where Pub/Sub Lite is available. // // An AdminClient may be shared by multiple goroutines. +// +// Close must be called to release resources when an AdminClient is no longer +// required. If the client is available for the lifetime of the program, then +// Close need not be called at exit. type AdminClient struct { admin *vkit.AdminClient } diff --git a/pubsublite/doc.go b/pubsublite/doc.go index f9e91482c38..007f04e3cde 100644 --- a/pubsublite/doc.go +++ b/pubsublite/doc.go @@ -80,6 +80,9 @@ Messages are published to topics. Pub/Sub Lite topics may be created like so: // TODO: Handle error. } +Close must be called to release resources when an AdminClient is no longer +required. + See https://cloud.google.com/pubsub/lite/docs/topics for more information about how Pub/Sub Lite topics are configured. @@ -122,6 +125,10 @@ after it has been stopped or has terminated due to a permanent error. publisher.Stop() +PublisherClients are expected to be long-lived and used for the duration of the +application, rather than for publishing small batches of messages. Stop must be +called to release resources when a PublisherClient is no longer required. + See https://cloud.google.com/pubsub/lite/docs/publishing for more information about publishing. diff --git a/pubsublite/example_test.go b/pubsublite/example_test.go index 17c0f73746d..0e6afe6a1e5 100644 --- a/pubsublite/example_test.go +++ b/pubsublite/example_test.go @@ -34,6 +34,7 @@ func ExampleAdminClient_CreateTopic() { if err != nil { // TODO: Handle error. } + defer admin.Close() const gib = 1 << 30 topicConfig := pubsublite.TopicConfig{ @@ -58,6 +59,7 @@ func ExampleAdminClient_UpdateTopic() { if err != nil { // TODO: Handle error. } + defer admin.Close() updateConfig := pubsublite.TopicConfigToUpdate{ Name: "projects/my-project/locations/region-or-zone/topics/my-topic", @@ -79,6 +81,7 @@ func ExampleAdminClient_DeleteTopic() { if err != nil { // TODO: Handle error. } + defer admin.Close() const topic = "projects/my-project/locations/region-or-zone/topics/my-topic" if err := admin.DeleteTopic(ctx, topic); err != nil { @@ -93,6 +96,7 @@ func ExampleAdminClient_Topics() { if err != nil { // TODO: Handle error. } + defer admin.Close() // List the configs of all topics in the given region or zone for the project. it := admin.Topics(ctx, "projects/my-project/locations/region-or-zone") @@ -115,6 +119,7 @@ func ExampleAdminClient_TopicSubscriptions() { if err != nil { // TODO: Handle error. } + defer admin.Close() // List the paths of all subscriptions of a topic. const topic = "projects/my-project/locations/region-or-zone/topics/my-topic" @@ -141,6 +146,7 @@ func ExampleAdminClient_CreateSubscription() { if err != nil { // TODO: Handle error. } + defer admin.Close() subscriptionConfig := pubsublite.SubscriptionConfig{ Name: "projects/my-project/locations/region-or-zone/subscriptions/my-subscription", @@ -162,6 +168,7 @@ func ExampleAdminClient_UpdateSubscription() { if err != nil { // TODO: Handle error. } + defer admin.Close() updateConfig := pubsublite.SubscriptionConfigToUpdate{ Name: "projects/my-project/locations/region-or-zone/subscriptions/my-subscription", @@ -182,6 +189,7 @@ func ExampleAdminClient_SeekSubscription() { if err != nil { // TODO: Handle error. } + defer admin.Close() const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription" seekOp, err := admin.SeekSubscription(ctx, subscription, pubsublite.Beginning) @@ -209,6 +217,7 @@ func ExampleAdminClient_DeleteSubscription() { if err != nil { // TODO: Handle error. } + defer admin.Close() const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription" if err := admin.DeleteSubscription(ctx, subscription); err != nil { @@ -223,6 +232,7 @@ func ExampleAdminClient_Subscriptions() { if err != nil { // TODO: Handle error. } + defer admin.Close() // List the configs of all subscriptions in the given region or zone for the project. it := admin.Subscriptions(ctx, "projects/my-project/locations/region-or-zone") @@ -247,6 +257,7 @@ func ExampleAdminClient_CreateReservation() { if err != nil { // TODO: Handle error. } + defer admin.Close() reservationConfig := pubsublite.ReservationConfig{ Name: "projects/my-project/locations/region/reservations/my-reservation", @@ -264,6 +275,7 @@ func ExampleAdminClient_UpdateReservation() { if err != nil { // TODO: Handle error. } + defer admin.Close() updateConfig := pubsublite.ReservationConfigToUpdate{ Name: "projects/my-project/locations/region/reservations/my-reservation", @@ -281,6 +293,7 @@ func ExampleAdminClient_DeleteReservation() { if err != nil { // TODO: Handle error. } + defer admin.Close() const reservation = "projects/my-project/locations/region/reservations/my-reservation" if err := admin.DeleteReservation(ctx, reservation); err != nil { @@ -294,6 +307,7 @@ func ExampleAdminClient_Reservations() { if err != nil { // TODO: Handle error. } + defer admin.Close() // List the configs of all reservations in the given region for the project. it := admin.Reservations(ctx, "projects/my-project/locations/region") @@ -315,6 +329,7 @@ func ExampleAdminClient_ReservationTopics() { if err != nil { // TODO: Handle error. } + defer admin.Close() // List the paths of all topics using a reservation. const reservation = "projects/my-project/locations/region/reservations/my-reservation" diff --git a/pubsublite/pscompat/publisher.go b/pubsublite/pscompat/publisher.go index c0ae2584c55..88cbc74c76c 100644 --- a/pubsublite/pscompat/publisher.go +++ b/pubsublite/pscompat/publisher.go @@ -54,6 +54,11 @@ var ( // PublisherClient is a Pub/Sub Lite client to publish messages to a given // topic. A PublisherClient is safe to use from multiple goroutines. // +// PublisherClients are expected to be long-lived and used for the duration of +// the application, rather than for publishing small batches of messages. Stop +// must be called to release resources when a PublisherClient is no longer +// required. +// // See https://cloud.google.com/pubsub/lite/docs/publishing for more information // about publishing. type PublisherClient struct { diff --git a/pubsublite/pscompat/settings.go b/pubsublite/pscompat/settings.go index a4b8b51c817..1cc0c77fac4 100644 --- a/pubsublite/pscompat/settings.go +++ b/pubsublite/pscompat/settings.go @@ -74,7 +74,10 @@ type PublishSettings struct { // backends. Note that if the timeout duration is long, ErrOverflow may occur // first. // - // It is not recommended to set Timeout below 2 minutes. + // It is not recommended to set Timeout below 2 minutes. If no failover + // operations need to be performed by the application, it is recommended to + // just use the default timeout value to avoid the PublisherClient terminating + // during short periods of backend unavailability. Timeout time.Duration // The maximum number of bytes that the publisher will keep in memory before @@ -218,7 +221,10 @@ type ReceiveSettings struct { // and details of the last error that occurred while trying to reconnect to // backends. // - // It is not recommended to set Timeout below 2 minutes. + // It is not recommended to set Timeout below 2 minutes. If no failover + // operations need to be performed by the application, it is recommended to + // just use the default timeout value to avoid the SubscriberClient + // terminating during short periods of backend unavailability. Timeout time.Duration // The topic partition numbers (zero-indexed) to receive messages from. diff --git a/pubsublite/pscompat/subscriber.go b/pubsublite/pscompat/subscriber.go index 3aacf17674e..69c16ce04c0 100644 --- a/pubsublite/pscompat/subscriber.go +++ b/pubsublite/pscompat/subscriber.go @@ -329,7 +329,8 @@ func NewSubscriberClientWithSettings(ctx context.Context, subscription string, s // callback f will block the delivery of subsequent messages for the partition. // // All messages received by f must be ACKed or NACKed. Failure to do so can -// prevent Receive from returning. +// prevent Receive from returning. Messages may be processed by the client +// concurrently and ACKed asynchronously to increase throughput. // // Each SubscriberClient may have only one invocation of Receive active at a // time.