Skip to content

Commit

Permalink
[FIXED] Fix for panic from a bug in selecting a filestore block in ne…
Browse files Browse the repository at this point in the history
…w fs code. (#4520)

When num blocks > 32 and we used new binary search in NumPending() we
could return -1, nil. If sequence is inclusive this should always return
valid index and mb.

The reason we could return -1 would be that we were not accounting for
gaps as mb.first.seq can move ahead as first is removed. The panic could
orphan held locks for filestore, consumer and possibly stream which
would lock up a system, leading to memory growth and unstable behaviors.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 12, 2023
2 parents f18e06b + 244ff44 commit a345a40
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
13 changes: 12 additions & 1 deletion server/filestore.go
Expand Up @@ -2452,7 +2452,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)

// See if we need to figure out starting block per sseq.
if sseq > fs.state.FirstSeq {
seqStart, _ = fs.selectMsgBlockWithIndex(sseq)
// This should not, but can return -1, so make sure we check to avoid panic below.
if seqStart, _ = fs.selectMsgBlockWithIndex(sseq); seqStart < 0 {
seqStart = 0
}
}

var tsa, fsa [32]string
Expand Down Expand Up @@ -4738,6 +4741,7 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
return mb
}

// Lock should be held.
func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
// Check for out of range.
if seq < fs.state.FirstSeq || seq > fs.state.LastSeq {
Expand Down Expand Up @@ -4765,6 +4769,13 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
if seq > last {
low = mid + 1
} else if seq < first {
// A message block's first sequence can change here meaning we could find a gap.
// We want to behave like above, which if inclusive (we check at start) should
// always return an index and a valid mb.
// If we have a gap then our seq would be > fs.blks[mid-1].last.seq
if mid == 0 || seq > atomic.LoadUint64(&fs.blks[mid-1].last.seq) {
return mid, mb
}
high = mid - 1
} else {
return mid, mb
Expand Down
59 changes: 59 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5867,6 +5867,65 @@ func TestFileStoreFullStateTestSysRemovals(t *testing.T) {
})
}

func TestFileStoreSelectBlockWithFirstSeqRemovals(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 100
scfg := StreamConfig{
Name: "zzz",
Subjects: []string{"*"},
MaxMsgsPer: 1,
Storage: FileStorage,
}

prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("dlc22"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
if fcfg.Cipher == NoCipher {
prf = nil
}

fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

// This yields an internal record length of 50 bytes. So 2 msgs per blk.
msgLen := 19
msg := bytes.Repeat([]byte("A"), msgLen)

subjects := "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz+@$^"
// We need over 32 blocks to kick in binary search. So 32*2+1 (65) msgs to get 33 blocks.
for i := 0; i < 32*2+1; i++ {
subj := string(subjects[i])
fs.StoreMsg(subj, nil, msg)
}
require_True(t, fs.numMsgBlocks() == 33)

// Now we want to delete the first msg of each block to move the first sequence.
// Want to do this via system removes, not user initiated moves.
for i := 0; i < len(subjects); i += 2 {
subj := string(subjects[i])
fs.StoreMsg(subj, nil, msg)
}

var ss StreamState
fs.FastState(&ss)

// We want to make sure that select always returns an index and a non-nil mb.
for seq := ss.FirstSeq; seq <= ss.LastSeq; seq++ {
fs.mu.RLock()
index, mb := fs.selectMsgBlockWithIndex(seq)
fs.mu.RUnlock()
require_True(t, index >= 0)
require_True(t, mb != nil)
require_True(t, (seq-1)/2 == uint64(index))
}
})
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit a345a40

Please sign in to comment.