Skip to content

Commit

Permalink
Fix for panic from a bug in selecting a block and an index when num blocks > 32 and we used new binary search in NumPending().

Browse files Browse the repository at this point in the history

The reason was that we were not accounting for gaps, since mb.first.seq can move. The function should always return a valid index and mb when seq is within the inclusive range from first to last.
The panic could orphan held locks for filestore, consumer and possibly stream.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 12, 2023
1 parent f18e06b commit 491f833
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
13 changes: 12 additions & 1 deletion server/filestore.go
Expand Up @@ -2452,7 +2452,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)

// See if we need to figure out starting block per sseq.
if sseq > fs.state.FirstSeq {
seqStart, _ = fs.selectMsgBlockWithIndex(sseq)
// This should not, but can return -1, so make sure we check to avoid panic below.
if seqStart, _ = fs.selectMsgBlockWithIndex(sseq); seqStart < 0 {
seqStart = 0
}
}

var tsa, fsa [32]string
Expand Down Expand Up @@ -4738,6 +4741,7 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
return mb
}

// Lock should be held.
func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
// Check for out of range.
if seq < fs.state.FirstSeq || seq > fs.state.LastSeq {
Expand Down Expand Up @@ -4765,6 +4769,13 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
if seq > last {
low = mid + 1
} else if seq < first {
// A message block's first sequence can change here meaning we could find a gap.
// We want to behave like above, which if inclusive (we check at start) should
// always return an index and a valid mb.
// If we have a gap then our seq would be > fs.blks[mid-1].last.seq
if mid == 0 || seq > atomic.LoadUint64(&fs.blks[mid-1].last.seq) {
return mid, mb
}
high = mid - 1
} else {
return mid, mb
Expand Down
59 changes: 59 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5867,6 +5867,65 @@ func TestFileStoreFullStateTestSysRemovals(t *testing.T) {
})
}

// TestFileStoreSelectBlockWithFirstSeqRemovals verifies that
// selectMsgBlockWithIndex always yields a valid index and a non-nil msgBlock
// for every in-range sequence, even after per-subject removals have moved
// mb.first.seq and created gaps between blocks.
func TestFileStoreSelectBlockWithFirstSeqRemovals(t *testing.T) {
	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
		fcfg.BlockSize = 100
		scfg := StreamConfig{
			Name:       "zzz",
			Subjects:   []string{"*"},
			MaxMsgsPer: 1,
			Storage:    FileStorage,
		}

		// Key derivation for encrypted permutations; disabled when no cipher.
		keyFn := func(context []byte) ([]byte, error) {
			mac := hmac.New(sha256.New, []byte("dlc22"))
			if _, err := mac.Write(context); err != nil {
				return nil, err
			}
			return mac.Sum(nil), nil
		}
		if fcfg.Cipher == NoCipher {
			keyFn = nil
		}

		fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), keyFn, nil)
		require_NoError(t, err)
		defer fs.Stop()

		// A 19 byte payload yields a 50 byte internal record, so 2 msgs per block.
		payload := bytes.Repeat([]byte("A"), 19)

		subjects := "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz+@$^"
		// Binary search is only used above 32 blocks, so store 32*2+1 (65)
		// msgs to end up with 33 blocks.
		for i := 0; i < 32*2+1; i++ {
			fs.StoreMsg(string(subjects[i]), nil, payload)
		}
		require_True(t, fs.numMsgBlocks() == 33)

		// Re-store every even-indexed subject. With MaxMsgsPer of 1 this makes
		// the server remove the first msg of each block (system removals, not
		// user-initiated ones), which moves each block's first sequence.
		for i := 0; i < len(subjects); i += 2 {
			fs.StoreMsg(string(subjects[i]), nil, payload)
		}

		var state StreamState
		fs.FastState(&state)

		// Select must return an index and a non-nil mb for the whole range.
		for seq := state.FirstSeq; seq <= state.LastSeq; seq++ {
			fs.mu.RLock()
			idx, mb := fs.selectMsgBlockWithIndex(seq)
			fs.mu.RUnlock()
			require_True(t, idx >= 0)
			require_True(t, mb != nil)
			require_True(t, (seq-1)/2 == uint64(idx))
		}
	})
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 491f833

Please sign in to comment.