Skip to content

Commit

Permalink
Add StallWait option for async js.publish
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed Mar 29, 2022
1 parent 06b8e8a commit e7874d6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
24 changes: 23 additions & 1 deletion js.go
Expand Up @@ -345,6 +345,8 @@ type pubOpts struct {
rwait time.Duration // Retry wait between attempts
rnum int // Retry attempts

// stallWait is the max wait of a async pub ack.
stallWait time.Duration
}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
Expand Down Expand Up @@ -400,6 +402,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.ttl == 0 && o.ctx == nil {
o.ttl = js.opts.wait
}
if o.stallWait > 0 {
return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish")
}

if o.id != _EMPTY_ {
m.Header.Set(MsgIdHdr, o.id)
Expand Down Expand Up @@ -710,6 +715,8 @@ func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFutu
return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
}

const defaultStallWait = 200 * time.Millisecond

func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
var o pubOpts
if len(opts) > 0 {
Expand All @@ -727,6 +734,10 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.ttl != 0 || o.ctx != nil {
return nil, ErrContextAndTimeout
}
stallWait := defaultStallWait
if o.stallWait > 0 {
stallWait = o.stallWait
}

// FIXME(dlc) - Make common.
if o.id != _EMPTY_ {
Expand Down Expand Up @@ -764,7 +775,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if maxPending > 0 && numPending >= maxPending {
select {
case <-js.asyncStall():
case <-time.After(200 * time.Millisecond):
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
}
Expand Down Expand Up @@ -848,6 +859,17 @@ func RetryAttempts(num int) PubOpt {
})
}

// StallWait sets the max wait when the producer becomes stall producing messages.
func StallWait(ttl time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
if ttl <= 0 {
return fmt.Errorf("nats: stall wait should be more than 0")
}
opts.stallWait = ttl
return nil
})
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
Expand Down
13 changes: 13 additions & 0 deletions test/js_test.go
Expand Up @@ -5337,6 +5337,19 @@ func TestJetStreamPublishAsync(t *testing.T) {
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Check invalid options
_, err = js.PublishAsync("foo", []byte("Bad"), nats.StallWait(0))
expectedErr := "nats: stall wait should be more than 0"
if err == nil || err.Error() != expectedErr {
t.Errorf("Expected %v, got: %v", expectedErr, err)
}

_, err = js.Publish("foo", []byte("Also bad"), nats.StallWait(200*time.Millisecond))
expectedErr = "nats: stall wait cannot be set to sync publish"
if err == nil || err.Error() != expectedErr {
t.Errorf("Expected %v, got: %v", expectedErr, err)
}
}

func TestJetStreamPublishAsyncPerf(t *testing.T) {
Expand Down

0 comments on commit e7874d6

Please sign in to comment.