Skip to content

Commit

Permalink
docs(pubsublite): update publisher and subscriber client usage (#6864)
Browse files Browse the repository at this point in the history
Added a few notes:

* Publisher clients are meant to be long-lived and should be stopped when no longer used.
* Messages can be acknowledged asynchronously.
* Recommend default timeout values.
  • Loading branch information
tmdiep committed Oct 17, 2022
1 parent 2e87b4e commit f9eb454
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 3 deletions.
4 changes: 4 additions & 0 deletions pubsublite/admin.go
Expand Up @@ -37,6 +37,10 @@ var (
// regions and zones where Pub/Sub Lite is available.
//
// An AdminClient may be shared by multiple goroutines.
//
// Close must be called to release resources when an AdminClient is no longer
// required. If the client is available for the lifetime of the program, then
// Close need not be called at exit.
type AdminClient struct {
admin *vkit.AdminClient
}
Expand Down
7 changes: 7 additions & 0 deletions pubsublite/doc.go
Expand Up @@ -80,6 +80,9 @@ Messages are published to topics. Pub/Sub Lite topics may be created like so:
// TODO: Handle error.
}
Close must be called to release resources when an AdminClient is no longer
required.
See https://cloud.google.com/pubsub/lite/docs/topics for more information about
how Pub/Sub Lite topics are configured.
Expand Down Expand Up @@ -122,6 +125,10 @@ after it has been stopped or has terminated due to a permanent error.
publisher.Stop()
PublisherClients are expected to be long-lived and used for the duration of the
application, rather than for publishing small batches of messages. Stop must be
called to release resources when a PublisherClient is no longer required.
See https://cloud.google.com/pubsub/lite/docs/publishing for more information
about publishing.
Expand Down
15 changes: 15 additions & 0 deletions pubsublite/example_test.go
Expand Up @@ -34,6 +34,7 @@ func ExampleAdminClient_CreateTopic() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

const gib = 1 << 30
topicConfig := pubsublite.TopicConfig{
Expand All @@ -58,6 +59,7 @@ func ExampleAdminClient_UpdateTopic() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

updateConfig := pubsublite.TopicConfigToUpdate{
Name: "projects/my-project/locations/region-or-zone/topics/my-topic",
Expand All @@ -79,6 +81,7 @@ func ExampleAdminClient_DeleteTopic() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

const topic = "projects/my-project/locations/region-or-zone/topics/my-topic"
if err := admin.DeleteTopic(ctx, topic); err != nil {
Expand All @@ -93,6 +96,7 @@ func ExampleAdminClient_Topics() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

// List the configs of all topics in the given region or zone for the project.
it := admin.Topics(ctx, "projects/my-project/locations/region-or-zone")
Expand All @@ -115,6 +119,7 @@ func ExampleAdminClient_TopicSubscriptions() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

// List the paths of all subscriptions of a topic.
const topic = "projects/my-project/locations/region-or-zone/topics/my-topic"
Expand All @@ -141,6 +146,7 @@ func ExampleAdminClient_CreateSubscription() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

subscriptionConfig := pubsublite.SubscriptionConfig{
Name: "projects/my-project/locations/region-or-zone/subscriptions/my-subscription",
Expand All @@ -162,6 +168,7 @@ func ExampleAdminClient_UpdateSubscription() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

updateConfig := pubsublite.SubscriptionConfigToUpdate{
Name: "projects/my-project/locations/region-or-zone/subscriptions/my-subscription",
Expand All @@ -182,6 +189,7 @@ func ExampleAdminClient_SeekSubscription() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription"
seekOp, err := admin.SeekSubscription(ctx, subscription, pubsublite.Beginning)
Expand Down Expand Up @@ -209,6 +217,7 @@ func ExampleAdminClient_DeleteSubscription() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

const subscription = "projects/my-project/locations/region-or-zone/subscriptions/my-subscription"
if err := admin.DeleteSubscription(ctx, subscription); err != nil {
Expand All @@ -223,6 +232,7 @@ func ExampleAdminClient_Subscriptions() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

// List the configs of all subscriptions in the given region or zone for the project.
it := admin.Subscriptions(ctx, "projects/my-project/locations/region-or-zone")
Expand All @@ -247,6 +257,7 @@ func ExampleAdminClient_CreateReservation() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

reservationConfig := pubsublite.ReservationConfig{
Name: "projects/my-project/locations/region/reservations/my-reservation",
Expand All @@ -264,6 +275,7 @@ func ExampleAdminClient_UpdateReservation() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

updateConfig := pubsublite.ReservationConfigToUpdate{
Name: "projects/my-project/locations/region/reservations/my-reservation",
Expand All @@ -281,6 +293,7 @@ func ExampleAdminClient_DeleteReservation() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

const reservation = "projects/my-project/locations/region/reservations/my-reservation"
if err := admin.DeleteReservation(ctx, reservation); err != nil {
Expand All @@ -294,6 +307,7 @@ func ExampleAdminClient_Reservations() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

// List the configs of all reservations in the given region for the project.
it := admin.Reservations(ctx, "projects/my-project/locations/region")
Expand All @@ -315,6 +329,7 @@ func ExampleAdminClient_ReservationTopics() {
if err != nil {
// TODO: Handle error.
}
defer admin.Close()

// List the paths of all topics using a reservation.
const reservation = "projects/my-project/locations/region/reservations/my-reservation"
Expand Down
5 changes: 5 additions & 0 deletions pubsublite/pscompat/publisher.go
Expand Up @@ -54,6 +54,11 @@ var (
// PublisherClient is a Pub/Sub Lite client to publish messages to a given
// topic. A PublisherClient is safe to use from multiple goroutines.
//
// PublisherClients are expected to be long-lived and used for the duration of
// the application, rather than for publishing small batches of messages. Stop
// must be called to release resources when a PublisherClient is no longer
// required.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
type PublisherClient struct {
Expand Down
10 changes: 8 additions & 2 deletions pubsublite/pscompat/settings.go
Expand Up @@ -74,7 +74,10 @@ type PublishSettings struct {
// backends. Note that if the timeout duration is long, ErrOverflow may occur
// first.
//
// It is not recommended to set Timeout below 2 minutes.
// It is not recommended to set Timeout below 2 minutes. If no failover
// operations need to be performed by the application, it is recommended to
// just use the default timeout value to avoid the PublisherClient terminating
// during short periods of backend unavailability.
Timeout time.Duration

// The maximum number of bytes that the publisher will keep in memory before
Expand Down Expand Up @@ -218,7 +221,10 @@ type ReceiveSettings struct {
// and details of the last error that occurred while trying to reconnect to
// backends.
//
// It is not recommended to set Timeout below 2 minutes.
// It is not recommended to set Timeout below 2 minutes. If no failover
// operations need to be performed by the application, it is recommended to
// just use the default timeout value to avoid the SubscriberClient
// terminating during short periods of backend unavailability.
Timeout time.Duration

// The topic partition numbers (zero-indexed) to receive messages from.
Expand Down
3 changes: 2 additions & 1 deletion pubsublite/pscompat/subscriber.go
Expand Up @@ -329,7 +329,8 @@ func NewSubscriberClientWithSettings(ctx context.Context, subscription string, s
// callback f will block the delivery of subsequent messages for the partition.
//
// All messages received by f must be ACKed or NACKed. Failure to do so can
// prevent Receive from returning.
// prevent Receive from returning. Messages may be processed by the client
// concurrently and ACKed asynchronously to increase throughput.
//
// Each SubscriberClient may have only one invocation of Receive active at a
// time.
Expand Down

0 comments on commit f9eb454

Please sign in to comment.