Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix leaking timers in stream sources #4532

Merged
merged 1 commit into from Sep 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 17 additions & 4 deletions server/stream.go
Expand Up @@ -248,7 +248,8 @@ type stream struct {
mirror *sourceInfo

// Sources
sources map[string]*sourceInfo
sources map[string]*sourceInfo
sourceRetries map[string]*time.Timer

// Indicates we have direct consumers.
directs int
Expand Down Expand Up @@ -2765,12 +2766,24 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6

// Simply schedules setSourceConsumer at the given delay.
//
// Does not require lock
// Lock held on entry
func (mset *stream) scheduleSetSourceConsumerRetry(iname string, seq uint64, delay time.Duration, startTime time.Time) {
time.AfterFunc(delay, func() {
if mset.sourceRetries == nil {
mset.sourceRetries = map[string]*time.Timer{}
}
if t, ok := mset.sourceRetries[iname]; ok && !t.Stop() {
// It looks like the goroutine has started running but hasn't taken the
// stream lock yet (otherwise the map entry would be deleted). We had
// might as well let the running goroutine complete and schedule another
// timer only if it needs to.
return
}
mset.sourceRetries[iname] = time.AfterFunc(delay, func() {
mset.mu.Lock()
defer mset.mu.Unlock()

delete(mset.sourceRetries, iname)
mset.setSourceConsumer(iname, seq, startTime)
mset.mu.Unlock()
})
}

Expand Down