Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set async pub ack inflight pending default to 4K, add StallWait option. #941

Merged
merged 1 commit into from Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 35 additions & 9 deletions js.go
Expand Up @@ -187,6 +187,9 @@ const (

// Default number of retries
DefaultPubRetryAttempts = 2

// defaultAsyncPubAckInflight is the number of async pub acks inflight.
defaultAsyncPubAckInflight = 4000
)

// Types of control messages, so far heartbeat and flow control
Expand Down Expand Up @@ -218,8 +221,8 @@ type jsOpts struct {
wait time.Duration
// For async publish error handling.
aecb MsgErrHandler
// Maximum in flight.
maxap int
// Max async pub ack in flight
maxpa int
// the domain that produced the pre
domain string
// enables protocol tracing
Expand All @@ -238,8 +241,9 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js := &js{
nc: nc,
opts: &jsOpts{
pre: defaultAPIPrefix,
wait: defaultRequestWait,
pre: defaultAPIPrefix,
wait: defaultRequestWait,
maxpa: defaultAsyncPubAckInflight,
},
}

Expand Down Expand Up @@ -341,6 +345,8 @@ type pubOpts struct {
rwait time.Duration // Retry wait between attempts
rnum int // Retry attempts

// stallWait is the max wait of a async pub ack.
stallWait time.Duration
}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
Expand Down Expand Up @@ -396,6 +402,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.ttl == 0 && o.ctx == nil {
o.ttl = js.opts.wait
}
if o.stallWait > 0 {
return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish")
}

if o.id != _EMPTY_ {
m.Header.Set(MsgIdHdr, o.id)
Expand Down Expand Up @@ -571,9 +580,9 @@ func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
paf.js = js
js.pafs[id] = paf
np := len(js.pafs)
maxap := js.opts.maxap
maxpa := js.opts.maxpa
js.mu.Unlock()
return np, maxap
return np, maxpa
}

// Lock should be held.
Expand Down Expand Up @@ -625,7 +634,7 @@ func (js *js) handleAsyncReply(m *Msg) {
delete(js.pafs, id)

// Check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxap {
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
}
Expand Down Expand Up @@ -696,7 +705,7 @@ func PublishAsyncMaxPending(max int) JSOpt {
if max < 1 {
return errors.New("nats: max ack pending should be >= 1")
}
js.maxap = max
js.maxpa = max
return nil
})
}
Expand All @@ -706,6 +715,8 @@ func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFutu
return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
}

const defaultStallWait = 200 * time.Millisecond

func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
var o pubOpts
if len(opts) > 0 {
Expand All @@ -723,6 +734,10 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.ttl != 0 || o.ctx != nil {
return nil, ErrContextAndTimeout
}
stallWait := defaultStallWait
if o.stallWait > 0 {
stallWait = o.stallWait
}

// FIXME(dlc) - Make common.
if o.id != _EMPTY_ {
Expand Down Expand Up @@ -760,7 +775,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if maxPending > 0 && numPending >= maxPending {
select {
case <-js.asyncStall():
case <-time.After(200 * time.Millisecond):
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
}
Expand Down Expand Up @@ -844,6 +859,17 @@ func RetryAttempts(num int) PubOpt {
})
}

// StallWait sets the max wait when the producer becomes stall producing messages.
func StallWait(ttl time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
if ttl <= 0 {
return fmt.Errorf("nats: stall wait should be more than 0")
}
opts.stallWait = ttl
return nil
})
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
Expand Down
13 changes: 13 additions & 0 deletions test/js_test.go
Expand Up @@ -5337,6 +5337,19 @@ func TestJetStreamPublishAsync(t *testing.T) {
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Check invalid options
_, err = js.PublishAsync("foo", []byte("Bad"), nats.StallWait(0))
expectedErr := "nats: stall wait should be more than 0"
if err == nil || err.Error() != expectedErr {
t.Errorf("Expected %v, got: %v", expectedErr, err)
}

_, err = js.Publish("foo", []byte("Also bad"), nats.StallWait(200*time.Millisecond))
expectedErr = "nats: stall wait cannot be set to sync publish"
if err == nil || err.Error() != expectedErr {
t.Errorf("Expected %v, got: %v", expectedErr, err)
}
}

func TestJetStreamPublishAsyncPerf(t *testing.T) {
Expand Down