Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Fix for panic from a bug in selecting a filestore block in new fs code. #4520

Merged
merged 1 commit into from Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 12 additions & 1 deletion server/filestore.go
Expand Up @@ -2452,7 +2452,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)

// See if we need to figure out starting block per sseq.
if sseq > fs.state.FirstSeq {
seqStart, _ = fs.selectMsgBlockWithIndex(sseq)
// This should not, but can return -1, so make sure we check to avoid panic below.
if seqStart, _ = fs.selectMsgBlockWithIndex(sseq); seqStart < 0 {
seqStart = 0
}
}

var tsa, fsa [32]string
Expand Down Expand Up @@ -4738,6 +4741,7 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
return mb
}

// Lock should be held.
func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
// Check for out of range.
if seq < fs.state.FirstSeq || seq > fs.state.LastSeq {
Expand Down Expand Up @@ -4765,6 +4769,13 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
if seq > last {
low = mid + 1
} else if seq < first {
// A message block's first sequence can change here meaning we could find a gap.
// We want to behave like above, which if inclusive (we check at start) should
// always return an index and a valid mb.
// If we have a gap then our seq would be > fs.blks[mid-1].last.seq
if mid == 0 || seq > atomic.LoadUint64(&fs.blks[mid-1].last.seq) {
return mid, mb
}
high = mid - 1
} else {
return mid, mb
Expand Down
59 changes: 59 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5867,6 +5867,65 @@ func TestFileStoreFullStateTestSysRemovals(t *testing.T) {
})
}

func TestFileStoreSelectBlockWithFirstSeqRemovals(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 100
scfg := StreamConfig{
Name: "zzz",
Subjects: []string{"*"},
MaxMsgsPer: 1,
Storage: FileStorage,
}

prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("dlc22"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
if fcfg.Cipher == NoCipher {
prf = nil
}

fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

// This yields an internal record length of 50 bytes. So 2 msgs per blk.
msgLen := 19
msg := bytes.Repeat([]byte("A"), msgLen)

subjects := "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz+@$^"
// We need over 32 blocks to kick in binary search. So 32*2+1 (65) msgs to get 33 blocks.
for i := 0; i < 32*2+1; i++ {
subj := string(subjects[i])
fs.StoreMsg(subj, nil, msg)
}
require_True(t, fs.numMsgBlocks() == 33)

// Now we want to delete the first msg of each block to move the first sequence.
// Want to do this via system removes, not user initiated moves.
for i := 0; i < len(subjects); i += 2 {
subj := string(subjects[i])
fs.StoreMsg(subj, nil, msg)
}

var ss StreamState
fs.FastState(&ss)

// We want to make sure that select always returns an index and a non-nil mb.
for seq := ss.FirstSeq; seq <= ss.LastSeq; seq++ {
fs.mu.RLock()
index, mb := fs.selectMsgBlockWithIndex(seq)
fs.mu.RUnlock()
require_True(t, index >= 0)
require_True(t, mb != nil)
require_True(t, (seq-1)/2 == uint64(index))
}
})
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down