Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Stream recovery with corrupt msg block with sequence gaps. #4344

Merged
merged 2 commits into from Jul 28, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 78 additions & 23 deletions server/filestore.go
Expand Up @@ -971,6 +971,10 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
startLastSeq := mb.last.seq

// Remove the .fss file and clear any cache we have set.
mb.clearCacheAndOffset()
mb.removePerSubjectInfoLocked()

buf, err := mb.loadBlock(nil)
if err != nil || len(buf) == 0 {
var ld *LostStreamData
Expand All @@ -996,9 +1000,6 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.last.seq, mb.last.ts = 0, 0
firstNeedsSet := true

// Remove the .fss file from disk.
mb.removePerSubjectInfoLocked()

// Check if we need to decrypt.
if mb.bek != nil && len(buf) > 0 {
// Recreate to reset counter.
Expand Down Expand Up @@ -1070,12 +1071,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}

if index+rl > lbuf {
if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
Expand All @@ -1091,15 +1087,17 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
addToDmap(seq)
}
index += rl
mb.last.seq = seq
mb.last.ts = ts
if seq >= mb.first.seq {
mb.last.seq = seq
mb.last.ts = ts
}
continue
}

// This is for when we have index info that adjusts for deleted messages
// at the head. So the first.seq will be already set here. If this is larger
// replace what we have with this seq.
if firstNeedsSet && seq > mb.first.seq {
if firstNeedsSet && seq >= mb.first.seq {
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}

Expand Down Expand Up @@ -1165,6 +1163,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.last.seq = mb.first.seq - 1
}

// Update our fss file if needed.
if len(mb.fss) > 0 {
mb.writePerSubjectInfo()
}

return nil, nil
}

Expand Down Expand Up @@ -2598,14 +2601,42 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
fs.scb = nil
defer func() { fs.scb = cb }()

var numMsgs uint64

// collect all that are not correct.
needAttention := make(map[string]*psi)
for subj, psi := range fs.psim {
numMsgs += psi.total
if psi.total > maxMsgsPer {
needAttention[subj] = psi
}
}

// We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught.
// So do a quick sanity check here. If we detect a skew do a rebuild then re-check.
if numMsgs != fs.state.Msgs {
// Clear any global subject state.
fs.psim = make(map[string]*psi)
for _, mb := range fs.blks {
mb.removeIndexFile()
ld, err := mb.rebuildState()
mb.writeIndexInfo()
if err != nil && ld != nil {
fs.addLostData(ld)
}
fs.populateGlobalPerSubjectInfo(mb)
}
// Rebuild fs state too.
fs.rebuildStateLocked(nil)
// Need to redo blocks that need attention.
needAttention = make(map[string]*psi)
for subj, psi := range fs.psim {
if psi.total > maxMsgsPer {
needAttention[subj] = psi
}
}
}

// Collect all the msgBlks we alter.
blks := make(map[*msgBlock]struct{})

Expand Down Expand Up @@ -3050,8 +3081,7 @@ func (mb *msgBlock) compact() {
return
}

// Close cache and index file and wipe delete map, then rebuild.
mb.clearCacheAndOffset()
// Remove index file and wipe delete map, then rebuild.
mb.removeIndexFileLocked()
mb.deleteDmap()
mb.rebuildStateLocked()
Expand All @@ -3077,6 +3107,11 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
bi := mb.cache.idx[slot]
ri, hashChecked := (bi &^ hbit), (bi&hbit) != 0

// If this is a deleted slot return here.
if bi == dbit {
return 0, 0, false, errDeletedMsg
}

// Determine record length
var rl uint32
if len(mb.cache.idx) > slot+1 {
Expand Down Expand Up @@ -4022,7 +4057,7 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock {
func (mb *msgBlock) indexCacheBuf(buf []byte) error {
var le = binary.LittleEndian

var fseq uint64
var fseq, pseq uint64
var idx []uint32
var index uint32

Expand Down Expand Up @@ -4055,23 +4090,39 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
dlen := int(rl) - msgHdrSize

// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || index+rl > lbuf || rl > 32*1024*1024 {
if dlen < 0 || int(slen) > (dlen-8) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth commenting what the 8 is for, otherwise it's just a magic number

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add in const.

// This means something is off.
// TODO(dlc) - Add into bad list?
return errCorruptState
}

// Clear erase bit.
seq = seq &^ ebit
// Adjust if we guessed wrong.
if seq != 0 && seq < fseq {
fseq = seq
}

// We defer checksum checks to individual msg cache lookups to amortorize costs and
// not introduce latency for first message from a newly loaded block.
idx = append(idx, index)
mb.cache.lrl = uint32(rl)
index += mb.cache.lrl
if seq >= mb.first.seq {
// Track that we do not have holes.
// Not expected but did see it in the field.
if pseq > 0 && seq != pseq+1 {
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
}
for dseq := pseq + 1; dseq < seq; dseq++ {
idx = append(idx, dbit)
mb.dmap[dseq] = struct{}{}
}
}
pseq = seq

idx = append(idx, index)
mb.cache.lrl = uint32(rl)
// Adjust if we guessed wrong.
if seq != 0 && seq < fseq {
fseq = seq
}
}
index += rl
}
mb.cache.buf = buf
mb.cache.idx = idx
Expand Down Expand Up @@ -4407,6 +4458,9 @@ const hbit = 1 << 31
// Used for marking erased messages sequences.
const ebit = 1 << 63

// Used to mark a bad index as deleted.
const dbit = 1 << 30

// Will do a lookup from cache.
// Lock should be held.
func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
Expand All @@ -4417,6 +4471,7 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
// If we have a delete map check it.
if mb.dmap != nil {
if _, ok := mb.dmap[seq]; ok {
mb.llts = time.Now().UnixNano()
return nil, errDeletedMsg
}
}
Expand Down