Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Avoid deadlock with usage lock for an account during checkAndSyncUsage() #4176

Merged
merged 1 commit into from May 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 15 additions & 7 deletions server/jetstream.go
Expand Up @@ -1813,6 +1813,17 @@ func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType)
}
s := js.srv

// We need to collect the stream stores before we acquire the usage lock since in storeUpdates the
// stream lock could be held if deletion are inline with storing a new message, e.g. via limits.
var stores []StreamStore
for _, mset := range jsa.streams {
mset.mu.RLock()
if mset.tier == tierName && mset.stype == storeType && mset.store != nil {
stores = append(stores, mset.store)
}
mset.mu.RUnlock()
}

// Now range and qualify, hold usage lock to prevent updates.
jsa.usageMu.Lock()
defer jsa.usageMu.Unlock()
Expand All @@ -1822,15 +1833,12 @@ func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType)
return
}

// Collect current total for all stream stores that matched.
var total int64
var state StreamState
for _, mset := range jsa.streams {
mset.mu.RLock()
if mset.tier == tierName && mset.stype == storeType {
mset.store.FastState(&state)
total += int64(state.Bytes)
}
mset.mu.RUnlock()
for _, store := range stores {
store.FastState(&state)
total += int64(state.Bytes)
}

var needClusterUpdate bool
Expand Down