Skip to content

Commit

Permalink
Allow JetStream Publish retries iff ErrNoResponders was returned.
Browse files Browse the repository at this point in the history
This is to avoid small blips from leader changes surfacing to the end application.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Mar 16, 2022
1 parent b54113a commit 26b58d2
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 4 deletions.
47 changes: 43 additions & 4 deletions js.go
Expand Up @@ -181,6 +181,12 @@ const (
// routine. Without this, the subscription would possibly stall until
// a new message or heartbeat/fc are received.
chanSubFCCheckInterval = 250 * time.Millisecond

// Default time wait between retries on Publish iff err is NoResponders.
DefaultPubRetryWait = 250 * time.Millisecond

// Default number of retries
DefaultPubRetryAttempts = 2
)

// Types of control messages, so far heartbeat and flow control
Expand Down Expand Up @@ -340,6 +346,11 @@ type pubOpts struct {
str string // Expected stream name
seq uint64 // Expected last sequence
lss uint64 // Expected last sequence per subject

// Publish retries for NoResponders err.
rwait time.Duration // Retry wait between attempts
rnum int // Retry attempts

}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
Expand Down Expand Up @@ -377,7 +388,7 @@ const (

// PublishMsg publishes a Msg to a stream from JetStream.
func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
var o pubOpts
var o = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts}
if len(opts) > 0 {
if m.Header == nil {
m.Header = Header{}
Expand Down Expand Up @@ -422,11 +433,23 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
}

if err != nil {
if err == ErrNoResponders {
err = ErrNoStreamResponse
for r := 0; err == ErrNoResponders && r < o.rnum; r++ {
// To protect against small blips in leadership changes etc, if we get a no responders here retry.
time.Sleep(o.rwait)
if o.ttl > 0 {
resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
} else {
resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
}
}
if err != nil {
if err == ErrNoResponders {
err = ErrNoStreamResponse
}
return nil, err
}
return nil, err
}

var pa pubAckResponse
if err := json.Unmarshal(resp.Data, &pa); err != nil {
return nil, ErrInvalidJSAck
Expand Down Expand Up @@ -803,6 +826,22 @@ func ExpectLastMsgId(id string) PubOpt {
})
}

// RetryWait sets the retry wait time when ErrNoResponders is encountered.
func RetryWait(dur time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.rwait = dur
return nil
})
}

// RetryAttempts sets the retry number of attemopts when ErrNoResponders is encountered.
func RetryAttempts(num int) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.rnum = num
return nil
})
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
Expand Down
78 changes: 78 additions & 0 deletions test/js_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io/ioutil"
mrand "math/rand"
"net"
"os"
"reflect"
Expand Down Expand Up @@ -6266,3 +6267,80 @@ func TestJetStreamSubscribeContextCancel(t *testing.T) {
})
})
}

func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) {
cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
}

withJSClusterAndStream(t, "R3S", 3, cfg, func(t *testing.T, stream string, servers ...*jsServer) {
// We want to make sure the worse thing seen by the lower levels during a leadership change is NoResponders.
// We will have three concurrent contexts going on.
// 1. Leadership Changes every second.
// 2. Publishing messages to the stream every 10ms.
// 3. StreamInfo calls every 15ms.
expires := time.Now().Add(5 * time.Second)
var wg sync.WaitGroup
wg.Add(3)

randServer := func() *server.Server {
return servers[mrand.Intn(len(servers))].Server
}

// Leadership changes.
go func() {
defer wg.Done()
nc, js := jsClient(t, randServer())
defer nc.Close()

sds := fmt.Sprintf(server.JSApiStreamLeaderStepDownT, "TEST")
for time.Now().Before(expires) {
time.Sleep(1 * time.Second)
si, err := js.StreamInfo("TEST")
expectOk(t, err)
_, err = nc.Request(sds, nil, time.Second)
expectOk(t, err)

// Wait on new leader.
checkFor(t, time.Second, 10*time.Millisecond, func() error {
si, err = js.StreamInfo("TEST")
expectOk(t, err)
if si.Cluster.Leader == "" {
return fmt.Errorf("No leader yet")
}
return nil
})
}
}()

// Published every 10ms
go func() {
defer wg.Done()
nc, js := jsClient(t, randServer())
defer nc.Close()

for time.Now().Before(expires) {
time.Sleep(10 * time.Millisecond)
_, err := js.Publish("foo", []byte("OK"))
expectOk(t, err)
}
}()

// StreamInfo calls.
go func() {
defer wg.Done()
nc, js := jsClient(t, randServer())
defer nc.Close()

for time.Now().Before(expires) {
time.Sleep(15 * time.Millisecond)
_, err := js.StreamInfo("TEST")
expectOk(t, err)
}
}()

wg.Wait()
})
}

0 comments on commit 26b58d2

Please sign in to comment.