Skip to content

Commit

Permalink
Fix for a call into fs.recalculateFirstForSubj() from fs.recalculateF…
Browse files Browse the repository at this point in the history
…irstForSubj() that did not lock the mb properly.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 13, 2023
1 parent 528ee14 commit 22be518
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
7 changes: 4 additions & 3 deletions server/filestore.go
Expand Up @@ -2571,9 +2571,7 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
mb.mu.Unlock()
return 0, err
}
ss := mb.fss[subj]
mb.mu.Unlock()
if ss != nil {
if ss := mb.fss[subj]; ss != nil {
// Adjust first if it was not where we thought it should be.
if i != start {
if info, ok := fs.psim[subj]; ok {
Expand All @@ -2583,8 +2581,10 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
return ss.First, nil
}
mb.mu.Unlock()
}
return 0, nil
}
Expand Down Expand Up @@ -5957,6 +5957,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
return
}
}

// Mark first as updated.
ss.firstNeedsUpdate = false
startSeq++
Expand Down
55 changes: 55 additions & 0 deletions server/filestore_test.go
Expand Up @@ -31,6 +31,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -5823,3 +5824,57 @@ func TestFileStoreErrPartialLoadOnSyncClose(t *testing.T) {
_, err = fs.LoadMsg(1, nil)
require_NoError(t, err)
}

// https://github.com/nats-io/nats-server/issues/4529
// Run this wuth --race and you will see the unlocked access that probably caused this.
func TestFileStoreRecalcFirstSequenceBug(t *testing.T) {
fcfg := FileStoreConfig{StoreDir: t.TempDir()}
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"*"}, MaxMsgsPer: 2, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("A"), 22)

for _, subj := range []string{"A", "A", "B", "B"} {
fs.StoreMsg(subj, nil, msg)
}
// Make sure the buffer is cleared.
clearLMBCache := func() {
fs.mu.RLock()
mb := fs.lmb
fs.mu.RUnlock()
mb.mu.Lock()
mb.clearCacheAndOffset()
mb.mu.Unlock()
}

clearLMBCache()

// Do first here.
fs.StoreMsg("A", nil, msg)

var wg sync.WaitGroup
start := make(chan bool)

wg.Add(1)
go func() {
defer wg.Done()
<-start
for i := 0; i < 1_000; i++ {
fs.LoadLastMsg("A", nil)
clearLMBCache()
}
}()

wg.Add(1)
go func() {
defer wg.Done()
<-start
for i := 0; i < 1_000; i++ {
fs.StoreMsg("A", nil, msg)
}
}()

close(start)
wg.Wait()
}

0 comments on commit 22be518

Please sign in to comment.