Skip to content

Commit

Permalink
Use write lock in memstore.LoadNextMsg
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Sep 18, 2023
1 parent 6f38056 commit 046d0b8
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions server/memstore.go
Expand Up @@ -913,8 +913,8 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error
// LoadNextMsg will find the next message matching the filter subject starting at the start sequence.
// The filter subject can be a wildcard.
func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
ms.mu.Lock()
defer ms.mu.Unlock()

if start < ms.state.FirstSeq {
start = ms.state.FirstSeq
Expand Down Expand Up @@ -952,7 +952,11 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
continue
}
if ss.firstNeedsUpdate {
ms.mu.RUnlock()
ms.mu.Lock()
ms.recalculateFirstForSubj(subj, ss.First, ss)
ms.mu.Unlock()
ms.mu.RLock()
}
if ss.First < fseq {
fseq = ss.First
Expand Down Expand Up @@ -1063,6 +1067,7 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}

// Will recalulate the first sequence for this subject in this block.
// Lock should be held.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
for tseq := startSeq + 1; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
Expand Down

0 comments on commit 046d0b8

Please sign in to comment.