From e7874d6a12971d05eba34106da535d8fa70ae78f Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 29 Mar 2022 14:45:58 -0700 Subject: [PATCH] Add StallWait option for async js.publish Signed-off-by: Waldemar Quevedo --- js.go | 24 +++++++++++++++++++++++- test/js_test.go | 13 +++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/js.go b/js.go index 678059bcc..933ddc539 100644 --- a/js.go +++ b/js.go @@ -345,6 +345,8 @@ type pubOpts struct { rwait time.Duration // Retry wait between attempts rnum int // Retry attempts + // stallWait is the max wait of a async pub ack. + stallWait time.Duration } // pubAckResponse is the ack response from the JetStream API when publishing a message. @@ -400,6 +402,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { if o.ttl == 0 && o.ctx == nil { o.ttl = js.opts.wait } + if o.stallWait > 0 { + return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish") + } if o.id != _EMPTY_ { m.Header.Set(MsgIdHdr, o.id) @@ -710,6 +715,8 @@ func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFutu return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...) } +const defaultStallWait = 200 * time.Millisecond + func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { var o pubOpts if len(opts) > 0 { @@ -727,6 +734,10 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if o.ttl != 0 || o.ctx != nil { return nil, ErrContextAndTimeout } + stallWait := defaultStallWait + if o.stallWait > 0 { + stallWait = o.stallWait + } // FIXME(dlc) - Make common. if o.id != _EMPTY_ { @@ -764,7 +775,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if maxPending > 0 && numPending >= maxPending { select { case <-js.asyncStall(): - case <-time.After(200 * time.Millisecond): + case <-time.After(stallWait): js.clearPAF(id) return nil, errors.New("nats: stalled with too many outstanding async published messages") } @@ -848,6 +859,17 @@ func RetryAttempts(num int) PubOpt { }) } +// StallWait sets the max wait when the producer becomes stall producing messages. +func StallWait(ttl time.Duration) PubOpt { + return pubOptFn(func(opts *pubOpts) error { + if ttl <= 0 { + return fmt.Errorf("nats: stall wait should be more than 0") + } + opts.stallWait = ttl + return nil + }) +} + type ackOpts struct { ttl time.Duration ctx context.Context diff --git a/test/js_test.go b/test/js_test.go index 7bb874e99..55cb57883 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -5337,6 +5337,19 @@ func TestJetStreamPublishAsync(t *testing.T) { case <-time.After(5 * time.Second): t.Fatalf("Did not receive completion signal") } + + // Check invalid options + _, err = js.PublishAsync("foo", []byte("Bad"), nats.StallWait(0)) + expectedErr := "nats: stall wait should be more than 0" + if err == nil || err.Error() != expectedErr { + t.Errorf("Expected %v, got: %v", expectedErr, err) + } + + _, err = js.Publish("foo", []byte("Also bad"), nats.StallWait(200*time.Millisecond)) + expectedErr = "nats: stall wait cannot be set to sync publish" + if err == nil || err.Error() != expectedErr { + t.Errorf("Expected %v, got: %v", expectedErr, err) + } } func TestJetStreamPublishAsyncPerf(t *testing.T) {