Skip to content

Commit

Permalink
Optimize pubsub receiver and add unit tests (#279)
Browse files Browse the repository at this point in the history
* Default to 1000 goroutines for pubsub receiver and add unit tests for pubsub connection

Signed-off-by: Chen Shou <cshou@google.com>

* add some test comments

Signed-off-by: Chen Shou <cshou@google.com>

* run go mod tidy

Signed-off-by: Chen Shou <cshou@google.com>

* exposed ReceiveSettings on the transport

Signed-off-by: Chen Shou <cshou@google.com>

* add ReceiveSettings to transport option

Signed-off-by: Chen Shou <cshou@google.com>
  • Loading branch information
yolocs authored and n3wscott committed Jan 14, 2020
1 parent e58ce55 commit d289f68
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 18 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Azure/go-autorest/autorest/to v0.2.0 // indirect
github.com/Azure/go-autorest/autorest/validation v0.1.0 // indirect
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/google/go-cmp v0.3.0
github.com/google/go-cmp v0.4.0
github.com/google/uuid v1.1.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/nats-io/nats-server/v2 v2.1.2
Expand All @@ -20,6 +20,8 @@ require (
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/sync v0.0.0-20190423024810-112230192c58
google.golang.org/api v0.15.0
google.golang.org/grpc v1.26.0
pack.ag/amqp v0.11.0
)

Expand Down
21 changes: 21 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLM
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -44,6 +46,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand All @@ -67,12 +71,16 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4 h1:hU4mGcQI4DaAYW+IbTun+2qEZVFxK0ySjQLTbS0VQKc=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE=
Expand Down Expand Up @@ -125,6 +133,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f h1:BVwpUVJDADN2ufcGik7W992pyps0wZ888b/y9GXcLTU=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU=
Expand Down Expand Up @@ -213,10 +223,15 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.6.0 h1:2tJEkRfnZL5g1GeBUlITh/rqT5HG3sFcoVCUUxmgJ2g=
google.golang.org/api v0.6.0/go.mod h1:btoxGiFvQNVUZQ8W08zLtrVS08CNpINPEfxXxgJL1Q4=
google.golang.org/api v0.15.0 h1:yzlyyDW/J0w8yNFJIhiAJy4kq74S+1DOLdawELNxFMA=
google.golang.org/api v0.15.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c=
Expand All @@ -228,11 +243,16 @@ google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRn
google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190530194941-fb225487d101 h1:wuGevabY6r+ivPNagjUXGGxF+GqgMd+dBhjsxW4q9u4=
google.golang.org/genproto v0.0.0-20190530194941-fb225487d101/go.mod h1:z3L6/3dTEVtUr6QSP8miRzeRqwQOioJ9I66odjN4I7s=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand All @@ -245,6 +265,7 @@ honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
pack.ag/amqp v0.11.0 h1:ot/IA0enDkt4/c8xfbCO7AZzjM4bHys/UffnFmnHUnU=
pack.ag/amqp v0.11.0/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
53 changes: 37 additions & 16 deletions pkg/cloudevents/transport/pubsub/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type Connection struct {
subWasCreated bool
subOnce sync.Once

// ReceiveSettings is used to configure Pubsub pull subscription.
ReceiveSettings *pubsub.ReceiveSettings

// AckDeadline is Pub/Sub AckDeadline.
// Default is 30 seconds.
AckDeadline *time.Duration
Expand All @@ -49,6 +52,17 @@ const (
DefaultRetentionDuration = 25 * time.Hour
)

var DefaultReceiveSettings = pubsub.ReceiveSettings{
// Pubsub default receive settings will fill in other values.
// https://godoc.org/cloud.google.com/go/pubsub#Client.Subscription

// Override the default number of goroutines.
// This is a magical number now. This has shown throughput improvements empirically
// by at least 10x (compared to the default value).
NumGoroutines: 1000,
Synchronous: false,
}

func (c *Connection) getOrCreateTopic(ctx context.Context) (*pubsub.Topic, error) {
var err error
c.topicOnce.Do(func() {
Expand Down Expand Up @@ -82,15 +96,16 @@ func (c *Connection) getOrCreateTopic(ctx context.Context) (*pubsub.Topic, error

// DeleteTopic
func (c *Connection) DeleteTopic(ctx context.Context) error {
if c.topicWasCreated {
if err := c.topic.Delete(ctx); err != nil {
return err
}
c.topic = nil
c.topicWasCreated = false
c.topicOnce = sync.Once{}
if !c.topicWasCreated {
return errors.New("topic was not created by pubsub transport")
}
return errors.New("topic was not created by pubsub transport")
if err := c.topic.Delete(ctx); err != nil {
return err
}
c.topic = nil
c.topicWasCreated = false
c.topicOnce = sync.Once{}
return nil
}

func (c *Connection) getOrCreateSubscription(ctx context.Context) (*pubsub.Subscription, error) {
Expand Down Expand Up @@ -138,6 +153,11 @@ func (c *Connection) getOrCreateSubscription(ctx context.Context) (*pubsub.Subsc
_ = c.Client.Close()
return
}
if c.ReceiveSettings == nil {
sub.ReceiveSettings = DefaultReceiveSettings
} else {
sub.ReceiveSettings = *c.ReceiveSettings
}
c.subWasCreated = true
}
// Success.
Expand All @@ -151,15 +171,16 @@ func (c *Connection) getOrCreateSubscription(ctx context.Context) (*pubsub.Subsc

// DeleteSubscription
func (c *Connection) DeleteSubscription(ctx context.Context) error {
if c.subWasCreated {
if err := c.sub.Delete(ctx); err != nil {
return err
}
c.sub = nil
c.subWasCreated = false
c.subOnce = sync.Once{}
if !c.subWasCreated {
return errors.New("subscription was not created by pubsub transport")
}
if err := c.sub.Delete(ctx); err != nil {
return err
}
return errors.New("subscription was not created by pubsub transport")
c.sub = nil
c.subWasCreated = false
c.subOnce = sync.Once{}
return nil
}

// Publish
Expand Down
195 changes: 195 additions & 0 deletions pkg/cloudevents/transport/pubsub/internal/connection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package internal

import (
"context"
"fmt"
"sync"
"testing"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
"google.golang.org/grpc"
)

type testPubsubClient struct {
srv *pstest.Server
conn *grpc.ClientConn
}

func (pc *testPubsubClient) New(ctx context.Context, projectID string) (*pubsub.Client, error) {
pc.srv = pstest.NewServer()
conn, err := grpc.Dial(pc.srv.Addr, grpc.WithInsecure())
if err != nil {
return nil, err
}
pc.conn = conn
return pubsub.NewClient(ctx, projectID, option.WithGRPCConn(conn))
}

func (pc *testPubsubClient) Close() {
pc.srv.Close()
pc.conn.Close()
}

func TestPublishCreateTopic(t *testing.T) {
ctx := context.Background()
pc := &testPubsubClient{}
defer pc.Close()

projectID, topicID, subID := "test-project", "test-topic", "test-sub"
client, err := pc.New(ctx, projectID)
if err != nil {
t.Fatalf("failed to create pubsub client: %v", err)
}
defer client.Close()

psconn := &Connection{
AllowCreateSubscription: true,
AllowCreateTopic: true,
Client: client,
ProjectID: projectID,
TopicID: topicID,
SubscriptionID: subID,
}

msg := &pubsub.Message{
ID: "msg-id-1",
Data: []byte("msg-data-1"),
}
if _, err := psconn.Publish(ctx, msg); err != nil {
t.Errorf("failed to publish message: %v", err)
}

if ok, err := client.Topic(topicID).Exists(ctx); err != nil || !ok {
t.Errorf("topic id=%s got exists=%v want=true, err=%v", topicID, ok, err)
}

if err := psconn.DeleteTopic(ctx); err != nil {
t.Errorf("delete topic failed: %v", err)
}

if ok, err := client.Topic(topicID).Exists(ctx); err != nil || ok {
t.Errorf("topic id=%s got exists=%v want=false, err=%v", topicID, ok, err)
}
}

func TestReceiveCreateTopicAndSubscription(t *testing.T) {
ctx := context.Background()
pc := &testPubsubClient{}
defer pc.Close()

projectID, topicID, subID := "test-project", "test-topic", "test-sub"
client, err := pc.New(ctx, projectID)
if err != nil {
t.Fatalf("failed to create pubsub client: %v", err)
}
defer client.Close()

psconn := &Connection{
AllowCreateSubscription: true,
AllowCreateTopic: true,
Client: client,
ProjectID: projectID,
TopicID: topicID,
SubscriptionID: subID,
}

ctx2, cancel := context.WithCancel(ctx)
go psconn.Receive(ctx2, func(_ context.Context, msg *pubsub.Message) {
msg.Ack()
})
// Sleep one sec for the goroutine to create the topic and subscription.
time.Sleep(time.Second)

if ok, err := client.Topic(topicID).Exists(ctx); err != nil || !ok {
t.Errorf("topic id=%s got exists=%v want=true, err=%v", topicID, ok, err)
}

if ok, err := client.Subscription(subID).Exists(ctx); err != nil || !ok {
t.Errorf("subscription id=%s got exists=%v want=true, err=%v", subID, ok, err)
}

if psconn.sub.ReceiveSettings.NumGoroutines != DefaultReceiveSettings.NumGoroutines {
t.Errorf("subscription receive settings have NumGoroutines=%d, want %d",
psconn.sub.ReceiveSettings.NumGoroutines, DefaultReceiveSettings.NumGoroutines)
}

cancel()

if err := psconn.DeleteSubscription(ctx); err != nil {
t.Errorf("delete subscription failed: %v", err)
}

if ok, err := client.Subscription(subID).Exists(ctx); err != nil || ok {
t.Errorf("subscription id=%s got exists=%v want=false, err=%v", subID, ok, err)
}

if err := psconn.DeleteTopic(ctx); err != nil {
t.Errorf("delete topic failed: %v", err)
}

if ok, err := client.Topic(topicID).Exists(ctx); err != nil || ok {
t.Errorf("topic id=%s got exists=%v want=false, err=%v", topicID, ok, err)
}
}

func TestPublishReceiveRoundtrip(t *testing.T) {
ctx := context.Background()
pc := &testPubsubClient{}
defer pc.Close()

projectID, topicID, subID := "test-project", "test-topic", "test-sub"
client, err := pc.New(ctx, projectID)
if err != nil {
t.Fatalf("failed to create pubsub client: %v", err)
}
defer client.Close()

psconn := &Connection{
AllowCreateSubscription: true,
AllowCreateTopic: true,
Client: client,
ProjectID: projectID,
TopicID: topicID,
SubscriptionID: subID,
}

wantMsgs := make(map[string]string)
gotMsgs := make(map[string]string)
wg := &sync.WaitGroup{}

ctx2, cancel := context.WithCancel(ctx)
mux := &sync.Mutex{}
// Pubsub will drop all messages if there is no subscription.
// Call Receive first so that subscription can be created before
// we publish any message.
go psconn.Receive(ctx2, func(_ context.Context, msg *pubsub.Message) {
mux.Lock()
defer mux.Unlock()
gotMsgs[string(msg.Data)] = string(msg.Data)
msg.Ack()
wg.Done()
})
// Wait a little bit for the subscription creation to complete.
time.Sleep(time.Second)

for i := 0; i < 10; i++ {
data := fmt.Sprintf("data-%d", i)
wantMsgs[data] = data

if _, err := psconn.Publish(ctx, &pubsub.Message{Data: []byte(data)}); err != nil {
t.Errorf("failed to publish message: %v", err)
}
wg.Add(1)
}

wg.Wait()
cancel()

if diff := cmp.Diff(gotMsgs, wantMsgs); diff != "" {
t.Errorf("received unexpected messages (-want +got):\n%s", diff)
}
}

0 comments on commit d289f68

Please sign in to comment.