From 7ccdf2087fbacd13cfbabf6f5b513cc2daec1ff3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 16 Mar 2022 14:16:23 -0700 Subject: [PATCH] Allow JetStream Publish retries iff ErrNoResponders was returned. This is to avoid small blips from leader changes surfacing to the end application. Signed-off-by: Derek Collison --- js.go | 59 ++++++++++++++++++++++++++++++--- test/js_test.go | 88 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 4 deletions(-) diff --git a/js.go b/js.go index 9f36d909c..d526978b2 100644 --- a/js.go +++ b/js.go @@ -181,6 +181,12 @@ const ( // routine. Without this, the subscription would possibly stall until // a new message or heartbeat/fc are received. chanSubFCCheckInterval = 250 * time.Millisecond + + // Default time wait between retries on Publish iff err is NoResponders. + DefaultPubRetryWait = 250 * time.Millisecond + + // Default number of retries + DefaultPubRetryAttempts = 2 ) // Types of control messages, so far heartbeat and flow control @@ -340,6 +346,11 @@ type pubOpts struct { str string // Expected stream name seq uint64 // Expected last sequence lss uint64 // Expected last sequence per subject + + // Publish retries for NoResponders err. + rwait time.Duration // Retry wait between attempts + rnum int // Retry attempts + } // pubAckResponse is the ack response from the JetStream API when publishing a message. @@ -377,7 +388,7 @@ const ( // PublishMsg publishes a Msg to a stream from JetStream. func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { - var o pubOpts + var o = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts} if len(opts) > 0 { if m.Header == nil { m.Header = Header{} @@ -422,11 +433,35 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { } if err != nil { - if err == ErrNoResponders { - err = ErrNoStreamResponse + for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ { + // To protect against small blips in leadership changes etc, if we get a no responders here retry. + if o.ctx != nil { + select { + case <-o.ctx.Done(): + case <-time.After(o.rwait): + } + } else { + time.Sleep(o.rwait) + } + if o.ttl > 0 { + ttl -= o.rwait + if ttl <= 0 { + err = ErrTimeout + break + } + resp, err = js.nc.RequestMsg(m, time.Duration(ttl)) + } else { + resp, err = js.nc.RequestMsgWithContext(o.ctx, m) + } + } + if err != nil { + if err == ErrNoResponders { + err = ErrNoStreamResponse + } + return nil, err } - return nil, err } + var pa pubAckResponse if err := json.Unmarshal(resp.Data, &pa); err != nil { return nil, ErrInvalidJSAck @@ -803,6 +838,22 @@ func ExpectLastMsgId(id string) PubOpt { }) } +// RetryWait sets the retry wait time when ErrNoResponders is encountered. +func RetryWait(dur time.Duration) PubOpt { + return pubOptFn(func(opts *pubOpts) error { + opts.rwait = dur + return nil + }) +} + +// RetryAttempts sets the retry number of attemopts when ErrNoResponders is encountered. +func RetryAttempts(num int) PubOpt { + return pubOptFn(func(opts *pubOpts) error { + opts.rnum = num + return nil + }) +} + type ackOpts struct { ttl time.Duration ctx context.Context diff --git a/test/js_test.go b/test/js_test.go index edbc23943..a433b0dd2 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io/ioutil" + mrand "math/rand" "net" "os" "reflect" @@ -6266,3 +6267,90 @@ func TestJetStreamSubscribeContextCancel(t *testing.T) { }) }) } + +func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) { + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + } + + withJSClusterAndStream(t, "R3S", 3, cfg, func(t *testing.T, stream string, servers ...*jsServer) { + // We want to make sure the worse thing seen by the lower levels during a leadership change is NoResponders. + // We will have three concurrent contexts going on. + // 1. Leadership Changes every 500ms. + // 2. Publishing messages to the stream every 10ms. + // 3. StreamInfo calls every 15ms. + expires := time.Now().Add(5 * time.Second) + var wg sync.WaitGroup + wg.Add(3) + + randServer := func() *server.Server { + return servers[mrand.Intn(len(servers))].Server + } + + // Leadership changes. + go func() { + defer wg.Done() + nc, js := jsClient(t, randServer()) + defer nc.Close() + + sds := fmt.Sprintf(server.JSApiStreamLeaderStepDownT, "TEST") + for time.Now().Before(expires) { + time.Sleep(500 * time.Millisecond) + si, err := js.StreamInfo("TEST") + expectOk(t, err) + _, err = nc.Request(sds, nil, time.Second) + expectOk(t, err) + + // Wait on new leader. + checkFor(t, 5*time.Second, 50*time.Millisecond, func() error { + si, err = js.StreamInfo("TEST") + expectOk(t, err) + if si.Cluster.Leader == "" { + return fmt.Errorf("No leader yet") + } + return nil + }) + } + }() + + // Published every 10ms + toc := 0 + go func() { + defer wg.Done() + nc, js := jsClient(t, randServer()) + defer nc.Close() + + for time.Now().Before(expires) { + time.Sleep(10 * time.Millisecond) + _, err := js.Publish("foo", []byte("OK")) + if err == nats.ErrTimeout { + toc++ + continue + } + expectOk(t, err) + } + }() + + // StreamInfo calls. + go func() { + defer wg.Done() + nc, js := jsClient(t, randServer()) + defer nc.Close() + + for time.Now().Before(expires) { + time.Sleep(15 * time.Millisecond) + _, err := js.StreamInfo("TEST") + expectOk(t, err) + } + }() + + wg.Wait() + + // An occasional timeout can occur, but should be 0 or maybe 1 with ~10 leadership changes per test run. + if toc > 1 { + t.Fatalf("Got too many timeout errors from publish: %d", toc) + } + }) +}