Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Check for checksum violations for all records and before any sequence processing. #4465

Merged
merged 2 commits into from Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 27 additions & 24 deletions server/filestore.go
Expand Up @@ -1032,7 +1032,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
fd = mb.mfd
} else {
fd, err = os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
if err == nil {
defer fd.Close()
}
}
Expand Down Expand Up @@ -1078,6 +1078,26 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
return gatherLost(lbuf - index), errBadMsg
}

// Check for checksum failures before additional processing.
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}

// Grab our sequence and timestamp.
seq := le.Uint64(hdr[4:])
ts := int64(le.Uint64(hdr[12:]))

Expand Down Expand Up @@ -1114,29 +1134,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
_, deleted = mb.dmap[seq]
}

// Always set last.
mb.last.seq = seq
mb.last.ts = ts

if !deleted {
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}

if firstNeedsSet {
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}
Expand All @@ -1162,6 +1160,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.fssNeedsWrite = true
}
}

// Always set last
mb.last.seq = seq
mb.last.ts = ts

// Advance to next record.
index += rl
}
Expand Down Expand Up @@ -4646,7 +4649,7 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
dlen := int(rl) - msgHdrSize
slen := int(le.Uint16(hdr[20:]))
// Simple sanity check.
if dlen < 0 || slen > dlen || int(rl) > len(buf) {
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) {
return nil, errBadMsg
}
data := buf[msgHdrSize : msgHdrSize+dlen]
Expand Down
14 changes: 12 additions & 2 deletions server/filestore_test.go
Expand Up @@ -1278,7 +1278,10 @@ func TestFileStoreBitRot(t *testing.T) {
// Now twiddle some bits.
fs.mu.Lock()
lmb := fs.lmb
contents, _ := os.ReadFile(lmb.mfn)
contents, err := os.ReadFile(lmb.mfn)
require_NoError(t, err)
require_True(t, len(contents) > 0)

var index int
for {
index = rand.Intn(len(contents))
Expand All @@ -1296,6 +1299,10 @@ func TestFileStoreBitRot(t *testing.T) {
if len(ld.Msgs) > 0 {
break
}
// If our bitrot caused us to not be able to recover any messages we can break as well.
if state := fs.State(); state.Msgs == 0 {
break
}
// Fail the test if we have tried the 10 times and still did not
// get any corruption report.
if i == 9 {
Expand All @@ -1314,7 +1321,10 @@ func TestFileStoreBitRot(t *testing.T) {

// checkMsgs will repair the underlying store, so checkMsgs should be clean now.
if ld := fs.checkMsgs(); ld != nil {
t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld)
// If we have no msgs left this will report the head msgs as lost again.
if state := fs.State(); state.Msgs > 0 {
t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld)
}
}
})
}
Expand Down