Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] JS: Do not set MaxAckPending to high value on Subscribe #920

Merged
merged 1 commit into from Mar 1, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 18 additions & 16 deletions js.go
Expand Up @@ -1284,6 +1284,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
nms string
hbi time.Duration
ccreq *createConsumerRequest // In case we need to hold onto it for ordered consumers.
maxap int
)

// Do some quick checks here for ordered consumers. We do these here instead of spread out
Expand Down Expand Up @@ -1365,6 +1366,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
icfg := &info.Config
hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
hasHeartbeats = hbi > 0
maxap = icfg.MaxAckPending
case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
// If the consumer is being bound and we got an error on pull subscribe then allow the error.
if !(isPullMode && lookupErr && consumerBound) {
Expand Down Expand Up @@ -1402,16 +1404,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}

// If we have acks at all and the MaxAckPending is not set go ahead
// and set to the internal max.
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
if !isPullMode && cb != nil && hasFC {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16
} else if ch != nil {
cfg.MaxAckPending = cap(ch)
} else {
cfg.MaxAckPending = DefaultSubPendingMsgsLimit
}
// and set to the internal max for channel based consumers
if cfg.MaxAckPending == 0 && ch != nil && cfg.AckPolicy != AckNonePolicy {
cfg.MaxAckPending = cap(ch)
}
// Create request here.
ccreq = &createConsumerRequest{
Expand Down Expand Up @@ -1458,12 +1453,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
return nil, err
}

// With flow control enabled async subscriptions we will disable msgs
// limits, and set a larger pending bytes limit by default.
if !isPullMode && cb != nil && hasFC {
sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit)
}

// If we fail and we had the sub we need to cleanup, but can't just do a straight Unsubscribe or Drain.
// We need to clear the jsi so we do not remove any durables etc.
cleanUpSub := func() {
Expand Down Expand Up @@ -1574,6 +1563,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
sub.mu.Unlock()
}
// Capture max ack pending from the info response here which covers both
// success and failure followed by consumer lookup.
maxap = info.Config.MaxAckPending
}

// If maxap is greater than the default sub's pending limit, use that.
if maxap > DefaultSubPendingMsgsLimit {
// For bytes limit, use the min of maxp*1MB or DefaultSubPendingBytesLimit
bl := maxap * 1024 * 1024
if bl < DefaultSubPendingBytesLimit {
bl = DefaultSubPendingBytesLimit
}
sub.SetPendingLimits(maxap, bl)
}

// Do heartbeats last if needed.
Expand Down