Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow JetStream Publish retries iff ErrNoResponders was returned. #930

Merged
merged 1 commit into from Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 55 additions & 4 deletions js.go
Expand Up @@ -181,6 +181,12 @@ const (
// routine. Without this, the subscription would possibly stall until
// a new message or heartbeat/fc are received.
chanSubFCCheckInterval = 250 * time.Millisecond

// DefaultPubRetryWait is the default wait time between retries on Publish iff err is NoResponders.
DefaultPubRetryWait = 250 * time.Millisecond

// DefaultPubRetryAttempts is the default number of retry attempts on Publish iff err is NoResponders.
DefaultPubRetryAttempts = 2
)

// Types of control messages, so far heartbeat and flow control
Expand Down Expand Up @@ -340,6 +346,11 @@ type pubOpts struct {
str string // Expected stream name
seq uint64 // Expected last sequence
lss uint64 // Expected last sequence per subject

// Publish retries for NoResponders err.
rwait time.Duration // Retry wait between attempts
rnum int // Retry attempts

}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
Expand Down Expand Up @@ -377,7 +388,7 @@ const (

// PublishMsg publishes a Msg to a stream from JetStream.
func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
var o pubOpts
var o = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts}
if len(opts) > 0 {
if m.Header == nil {
m.Header = Header{}
Expand Down Expand Up @@ -422,11 +433,35 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
}

if err != nil {
if err == ErrNoResponders {
err = ErrNoStreamResponse
for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ {
// To protect against small blips in leadership changes etc, if we get a no responders here retry.
if o.ctx != nil {
select {
case <-o.ctx.Done():
case <-time.After(o.rwait):
}
} else {
time.Sleep(o.rwait)
}
if o.ttl > 0 {
ttl -= o.rwait
if ttl <= 0 {
err = ErrTimeout
break
}
resp, err = js.nc.RequestMsg(m, time.Duration(ttl))
} else {
resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
}
}
if err != nil {
if err == ErrNoResponders {
err = ErrNoStreamResponse
}
return nil, err
}
return nil, err
}

var pa pubAckResponse
if err := json.Unmarshal(resp.Data, &pa); err != nil {
return nil, ErrInvalidJSAck
Expand Down Expand Up @@ -803,6 +838,22 @@ func ExpectLastMsgId(id string) PubOpt {
})
}

// RetryWait returns a PubOpt that overrides how long a publish waits
// between retry attempts after ErrNoResponders is encountered.
func RetryWait(dur time.Duration) PubOpt {
	// The option only records the wait on the publish options.
	setWait := func(po *pubOpts) error {
		po.rwait = dur
		return nil
	}
	return pubOptFn(setWait)
}

// RetryAttempts returns a PubOpt that overrides the number of retry
// attempts made when ErrNoResponders is encountered.
func RetryAttempts(num int) PubOpt {
	// The option only records the attempt count on the publish options.
	setAttempts := func(po *pubOpts) error {
		po.rnum = num
		return nil
	}
	return pubOptFn(setAttempts)
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
Expand Down
90 changes: 90 additions & 0 deletions test/js_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io/ioutil"
mrand "math/rand"
"net"
"os"
"reflect"
Expand Down Expand Up @@ -6266,3 +6267,92 @@ func TestJetStreamSubscribeContextCancel(t *testing.T) {
})
})
}

func TestJetStreamClusterStreamLeaderChangeClientErr(t *testing.T) {
cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
}

withJSClusterAndStream(t, "R3S", 3, cfg, func(t *testing.T, stream string, servers ...*jsServer) {
// We want to make sure the worse thing seen by the lower levels during a leadership change is NoResponders.
// We will have three concurrent contexts going on.
// 1. Leadership Changes every 500ms.
// 2. Publishing messages to the stream every 10ms.
// 3. StreamInfo calls every 15ms.
expires := time.Now().Add(5 * time.Second)
var wg sync.WaitGroup
wg.Add(3)

randServer := func() *server.Server {
return servers[mrand.Intn(len(servers))].Server
}

// Leadership changes.
go func() {
defer wg.Done()
nc, js := jsClient(t, randServer())
defer nc.Close()

sds := fmt.Sprintf(server.JSApiStreamLeaderStepDownT, "TEST")
for time.Now().Before(expires) {
time.Sleep(500 * time.Millisecond)
si, err := js.StreamInfo("TEST")
expectOk(t, err)
_, err = nc.Request(sds, nil, time.Second)
expectOk(t, err)

// Wait on new leader.
checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
si, err = js.StreamInfo("TEST")
if err != nil {
return err
}
if si.Cluster.Leader == "" {
return fmt.Errorf("No leader yet")
}
return nil
})
}
}()

// Published every 10ms
toc := 0
go func() {
defer wg.Done()
nc, js := jsClient(t, randServer())
defer nc.Close()

for time.Now().Before(expires) {
time.Sleep(10 * time.Millisecond)
_, err := js.Publish("foo", []byte("OK"))
if err == nats.ErrTimeout {
toc++
continue
}
expectOk(t, err)
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
}
}()

// StreamInfo calls.
go func() {
defer wg.Done()
nc, js := jsClient(t, randServer())
defer nc.Close()

for time.Now().Before(expires) {
time.Sleep(15 * time.Millisecond)
_, err := js.StreamInfo("TEST")
expectOk(t, err)
}
}()

wg.Wait()

// An occasional timeout can occur, but should be 0 or maybe 1 with ~10 leadership changes per test run.
if toc > 1 {
t.Fatalf("Got too many timeout errors from publish: %d", toc)
}
})
}