diff --git a/js.go b/js.go
index 9f36d909c..dcc6f1c38 100644
--- a/js.go
+++ b/js.go
@@ -181,6 +181,12 @@ const (
 	// routine. Without this, the subscription would possibly stall until
 	// a new message or heartbeat/fc are received.
 	chanSubFCCheckInterval = 250 * time.Millisecond
+
+	// DefaultPubRetryWait is the default time to wait between retries on Publish iff err is NoResponders.
+	DefaultPubRetryWait = 250 * time.Millisecond
+
+	// DefaultPubRetryAttempts is the default number of retries.
+	DefaultPubRetryAttempts = 2
 )
 
 // Types of control messages, so far heartbeat and flow control
@@ -340,6 +346,11 @@ type pubOpts struct {
 	str string // Expected stream name
 	seq uint64 // Expected last sequence
 	lss uint64 // Expected last sequence per subject
+
+	// Publish retries for NoResponders err.
+	rwait time.Duration // Retry wait between attempts
+	rnum  int           // Retry attempts
+
 }
 
 // pubAckResponse is the ack response from the JetStream API when publishing a message.
@@ -377,7 +388,7 @@ const (
 
 // PublishMsg publishes a Msg to a stream from JetStream.
 func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
-	var o pubOpts
+	var o = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts}
 	if len(opts) > 0 {
 		if m.Header == nil {
 			m.Header = Header{}
@@ -422,11 +433,35 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
 	}
 
 	if err != nil {
-		if err == ErrNoResponders {
-			err = ErrNoStreamResponse
+		for r := 0; err == ErrNoResponders && r < o.rnum; r++ {
+			// To protect against small blips in leadership changes etc, if we get a no responders here retry.
+			if o.ctx != nil {
+				// Wait for the retry interval, but give up right away if the
+				// caller's context ends so cancellation is not delayed by rwait.
+				timer := time.NewTimer(o.rwait)
+				select {
+				case <-o.ctx.Done():
+					timer.Stop()
+					return nil, o.ctx.Err()
+				case <-timer.C:
+				}
+			} else {
+				time.Sleep(o.rwait)
+			}
+			if o.ttl > 0 {
+				resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
+			} else {
+				resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
+			}
+		}
+		if err != nil {
+			if err == ErrNoResponders {
+				err = ErrNoStreamResponse
+			}
+			return nil, err
 		}
-		return nil, err
 	}
+
 	var pa pubAckResponse
 	if err := json.Unmarshal(resp.Data, &pa); err != nil {
 		return nil, ErrInvalidJSAck
@@ -803,6 +838,22 @@ func ExpectLastMsgId(id string) PubOpt {
 	})
 }
 
+// RetryWait sets the retry wait time when ErrNoResponders is encountered.
+func RetryWait(dur time.Duration) PubOpt {
+	return pubOptFn(func(opts *pubOpts) error {
+		opts.rwait = dur
+		return nil
+	})
+}
+
+// RetryAttempts sets the number of retry attempts when ErrNoResponders is encountered.
+func RetryAttempts(num int) PubOpt {
+	return pubOptFn(func(opts *pubOpts) error {
+		opts.rnum = num
+		return nil
+	})
+}
+
 type ackOpts struct {
 	ttl time.Duration
 	ctx context.Context
diff --git a/test/js_test.go b/test/js_test.go
index edbc23943..d664de6b3 100644
--- a/test/js_test.go
+++ b/test/js_test.go
@@ -19,6 +19,7 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
+	mrand "math/rand"
 	"net"
 	"os"
 	"reflect"
@@ -6266,3 +6267,82 @@ func TestJetStreamSubscribeContextCancel(t *testing.T) {
 		})
 	})
 }
+
+func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) {
+	cfg := &nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	}
+
+	withJSClusterAndStream(t, "R3S", 3, cfg, func(t *testing.T, stream string, servers ...*jsServer) {
+		// We want to make sure the worst thing seen by the lower levels during a leadership change is NoResponders.
+		// We will have three concurrent contexts going on.
+		// 1. Leadership Changes every second.
+		// 2. Publishing messages to the stream every 10ms.
+		// 3. StreamInfo calls every 15ms.
+		// NOTE(review): the helpers below are invoked from spawned goroutines; if they
+		// fail via t.Fatal/FailNow that is only valid on the test goroutine - confirm.
+		expires := time.Now().Add(5 * time.Second)
+		var wg sync.WaitGroup
+		wg.Add(3)
+
+		randServer := func() *server.Server {
+			return servers[mrand.Intn(len(servers))].Server
+		}
+
+		// Leadership changes.
+		go func() {
+			defer wg.Done()
+			nc, js := jsClient(t, randServer())
+			defer nc.Close()
+
+			sds := fmt.Sprintf(server.JSApiStreamLeaderStepDownT, "TEST")
+			for time.Now().Before(expires) {
+				time.Sleep(1 * time.Second)
+				si, err := js.StreamInfo("TEST")
+				expectOk(t, err)
+				_, err = nc.Request(sds, nil, time.Second)
+				expectOk(t, err)
+
+				// Wait on new leader.
+				checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
+					si, err = js.StreamInfo("TEST")
+					expectOk(t, err)
+					if si.Cluster.Leader == "" {
+						return fmt.Errorf("No leader yet")
+					}
+					return nil
+				})
+			}
+		}()
+
+		// Published every 10ms
+		go func() {
+			defer wg.Done()
+			nc, js := jsClient(t, randServer())
+			defer nc.Close()
+
+			for time.Now().Before(expires) {
+				time.Sleep(10 * time.Millisecond)
+				_, err := js.Publish("foo", []byte("OK"))
+				expectOk(t, err)
+			}
+		}()
+
+		// StreamInfo calls.
+		go func() {
+			defer wg.Done()
+			nc, js := jsClient(t, randServer())
+			defer nc.Close()
+
+			for time.Now().Before(expires) {
+				time.Sleep(15 * time.Millisecond)
+				_, err := js.StreamInfo("TEST")
+				expectOk(t, err)
+			}
+		}()
+
+		wg.Wait()
+	})
+}