Skip to content

Commit

Permalink
Set async pub ack inflight pending default to 4K, add StallWait option
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed Mar 29, 2022
1 parent ce33b19 commit dd3a012
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 9 deletions.
44 changes: 35 additions & 9 deletions js.go
Expand Up @@ -187,6 +187,9 @@ const (

// Default number of retries
DefaultPubRetryAttempts = 2

// defaultAsyncPubAckInflight is the number of async pub acks inflight.
defaultAsyncPubAckInflight = 4000
)

// Types of control messages, so far heartbeat and flow control
Expand Down Expand Up @@ -218,8 +221,8 @@ type jsOpts struct {
wait time.Duration
// For async publish error handling.
aecb MsgErrHandler
// Maximum in flight.
maxap int
// Max async pub ack in flight
maxpa int
// the domain that produced the pre
domain string
// enables protocol tracing
Expand All @@ -238,8 +241,9 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js := &js{
nc: nc,
opts: &jsOpts{
pre: defaultAPIPrefix,
wait: defaultRequestWait,
pre: defaultAPIPrefix,
wait: defaultRequestWait,
maxpa: defaultAsyncPubAckInflight,
},
}

Expand Down Expand Up @@ -341,6 +345,8 @@ type pubOpts struct {
rwait time.Duration // Retry wait between attempts
rnum int // Retry attempts

// stallWait is the max wait of a async pub ack.
stallWait time.Duration
}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
Expand Down Expand Up @@ -396,6 +402,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.ttl == 0 && o.ctx == nil {
o.ttl = js.opts.wait
}
if o.stallWait > 0 {
return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish")
}

if o.id != _EMPTY_ {
m.Header.Set(MsgIdHdr, o.id)
Expand Down Expand Up @@ -571,9 +580,9 @@ func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
paf.js = js
js.pafs[id] = paf
np := len(js.pafs)
maxap := js.opts.maxap
maxpa := js.opts.maxpa
js.mu.Unlock()
return np, maxap
return np, maxpa
}

// Lock should be held.
Expand Down Expand Up @@ -625,7 +634,7 @@ func (js *js) handleAsyncReply(m *Msg) {
delete(js.pafs, id)

// Check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxap {
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
}
Expand Down Expand Up @@ -696,7 +705,7 @@ func PublishAsyncMaxPending(max int) JSOpt {
if max < 1 {
return errors.New("nats: max ack pending should be >= 1")
}
js.maxap = max
js.maxpa = max
return nil
})
}
Expand All @@ -706,6 +715,8 @@ func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFutu
return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
}

const defaultStallWait = 200 * time.Millisecond

func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
var o pubOpts
if len(opts) > 0 {
Expand All @@ -723,6 +734,10 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.ttl != 0 || o.ctx != nil {
return nil, ErrContextAndTimeout
}
stallWait := defaultStallWait
if o.stallWait > 0 {
stallWait = o.stallWait
}

// FIXME(dlc) - Make common.
if o.id != _EMPTY_ {
Expand Down Expand Up @@ -760,7 +775,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if maxPending > 0 && numPending >= maxPending {
select {
case <-js.asyncStall():
case <-time.After(200 * time.Millisecond):
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
}
Expand Down Expand Up @@ -844,6 +859,17 @@ func RetryAttempts(num int) PubOpt {
})
}

// StallWait sets the max wait when the producer becomes stall producing messages.
func StallWait(ttl time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
if ttl <= 0 {
return fmt.Errorf("nats: stall wait should be more than 0")
}
opts.stallWait = ttl
return nil
})
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
Expand Down
13 changes: 13 additions & 0 deletions test/js_test.go
Expand Up @@ -5337,6 +5337,19 @@ func TestJetStreamPublishAsync(t *testing.T) {
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Check invalid options
_, err = js.PublishAsync("foo", []byte("Bad"), nats.StallWait(0))
expectedErr := "nats: stall wait should be more than 0"
if err == nil || err.Error() != expectedErr {
t.Errorf("Expected %v, got: %v", expectedErr, err)
}

_, err = js.Publish("foo", []byte("Also bad"), nats.StallWait(200*time.Millisecond))
expectedErr = "nats: stall wait cannot be set to sync publish"
if err == nil || err.Error() != expectedErr {
t.Errorf("Expected %v, got: %v", expectedErr, err)
}
}

func TestJetStreamPublishAsyncPerf(t *testing.T) {
Expand Down

0 comments on commit dd3a012

Please sign in to comment.