Skip to content

Commit

Permalink
[FIXED] Check for checksum violations for all records and before any sequence processing. (#4465)
Browse files Browse the repository at this point in the history

Also includes a small bug fix for leaking fds under certain scenarios during
corruption.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Aug 31, 2023
2 parents 6d6d3cf + a45281d commit d7ea3b9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 26 deletions.
51 changes: 27 additions & 24 deletions server/filestore.go
Expand Up @@ -1032,7 +1032,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
fd = mb.mfd
} else {
fd, err = os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
if err == nil {
defer fd.Close()
}
}
Expand Down Expand Up @@ -1078,6 +1078,26 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
return gatherLost(lbuf - index), errBadMsg
}

// Check for checksum failures before additional processing.
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}

// Grab our sequence and timestamp.
seq := le.Uint64(hdr[4:])
ts := int64(le.Uint64(hdr[12:]))

Expand Down Expand Up @@ -1114,29 +1134,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
_, deleted = mb.dmap[seq]
}

// Always set last.
mb.last.seq = seq
mb.last.ts = ts

if !deleted {
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}

if firstNeedsSet {
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}
Expand All @@ -1162,6 +1160,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.fssNeedsWrite = true
}
}

// Always set last
mb.last.seq = seq
mb.last.ts = ts

// Advance to next record.
index += rl
}
Expand Down Expand Up @@ -4646,7 +4649,7 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
dlen := int(rl) - msgHdrSize
slen := int(le.Uint16(hdr[20:]))
// Simple sanity check.
if dlen < 0 || slen > dlen || int(rl) > len(buf) {
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) {
return nil, errBadMsg
}
data := buf[msgHdrSize : msgHdrSize+dlen]
Expand Down
14 changes: 12 additions & 2 deletions server/filestore_test.go
Expand Up @@ -1278,7 +1278,10 @@ func TestFileStoreBitRot(t *testing.T) {
// Now twiddle some bits.
fs.mu.Lock()
lmb := fs.lmb
contents, _ := os.ReadFile(lmb.mfn)
contents, err := os.ReadFile(lmb.mfn)
require_NoError(t, err)
require_True(t, len(contents) > 0)

var index int
for {
index = rand.Intn(len(contents))
Expand All @@ -1296,6 +1299,10 @@ func TestFileStoreBitRot(t *testing.T) {
if len(ld.Msgs) > 0 {
break
}
// If our bitrot caused us to not be able to recover any messages we can break as well.
if state := fs.State(); state.Msgs == 0 {
break
}
// Fail the test if we have tried 10 times and still did not
// get any corruption report.
if i == 9 {
Expand All @@ -1314,7 +1321,10 @@ func TestFileStoreBitRot(t *testing.T) {

// checkMsgs will repair the underlying store, so checkMsgs should be clean now.
if ld := fs.checkMsgs(); ld != nil {
t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld)
// If we have no msgs left this will report the head msgs as lost again.
if state := fs.State(); state.Msgs > 0 {
t.Fatalf("Expected no errors restoring checked and fixed filestore, got %+v", ld)
}
}
})
}
Expand Down
1 change: 1 addition & 0 deletions server/norace_test.go
Expand Up @@ -3809,6 +3809,7 @@ func TestNoRaceJetStreamClusterStreamReset(t *testing.T) {

// Simulate a low level write error on our consumer and make sure we can recover etc.
cl = c.consumerLeader("$G", "TEST", "d1")
require_True(t, cl != nil)
mset, err = cl.GlobalAccount().lookupStream("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down

0 comments on commit d7ea3b9

Please sign in to comment.