Skip to content

Commit

Permalink
Merge pull request #930 from nats-io/js_pub_retry_2
Browse files Browse the repository at this point in the history
Allow JetStream Publish retries iff ErrNoResponders was returned.
  • Loading branch information
derekcollison committed Mar 17, 2022
2 parents c92df80 + 2a5ee5f commit 4fef66c
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 4 deletions.
59 changes: 55 additions & 4 deletions js.go
Expand Up @@ -181,6 +181,12 @@ const (
// routine. Without this, the subscription would possibly stall until
// a new message or heartbeat/fc are received.
chanSubFCCheckInterval = 250 * time.Millisecond

// Default time wait between retries on Publish iff err is NoResponders.
DefaultPubRetryWait = 250 * time.Millisecond

// Default number of retries
DefaultPubRetryAttempts = 2
)

// Types of control messages, so far heartbeat and flow control
Expand Down Expand Up @@ -340,6 +346,11 @@ type pubOpts struct {
str string // Expected stream name
seq uint64 // Expected last sequence
lss uint64 // Expected last sequence per subject

// Publish retries for NoResponders err.
rwait time.Duration // Retry wait between attempts
rnum int // Retry attempts

}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
Expand Down Expand Up @@ -377,7 +388,7 @@ const (

// PublishMsg publishes a Msg to a stream from JetStream.
func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
var o pubOpts
var o = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts}
if len(opts) > 0 {
if m.Header == nil {
m.Header = Header{}
Expand Down Expand Up @@ -422,11 +433,35 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
}

if err != nil {
if err == ErrNoResponders {
err = ErrNoStreamResponse
for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ {
// To protect against small blips in leadership changes etc, if we get a no responders here retry.
if o.ctx != nil {
select {
case <-o.ctx.Done():
case <-time.After(o.rwait):
}
} else {
time.Sleep(o.rwait)
}
if o.ttl > 0 {
ttl -= o.rwait
if ttl <= 0 {
err = ErrTimeout
break
}
resp, err = js.nc.RequestMsg(m, time.Duration(ttl))
} else {
resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
}
}
if err != nil {
if err == ErrNoResponders {
err = ErrNoStreamResponse
}
return nil, err
}
return nil, err
}

var pa pubAckResponse
if err := json.Unmarshal(resp.Data, &pa); err != nil {
return nil, ErrInvalidJSAck
Expand Down Expand Up @@ -803,6 +838,22 @@ func ExpectLastMsgId(id string) PubOpt {
})
}

// RetryWait sets the retry wait time when ErrNoResponders is encountered.
func RetryWait(dur time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.rwait = dur
return nil
})
}

// RetryAttempts sets the retry number of attemopts when ErrNoResponders is encountered.
func RetryAttempts(num int) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.rnum = num
return nil
})
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
Expand Down
90 changes: 90 additions & 0 deletions test/js_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io/ioutil"
mrand "math/rand"
"net"
"os"
"reflect"
Expand Down Expand Up @@ -6266,3 +6267,92 @@ func TestJetStreamSubscribeContextCancel(t *testing.T) {
})
})
}

func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) {
cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
}

withJSClusterAndStream(t, "R3S", 3, cfg, func(t *testing.T, stream string, servers ...*jsServer) {
// We want to make sure the worse thing seen by the lower levels during a leadership change is NoResponders.
// We will have three concurrent contexts going on.
// 1. Leadership Changes every 500ms.
// 2. Publishing messages to the stream every 10ms.
// 3. StreamInfo calls every 15ms.
expires := time.Now().Add(5 * time.Second)
var wg sync.WaitGroup
wg.Add(3)

randServer := func() *server.Server {
return servers[mrand.Intn(len(servers))].Server
}

// Leadership changes.
go func() {
defer wg.Done()
nc, js := jsClient(t, randServer())
defer nc.Close()

sds := fmt.Sprintf(server.JSApiStreamLeaderStepDownT, "TEST")
for time.Now().Before(expires) {
time.Sleep(500 * time.Millisecond)
si, err := js.StreamInfo("TEST")
expectOk(t, err)
_, err = nc.Request(sds, nil, time.Second)
expectOk(t, err)

// Wait on new leader.
checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
si, err = js.StreamInfo("TEST")
if err != nil {
return err
}
if si.Cluster.Leader == "" {
return fmt.Errorf("No leader yet")
}
return nil
})
}
}()

// Published every 10ms
toc := 0
go func() {
defer wg.Done()
nc, js := jsClient(t, randServer())
defer nc.Close()

for time.Now().Before(expires) {
time.Sleep(10 * time.Millisecond)
_, err := js.Publish("foo", []byte("OK"))
if err == nats.ErrTimeout {
toc++
continue
}
expectOk(t, err)
}
}()

// StreamInfo calls.
go func() {
defer wg.Done()
nc, js := jsClient(t, randServer())
defer nc.Close()

for time.Now().Before(expires) {
time.Sleep(15 * time.Millisecond)
_, err := js.StreamInfo("TEST")
expectOk(t, err)
}
}()

wg.Wait()

// An occasional timeout can occur, but should be 0 or maybe 1 with ~10 leadership changes per test run.
if toc > 1 {
t.Fatalf("Got too many timeout errors from publish: %d", toc)
}
})
}

0 comments on commit 4fef66c

Please sign in to comment.