Skip to content

Commit

Permalink
Fix leaking timers in stream sources (#4532)
Browse files Browse the repository at this point in the history
Repeated calls to `scheduleSetSourceConsumerRetry` could end up creating
multiple timers for the same source, which would eventually schedule
even more timers, which would result in runaway CPU usage. This PR
instead bounds to one timer per source per stream.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Sep 14, 2023
2 parents f259207 + 6f3f544 commit 46361e8
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions server/stream.go
Expand Up @@ -248,7 +248,8 @@ type stream struct {
mirror *sourceInfo

// Sources
sources map[string]*sourceInfo
sources map[string]*sourceInfo
sourceRetries map[string]*time.Timer

// Indicates we have direct consumers.
directs int
Expand Down Expand Up @@ -2765,12 +2766,24 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6

// Simply schedules setSourceConsumer at the given delay.
//
// Does not require lock
// Lock held on entry
func (mset *stream) scheduleSetSourceConsumerRetry(iname string, seq uint64, delay time.Duration, startTime time.Time) {
time.AfterFunc(delay, func() {
if mset.sourceRetries == nil {
mset.sourceRetries = map[string]*time.Timer{}
}
if t, ok := mset.sourceRetries[iname]; ok && !t.Stop() {
// It looks like the goroutine has started running but hasn't taken the
// stream lock yet (otherwise the map entry would be deleted). We had
// might as well let the running goroutine complete and schedule another
// timer only if it needs to.
return
}
mset.sourceRetries[iname] = time.AfterFunc(delay, func() {
mset.mu.Lock()
defer mset.mu.Unlock()

delete(mset.sourceRetries, iname)
mset.setSourceConsumer(iname, seq, startTime)
mset.mu.Unlock()
})
}

Expand Down

0 comments on commit 46361e8

Please sign in to comment.