Skip to content

Commit

Permalink
[CHANGED] JS: Do not set MaxAckPending to high value on Subscribe
Browse files Browse the repository at this point in the history
When calling js.Subscribe() (or equivalent) and the library ended-up
creating the JS consumer, a MaxAckPending was set to a very high
value in some cases. We now let the server pick the default if the
value is not explicitly set by the user.
The NATS subscription pending limits are set in a way to ensure
that the subscription can store at least the max ack pending value.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Mar 1, 2022
1 parent 268c37b commit 00ccbd1
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions js.go
Expand Up @@ -1284,6 +1284,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
nms string
hbi time.Duration
ccreq *createConsumerRequest // In case we need to hold onto it for ordered consumers.
maxap int
)

// Do some quick checks here for ordered consumers. We do these here instead of spread out
Expand Down Expand Up @@ -1365,6 +1366,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
icfg := &info.Config
hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
hasHeartbeats = hbi > 0
maxap = icfg.MaxAckPending
case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
// If the consumer is being bound and we got an error on pull subscribe then allow the error.
if !(isPullMode && lookupErr && consumerBound) {
Expand Down Expand Up @@ -1402,16 +1404,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}

// If we have acks at all and the MaxAckPending is not set go ahead
// and set to the internal max.
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
if !isPullMode && cb != nil && hasFC {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16
} else if ch != nil {
cfg.MaxAckPending = cap(ch)
} else {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit
}
// and set to the internal max for channel based consumers
if cfg.MaxAckPending == 0 && ch != nil && cfg.AckPolicy != AckNonePolicy {
cfg.MaxAckPending = cap(ch)
}
// Create request here.
ccreq = &createConsumerRequest{
Expand Down Expand Up @@ -1458,12 +1453,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
return nil, err
}

// With flow control enabled async subscriptions we will disable msgs
// limits, and set a larger pending bytes limit by default.
if !isPullMode && cb != nil && hasFC {
sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit)
}

// If we fail and we had the sub we need to cleanup, but can't just do a straight Unsubscribe or Drain.
// We need to clear the jsi so we do not remove any durables etc.
cleanUpSub := func() {
Expand Down Expand Up @@ -1574,6 +1563,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
sub.mu.Unlock()
}
// Capture max ack pending from the info response here which covers both
// success and failure followed by consumer lookup.
maxap = info.Config.MaxAckPending
}

// If maxap is greater than the default sub's pending limit, use that.
if maxap > DefaultSubPendingMsgsLimit {
// For bytes limit, use the min of maxp*1MB or DefaultSubPendingBytesLimit
bl := maxap * 1024 * 1024
if bl < DefaultSubPendingBytesLimit {
bl = DefaultSubPendingBytesLimit
}
sub.SetPendingLimits(maxap, bl)
}

// Do heartbeats last if needed.
Expand Down

0 comments on commit 00ccbd1

Please sign in to comment.