Skip to content

Commit

Permalink
Use write lock for memory store filtered state (#4498)
Browse files Browse the repository at this point in the history
Calls to `filteredStateLocked` can mutate the per-subject state by
recalculating the first sequences. Doing so under a read-only lock can
lead to race conditions, so switch the call sites out to write locks
instead.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Sep 7, 2023
2 parents 11f0ea9 + e19e97b commit 6332512
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions server/memstore.go
Expand Up @@ -299,8 +299,11 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {

// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState {
ms.mu.RLock()
defer ms.mu.RUnlock()
// This needs to be a write lock, as filteredStateLocked can
// mutate the per-subject state.
ms.mu.Lock()
defer ms.mu.Unlock()

return ms.filteredStateLocked(sseq, subj, false)
}

Expand Down Expand Up @@ -512,8 +515,10 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {

// NumPending will return the number of pending messages matching the filter subject starting at sequence.
func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) {
ms.mu.RLock()
defer ms.mu.RUnlock()
// This needs to be a write lock, as filteredStateLocked can
// mutate the per-subject state.
ms.mu.Lock()
defer ms.mu.Unlock()

ss := ms.filteredStateLocked(sseq, filter, lastPerSubject)
return ss.Msgs, ms.state.LastSeq
Expand Down Expand Up @@ -880,8 +885,10 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error
var sm *StoreMsg
var ok bool

ms.mu.RLock()
defer ms.mu.RUnlock()
// This needs to be a write lock, as filteredStateLocked can
// mutate the per-subject state.
ms.mu.Lock()
defer ms.mu.Unlock()

if subject == _EMPTY_ || subject == fwcs {
sm, ok = ms.msgs[ms.state.LastSeq]
Expand Down

0 comments on commit 6332512

Please sign in to comment.