Skip to content

Commit

Permalink
Merge pull request #920 from nats-io/js_max_ack_pending
Browse files Browse the repository at this point in the history
[CHANGED] JS: Do not set MaxAckPending to high value on Subscribe
  • Loading branch information
kozlovic committed Mar 1, 2022
2 parents 268c37b + a68a1fa commit 00e1254
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions js.go
Expand Up @@ -1284,6 +1284,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
nms string
hbi time.Duration
ccreq *createConsumerRequest // In case we need to hold onto it for ordered consumers.
maxap int
)

// Do some quick checks here for ordered consumers. We do these here instead of spread out
Expand Down Expand Up @@ -1365,6 +1366,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
icfg := &info.Config
hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
hasHeartbeats = hbi > 0
maxap = icfg.MaxAckPending
case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
// If the consumer is being bound and we got an error on pull subscribe then allow the error.
if !(isPullMode && lookupErr && consumerBound) {
Expand Down Expand Up @@ -1402,16 +1404,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}

// If we have acks at all and the MaxAckPending is not set go ahead
// and set to the internal max.
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
if !isPullMode && cb != nil && hasFC {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16
} else if ch != nil {
cfg.MaxAckPending = cap(ch)
} else {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit
}
// and set to the internal max for channel based consumers
if cfg.MaxAckPending == 0 && ch != nil && cfg.AckPolicy != AckNonePolicy {
cfg.MaxAckPending = cap(ch)
}
// Create request here.
ccreq = &createConsumerRequest{
Expand Down Expand Up @@ -1458,12 +1453,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
return nil, err
}

// With flow control enabled async subscriptions we will disable msgs
// limits, and set a larger pending bytes limit by default.
if !isPullMode && cb != nil && hasFC {
sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit)
}

// If we fail and we had the sub we need to cleanup, but can't just do a straight Unsubscribe or Drain.
// We need to clear the jsi so we do not remove any durables etc.
cleanUpSub := func() {
Expand Down Expand Up @@ -1574,6 +1563,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
sub.mu.Unlock()
}
// Capture max ack pending from the info response here which covers both
// success and failure followed by consumer lookup.
maxap = info.Config.MaxAckPending
}

// If maxap is greater than the default sub's pending limit, use that.
if maxap > DefaultSubPendingMsgsLimit {
// For bytes limit, use the min of maxp*1MB or DefaultSubPendingBytesLimit
bl := maxap * 1024 * 1024
if bl < DefaultSubPendingBytesLimit {
bl = DefaultSubPendingBytesLimit
}
sub.SetPendingLimits(maxap, bl)
}

// Do heartbeats last if needed.
Expand Down

0 comments on commit 00e1254

Please sign in to comment.