Skip to content

Commit

Permalink
Check for checksum violations for all records and before sequence pro…
Browse files Browse the repository at this point in the history
…cessing.

Also fix for bitrot test and a small bug fix for a leaking fd.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Aug 31, 2023
1 parent 2834142 commit 72510c8
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
52 changes: 28 additions & 24 deletions server/filestore.go
Expand Up @@ -1032,7 +1032,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
fd = mb.mfd
} else {
fd, err = os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
if err == nil {
defer fd.Close()
}
}
Expand Down Expand Up @@ -1078,6 +1078,27 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
return gatherLost(lbuf - index), errBadMsg
}

// Check for checksum failures before additional processing.
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
// fmt.Printf("FAILED CHECKSUM!!!\n\n\n")
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}

// Grab our sequence and timestamp.
seq := le.Uint64(hdr[4:])
ts := int64(le.Uint64(hdr[12:]))

Expand Down Expand Up @@ -1114,29 +1135,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
_, deleted = mb.dmap[seq]
}

// Always set last.
mb.last.seq = seq
mb.last.ts = ts

if !deleted {
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}

if firstNeedsSet {
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}
Expand All @@ -1162,6 +1161,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.fssNeedsWrite = true
}
}

// Always set last
mb.last.seq = seq
mb.last.ts = ts

// Advance to next record.
index += rl
}
Expand Down Expand Up @@ -4646,7 +4650,7 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
dlen := int(rl) - msgHdrSize
slen := int(le.Uint16(hdr[20:]))
// Simple sanity check.
if dlen < 0 || slen > dlen || int(rl) > len(buf) {
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) {
return nil, errBadMsg
}
data := buf[msgHdrSize : msgHdrSize+dlen]
Expand Down
14 changes: 12 additions & 2 deletions server/filestore_test.go
Expand Up @@ -1278,7 +1278,10 @@ func TestFileStoreBitRot(t *testing.T) {
// Now twiddle some bits.
fs.mu.Lock()
lmb := fs.lmb
contents, _ := os.ReadFile(lmb.mfn)
contents, err := os.ReadFile(lmb.mfn)
require_NoError(t, err)
require_True(t, len(contents) > 0)

var index int
for {
index = rand.Intn(len(contents))
Expand All @@ -1296,6 +1299,10 @@ func TestFileStoreBitRot(t *testing.T) {
if len(ld.Msgs) > 0 {
break
}
// If our bitrot caused us to not be able to recover any messages we can break as well.
if state := fs.State(); state.Msgs == 0 {
break
}
// Fail the test if we have tried the 10 times and still did not
// get any corruption report.
if i == 9 {
Expand All @@ -1314,7 +1321,10 @@ func TestFileStoreBitRot(t *testing.T) {

// checkMsgs will repair the underlying store, so checkMsgs should be clean now.
if ld := fs.checkMsgs(); ld != nil {
t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld)
// If we have no msgs left this will report the head msgs as lost again.
if state := fs.State(); state.Msgs > 0 {
t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld)
}
}
})
}
Expand Down

0 comments on commit 72510c8

Please sign in to comment.