diff --git a/js.go b/js.go index 190ed3233..933ddc539 100644 --- a/js.go +++ b/js.go @@ -187,6 +187,9 @@ const ( // Default number of retries DefaultPubRetryAttempts = 2 + + // defaultAsyncPubAckInflight is the number of async pub acks inflight. + defaultAsyncPubAckInflight = 4 * 1024 ) // Types of control messages, so far heartbeat and flow control @@ -218,8 +221,8 @@ type jsOpts struct { wait time.Duration // For async publish error handling. aecb MsgErrHandler - // Maximum in flight. - maxap int + // Max async pub ack in flight + maxpa int // the domain that produced the pre domain string // enables protocol tracing @@ -238,8 +241,9 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { js := &js{ nc: nc, opts: &jsOpts{ - pre: defaultAPIPrefix, - wait: defaultRequestWait, + pre: defaultAPIPrefix, + wait: defaultRequestWait, + maxpa: defaultAsyncPubAckInflight, }, } @@ -341,6 +345,8 @@ type pubOpts struct { rwait time.Duration // Retry wait between attempts rnum int // Retry attempts + // stallWait is the max wait of a async pub ack. + stallWait time.Duration } // pubAckResponse is the ack response from the JetStream API when publishing a message. @@ -396,6 +402,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { if o.ttl == 0 && o.ctx == nil { o.ttl = js.opts.wait } + if o.stallWait > 0 { + return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish") + } if o.id != _EMPTY_ { m.Header.Set(MsgIdHdr, o.id) @@ -571,9 +580,9 @@ func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) { paf.js = js js.pafs[id] = paf np := len(js.pafs) - maxap := js.opts.maxap + maxpa := js.opts.maxpa js.mu.Unlock() - return np, maxap + return np, maxpa } // Lock should be held. @@ -625,7 +634,7 @@ func (js *js) handleAsyncReply(m *Msg) { delete(js.pafs, id) // Check on anyone stalled and waiting. - if js.stc != nil && len(js.pafs) < js.opts.maxap { + if js.stc != nil && len(js.pafs) < js.opts.maxpa { close(js.stc) js.stc = nil } @@ -696,7 +705,7 @@ func PublishAsyncMaxPending(max int) JSOpt { if max < 1 { return errors.New("nats: max ack pending should be >= 1") } - js.maxap = max + js.maxpa = max return nil }) } @@ -706,6 +715,8 @@ func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFutu return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...) } +const defaultStallWait = 200 * time.Millisecond + func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { var o pubOpts if len(opts) > 0 { @@ -723,6 +734,10 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if o.ttl != 0 || o.ctx != nil { return nil, ErrContextAndTimeout } + stallWait := defaultStallWait + if o.stallWait > 0 { + stallWait = o.stallWait + } // FIXME(dlc) - Make common. if o.id != _EMPTY_ { @@ -760,7 +775,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if maxPending > 0 && numPending >= maxPending { select { case <-js.asyncStall(): - case <-time.After(200 * time.Millisecond): + case <-time.After(stallWait): js.clearPAF(id) return nil, errors.New("nats: stalled with too many outstanding async published messages") } @@ -844,6 +859,17 @@ func RetryAttempts(num int) PubOpt { }) } +// StallWait sets the max wait when the producer becomes stall producing messages. +func StallWait(ttl time.Duration) PubOpt { + return pubOptFn(func(opts *pubOpts) error { + if ttl <= 0 { + return fmt.Errorf("nats: stall wait should be more than 0") + } + opts.stallWait = ttl + return nil + }) +} + type ackOpts struct { ttl time.Duration ctx context.Context diff --git a/test/js_test.go b/test/js_test.go index 7bb874e99..55cb57883 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -5337,6 +5337,19 @@ func TestJetStreamPublishAsync(t *testing.T) { case <-time.After(5 * time.Second): t.Fatalf("Did not receive completion signal") } + + // Check invalid options + _, err = js.PublishAsync("foo", []byte("Bad"), nats.StallWait(0)) + expectedErr := "nats: stall wait should be more than 0" + if err == nil || err.Error() != expectedErr { + t.Errorf("Expected %v, got: %v", expectedErr, err) + } + + _, err = js.Publish("foo", []byte("Also bad"), nats.StallWait(200*time.Millisecond)) + expectedErr = "nats: stall wait cannot be set to sync publish" + if err == nil || err.Error() != expectedErr { + t.Errorf("Expected %v, got: %v", expectedErr, err) + } } func TestJetStreamPublishAsyncPerf(t *testing.T) {