Skip to content

Commit

Permalink
[FIXED] Setting initial min on dmap caused subtle bugs with dmap. (#4631
Browse files Browse the repository at this point in the history
)

Under heavy load with max msgs per subject of 1 the dmap, when
considered empty and resetting the initial min, could cause lookup
misses that would lead to excess messages in a stream and longer restore
issues.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Oct 6, 2023
2 parents beee6fc + dd646f6 commit f29c786
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 52 deletions.
81 changes: 35 additions & 46 deletions server/filestore.go
Expand Up @@ -1632,20 +1632,22 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// First let's check the happy path, open the blk file that was the lmb when we created the full state.
// See if we have the last block available.
var matched bool
var mb *msgBlock
if mb = fs.bim[blkIndex]; mb != nil {
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex)
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
mb := fs.lmb
if mb == nil || mb.index != blkIndex {
fs.warn("Stream state block does not exist or index mismatch")
return errCorruptState
}
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex)
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}

// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
Expand All @@ -1669,7 +1671,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// Check if we have to account for a partial message block.
if !matched && mb != nil && mb.index == nmb.index {
if err := fs.adjustAccounting(mb, nmb); err != nil {
fs.warn("Stream state could not adjust accounting: %v", err)
fs.warn("Stream state could not adjust accounting")
return err
}
}
Expand Down Expand Up @@ -1701,8 +1703,13 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
}
nmb.ensurePerSubjectInfoLoaded()

lookupAndAdjust := func(seq uint64) error {
var smv StoreMsg
// Walk all the original mb's sequences that were included in the stream state.
var smv StoreMsg
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
// If we had already declared it deleted we can move on since you can not undelete.
if mb.dmap.Exists(seq) {
continue
}
// Lookup the message.
sm, err := nmb.cacheLookup(seq, &smv)
if err != nil {
Expand All @@ -1714,29 +1721,10 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
if len(sm.subj) > 0 && fs.psim != nil {
fs.removePerSubject(sm.subj)
}
return nil
}

// Walk all the original mb's sequences that were included in the stream state.
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
// If we had already declared it deleted we can move on since you can not undelete.
if mb.dmap.Exists(seq) {
continue
}
// Lookup the message.
if err := lookupAndAdjust(seq); err != nil {
return err
}
}

// Now check to see if we had a higher first for the recovered state mb vs nmb.
if nmb.first.seq < mb.first.seq {
for seq := nmb.first.seq; seq < mb.first.seq; seq++ {
// Lookup the message.
if err := lookupAndAdjust(seq); err != nil {
return err
}
}
// Now set first for nmb.
nmb.first = mb.first
}
Expand Down Expand Up @@ -3329,7 +3317,9 @@ func (fs *fileStore) removePerSubject(subj string) {
// We do not update sense of fblk here but will do so when we resolve during lookup.
if info, ok := fs.psim[subj]; ok {
info.total--
if info.total == 0 {
if info.total == 1 {
info.fblk = info.lblk
} else if info.total == 0 {
delete(fs.psim, subj)
}
}
Expand Down Expand Up @@ -3499,10 +3489,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
}
}
} else if !isEmpty {
if mb.dmap.IsEmpty() {
// Mark initial base for delete set.
mb.dmap.SetInitialMin(mb.first.seq)
}
// Out of order delete.
mb.dmap.Insert(seq)
// Check if <25% utilization and minimum size met.
Expand Down Expand Up @@ -3592,10 +3578,7 @@ func (mb *msgBlock) compact() {
var firstSet bool

isDeleted := func(seq uint64) bool {
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
return true
}
return mb.dmap.Exists(seq)
return seq == 0 || seq&ebit != 0 || seq < mb.first.seq || mb.dmap.Exists(seq)
}

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
Expand Down Expand Up @@ -3638,6 +3621,12 @@ func (mb *msgBlock) compact() {
index += rl
}

// Handle compression
var err error
if nbuf, err = mb.cmp.Compress(nbuf); err != nil {
return
}

// Check for encryption.
if mb.bek != nil && len(nbuf) > 0 {
// Recreate to reset counter.
Expand All @@ -3652,7 +3641,7 @@ func (mb *msgBlock) compact() {
mb.closeFDsLocked()

// We will write to a new file and mv/rename it in case of failure.
mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
mfn := filepath.Join(mb.fs.fcfg.StoreDir, msgDir, fmt.Sprintf(newScan, mb.index))
if err := os.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
os.Remove(mfn)
return
Expand All @@ -3662,7 +3651,7 @@ func (mb *msgBlock) compact() {
return
}

// Remove index file and wipe delete map, then rebuild.
// Wipe dmap and rebuild here.
mb.dmap.Empty()
mb.rebuildStateLocked()

Expand Down
74 changes: 72 additions & 2 deletions server/filestore_test.go
Expand Up @@ -6136,7 +6136,7 @@ func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {
require_NoError(t, err)
defer fs.Stop()

// Store 2 more msgs and delete 2 & 4, then another 2 msgs.
// Store 6 more msgs.
fs.StoreMsg("C", nil, msgA)
fs.StoreMsg("D", nil, msgZ)
fs.StoreMsg("E", nil, msgA)
Expand All @@ -6149,7 +6149,6 @@ func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {

// Put back old stream state.
// This will test that we properly walk multiple blocks past where we snapshotted state.
fs.Stop()
err = os.WriteFile(sfile, buf, defaultFilePerms)
require_NoError(t, err)

Expand All @@ -6165,6 +6164,77 @@ func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {
})
}

// This tests we can successfully recover without having to rebuild the whole stream from a mid block index.db marker
// when they updated block has a removed entry.
// TODO(dlc) - This test will force a rebuild atm, leaving here for later.
func TestFileStoreFullStateMidBlockPastWAL(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1}

prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("dlc22"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}
if fcfg.Cipher == NoCipher {
prf = nil
}

fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

// This yields an internal record length of 50 bytes. So 2 msgs per blk.
msg := bytes.Repeat([]byte("Z"), 19)

// Store 5 msgs
fs.StoreMsg("A", nil, msg)
fs.StoreMsg("B", nil, msg)
fs.StoreMsg("C", nil, msg)
fs.StoreMsg("D", nil, msg)
fs.StoreMsg("E", nil, msg)
require_Equal(t, fs.numMsgBlocks(), 1)
fs.Stop()

// Grab the state from this stop.
sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
buf, err := os.ReadFile(sfile)
require_NoError(t, err)

fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

// Store 5 more messages, then remove seq 2, "B".
fs.StoreMsg("F", nil, msg)
fs.StoreMsg("G", nil, msg)
fs.StoreMsg("H", nil, msg)
fs.StoreMsg("I", nil, msg)
fs.StoreMsg("J", nil, msg)
fs.EraseMsg(2)

require_Equal(t, fs.numMsgBlocks(), 1)
state := fs.State()
fs.Stop()

// Put back old stream state.
// This will test that we properly walk multiple blocks past where we snapshotted state.
err = os.WriteFile(sfile, buf, defaultFilePerms)
require_NoError(t, err)

fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
require_NoError(t, err)
defer fs.Stop()

if newState := fs.State(); !reflect.DeepEqual(state, newState) {
t.Fatalf("Restore state does not match:\n%+v\n%+v",
state, newState)
}
})
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
8 changes: 4 additions & 4 deletions server/norace_test.go
Expand Up @@ -8736,10 +8736,10 @@ func TestNoRaceBinaryStreamSnapshotEncodingBasic(t *testing.T) {
ss, err := DecodeStreamState(snap)
require_NoError(t, err)

require_True(t, ss.FirstSeq == 1)
require_True(t, ss.LastSeq == 3000)
require_True(t, ss.Msgs == 1000)
require_True(t, ss.Deleted.NumDeleted() == 2000)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 3000)
require_Equal(t, ss.Msgs, 1000)
require_Equal(t, ss.Deleted.NumDeleted(), 2000)
}

func TestNoRaceFilestoreBinaryStreamSnapshotEncodingLargeGaps(t *testing.T) {
Expand Down

0 comments on commit f29c786

Please sign in to comment.