Skip to content

Commit

Permalink
[FIXED] Avoid deadlock with usage lock for an account during checkAnd…
Browse files Browse the repository at this point in the history
…SyncUsage() (#4176)

 Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 17, 2023
2 parents 5db57fb + 44a5875 commit b856bba
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions server/jetstream.go
Expand Up @@ -1813,6 +1813,17 @@ func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType)
}
s := js.srv

// We need to collect the stream stores before we acquire the usage lock since in storeUpdates the
// stream lock could be held if deletion are inline with storing a new message, e.g. via limits.
var stores []StreamStore
for _, mset := range jsa.streams {
mset.mu.RLock()
if mset.tier == tierName && mset.stype == storeType && mset.store != nil {
stores = append(stores, mset.store)
}
mset.mu.RUnlock()
}

// Now range and qualify, hold usage lock to prevent updates.
jsa.usageMu.Lock()
defer jsa.usageMu.Unlock()
Expand All @@ -1822,15 +1833,12 @@ func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType)
return
}

// Collect current total for all stream stores that matched.
var total int64
var state StreamState
for _, mset := range jsa.streams {
mset.mu.RLock()
if mset.tier == tierName && mset.stype == storeType {
mset.store.FastState(&state)
total += int64(state.Bytes)
}
mset.mu.RUnlock()
for _, store := range stores {
store.FastState(&state)
total += int64(state.Bytes)
}

var needClusterUpdate bool
Expand Down

0 comments on commit b856bba

Please sign in to comment.