From a68a1faac57012209816e8fe9e537cefaa6d8c4e Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 1 Mar 2022 12:42:25 -0700 Subject: [PATCH] [CHANGED] JS: Do not set MaxAckPending to high value on Subscribe When calling js.Subscribe() (or equivalent) and the library ended-up creating the JS consumer, a MaxAckPending was set to a very high value in some cases. We now let the server pick the default if the value is not explicitly set by the user. The NATS subscription pending limits are set in a way to ensure that the subscription can store at least the max ack pending value. Signed-off-by: Ivan Kozlovic --- js.go | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/js.go b/js.go index 4003a1849..634d3470a 100644 --- a/js.go +++ b/js.go @@ -1284,6 +1284,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, nms string hbi time.Duration ccreq *createConsumerRequest // In case we need to hold onto it for ordered consumers. + maxap int ) // Do some quick checks here for ordered consumers. We do these here instead of spread out @@ -1365,6 +1366,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, icfg := &info.Config hasFC, hbi = icfg.FlowControl, icfg.Heartbeat hasHeartbeats = hbi > 0 + maxap = icfg.MaxAckPending case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): // If the consumer is being bound and we got an error on pull subscribe then allow the error. if !(isPullMode && lookupErr && consumerBound) { @@ -1402,16 +1404,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } // If we have acks at all and the MaxAckPending is not set go ahead - // and set to the internal max. - // TODO(dlc) - We should be able to update this if client updates PendingLimits. - if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy { - if !isPullMode && cb != nil && hasFC { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16 - } else if ch != nil { - cfg.MaxAckPending = cap(ch) - } else { - cfg.MaxAckPending = DefaultSubPendingMsgsLimit - } + // and set to the internal max for channel based consumers + if cfg.MaxAckPending == 0 && ch != nil && cfg.AckPolicy != AckNonePolicy { + cfg.MaxAckPending = cap(ch) } // Create request here. ccreq = &createConsumerRequest{ @@ -1458,12 +1453,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, return nil, err } - // With flow control enabled async subscriptions we will disable msgs - // limits, and set a larger pending bytes limit by default. - if !isPullMode && cb != nil && hasFC { - sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit) - } - // If we fail and we had the sub we need to cleanup, but can't just do a straight Unsubscribe or Drain. // We need to clear the jsi so we do not remove any durables etc. cleanUpSub := func() { @@ -1574,6 +1563,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } sub.mu.Unlock() } + // Capture max ack pending from the info response here which covers both + // success and failure followed by consumer lookup. + maxap = info.Config.MaxAckPending + } + + // If maxap is greater than the default sub's pending limit, use that. + if maxap > DefaultSubPendingMsgsLimit { + // For bytes limit, use the min of maxp*1MB or DefaultSubPendingBytesLimit + bl := maxap * 1024 * 1024 + if bl < DefaultSubPendingBytesLimit { + bl = DefaultSubPendingBytesLimit + } + sub.SetPendingLimits(maxap, bl) } // Do heartbeats last if needed.