Skip to content

Commit

Permalink
Only check ack floor if we are interest policy based. (#4206)
Browse files Browse the repository at this point in the history
Saw performance issue with a user a limits based stream with large
number of consumers.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 2, 2023
2 parents 4f2c9a5 + 27bbfb7 commit 25ad3cd
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions server/consumer.go
Expand Up @@ -3315,12 +3315,25 @@ func (o *consumer) checkAckFloor() {
func (o *consumer) processInboundAcks(qch chan struct{}) {
// Grab the server lock to watch for server quit.
o.mu.RLock()
s := o.srv
s, mset := o.srv, o.mset
hasInactiveThresh := o.cfg.InactiveThreshold > 0
o.mu.RUnlock()

if s == nil || mset == nil {
return
}

// Track if we are interest retention policy, if not we can skip the ack floor check.
isInterestRetention := mset.isInterestRetention()

checkAckFloor := func() {
if isInterestRetention {
o.checkAckFloor()
}
}

// We will check this on entry and periodically.
o.checkAckFloor()
checkAckFloor()

// How often we will check for ack floor drift.
var ackFloorCheck = 30 * time.Second
Expand All @@ -3339,7 +3352,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
o.suppressDeletion()
}
case <-time.After(ackFloorCheck):
o.checkAckFloor()
checkAckFloor()
case <-qch:
return
case <-s.quitCh:
Expand Down

0 comments on commit 25ad3cd

Please sign in to comment.