Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Setting initial min on dmap caused subtle bugs with dmap. #4631

Merged
merged 1 commit into from Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
81 changes: 35 additions & 46 deletions server/filestore.go
Expand Up @@ -1632,20 +1632,22 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// First let's check the happy path, open the blk file that was the lmb when we created the full state.
// See if we have the last block available.
var matched bool
var mb *msgBlock
if mb = fs.bim[blkIndex]; mb != nil {
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex)
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
mb := fs.lmb
if mb == nil || mb.index != blkIndex {
fs.warn("Stream state block does not exist or index mismatch")
return errCorruptState
}
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex)
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}

// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
Expand All @@ -1669,7 +1671,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// Check if we have to account for a partial message block.
if !matched && mb != nil && mb.index == nmb.index {
if err := fs.adjustAccounting(mb, nmb); err != nil {
fs.warn("Stream state could not adjust accounting: %v", err)
fs.warn("Stream state could not adjust accounting")
return err
}
}
Expand Down Expand Up @@ -1701,8 +1703,13 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
}
nmb.ensurePerSubjectInfoLoaded()

lookupAndAdjust := func(seq uint64) error {
var smv StoreMsg
// Walk all the original mb's sequences that were included in the stream state.
var smv StoreMsg
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
// If we had already declared it deleted we can move on since you can not undelete.
if mb.dmap.Exists(seq) {
continue
}
// Lookup the message.
sm, err := nmb.cacheLookup(seq, &smv)
if err != nil {
Expand All @@ -1714,29 +1721,10 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
if len(sm.subj) > 0 && fs.psim != nil {
fs.removePerSubject(sm.subj)
}
return nil
}

// Walk all the original mb's sequences that were included in the stream state.
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
// If we had already declared it deleted we can move on since you can not undelete.
if mb.dmap.Exists(seq) {
continue
}
// Lookup the message.
if err := lookupAndAdjust(seq); err != nil {
return err
}
}

// Now check to see if we had a higher first for the recovered state mb vs nmb.
if nmb.first.seq < mb.first.seq {
for seq := nmb.first.seq; seq < mb.first.seq; seq++ {
// Lookup the message.
if err := lookupAndAdjust(seq); err != nil {
return err
}
}
// Now set first for nmb.
nmb.first = mb.first
}
Expand Down Expand Up @@ -3329,7 +3317,9 @@ func (fs *fileStore) removePerSubject(subj string) {
// We do not update sense of fblk here but will do so when we resolve during lookup.
if info, ok := fs.psim[subj]; ok {
info.total--
if info.total == 0 {
if info.total == 1 {
info.fblk = info.lblk
} else if info.total == 0 {
delete(fs.psim, subj)
}
}
Expand Down Expand Up @@ -3499,10 +3489,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
}
}
} else if !isEmpty {
if mb.dmap.IsEmpty() {
// Mark initial base for delete set.
mb.dmap.SetInitialMin(mb.first.seq)
}
// Out of order delete.
mb.dmap.Insert(seq)
// Check if <25% utilization and minimum size met.
Expand Down Expand Up @@ -3592,10 +3578,7 @@ func (mb *msgBlock) compact() {
var firstSet bool

isDeleted := func(seq uint64) bool {
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
return true
}
return mb.dmap.Exists(seq)
return seq == 0 || seq&ebit != 0 || seq < mb.first.seq || mb.dmap.Exists(seq)
}

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
Expand Down Expand Up @@ -3638,6 +3621,12 @@ func (mb *msgBlock) compact() {
index += rl
}

// Handle compression
var err error
if nbuf, err = mb.cmp.Compress(nbuf); err != nil {
return
}

// Check for encryption.
if mb.bek != nil && len(nbuf) > 0 {
// Recreate to reset counter.
Expand All @@ -3652,7 +3641,7 @@ func (mb *msgBlock) compact() {
mb.closeFDsLocked()

// We will write to a new file and mv/rename it in case of failure.
mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
mfn := filepath.Join(mb.fs.fcfg.StoreDir, msgDir, fmt.Sprintf(newScan, mb.index))
if err := os.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
os.Remove(mfn)
return
Expand All @@ -3662,7 +3651,7 @@ func (mb *msgBlock) compact() {
return
}

// Remove index file and wipe delete map, then rebuild.
// Wipe dmap and rebuild here.
mb.dmap.Empty()
mb.rebuildStateLocked()

Expand Down
74 changes: 72 additions & 2 deletions server/filestore_test.go
Expand Up @@ -6136,7 +6136,7 @@ func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {
require_NoError(t, err)
defer fs.Stop()

// Store 2 more msgs and delete 2 & 4, then another 2 msgs.
// Store 6 more msgs.
fs.StoreMsg("C", nil, msgA)
fs.StoreMsg("D", nil, msgZ)
fs.StoreMsg("E", nil, msgA)
Expand All @@ -6149,7 +6149,6 @@ func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {

// Put back old stream state.
// This will test that we properly walk multiple blocks past where we snapshotted state.
fs.Stop()
err = os.WriteFile(sfile, buf, defaultFilePerms)
require_NoError(t, err)

Expand All @@ -6165,6 +6164,77 @@ func TestFileStoreFullStateMultiBlockPastWAL(t *testing.T) {
})
}

// TestFileStoreFullStateMidBlockPastWAL checks that we can recover, without rebuilding the
// whole stream, from an index.db marker that points into the middle of a block when the
// updated block also has a removed entry.
// TODO(dlc) - This test will force a rebuild atm, leaving here for later.
func TestFileStoreFullStateMidBlockPastWAL(t *testing.T) {
	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
		scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1}

		// Key generation is only supplied when the permutation uses a cipher.
		var prf func(context []byte) ([]byte, error)
		if fcfg.Cipher != NoCipher {
			prf = func(context []byte) ([]byte, error) {
				h := hmac.New(sha256.New, []byte("dlc22"))
				if _, err := h.Write(context); err != nil {
					return nil, err
				}
				return h.Sum(nil), nil
			}
		}

		fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
		require_NoError(t, err)
		defer fs.Stop()

		// 19 byte payload; everything stored below is expected to stay within one block.
		msg := bytes.Repeat([]byte("Z"), 19)

		// First batch of 5 msgs, one per subject.
		for _, subj := range []string{"A", "B", "C", "D", "E"} {
			fs.StoreMsg(subj, nil, msg)
		}
		require_Equal(t, fs.numMsgBlocks(), 1)
		fs.Stop()

		// Keep a copy of the stream state file as it was written by this stop.
		sfile := filepath.Join(fcfg.StoreDir, msgDir, streamStreamStateFile)
		snapshot, err := os.ReadFile(sfile)
		require_NoError(t, err)

		fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
		require_NoError(t, err)
		defer fs.Stop()

		// Second batch of 5 msgs, then erase seq 2, "B".
		for _, subj := range []string{"F", "G", "H", "I", "J"} {
			fs.StoreMsg(subj, nil, msg)
		}
		fs.EraseMsg(2)

		require_Equal(t, fs.numMsgBlocks(), 1)
		want := fs.State()
		fs.Stop()

		// Put the older stream state file back, so it now trails what is on disk
		// in the same block.
		err = os.WriteFile(sfile, snapshot, defaultFilePerms)
		require_NoError(t, err)

		fs, err = newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
		require_NoError(t, err)
		defer fs.Stop()

		// The recovered state has to match what we had before the last stop.
		got := fs.State()
		if !reflect.DeepEqual(want, got) {
			t.Fatalf("Restore state does not match:\n%+v\n%+v",
				want, got)
		}
	})
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
8 changes: 4 additions & 4 deletions server/norace_test.go
Expand Up @@ -8736,10 +8736,10 @@ func TestNoRaceBinaryStreamSnapshotEncodingBasic(t *testing.T) {
ss, err := DecodeStreamState(snap)
require_NoError(t, err)

require_True(t, ss.FirstSeq == 1)
require_True(t, ss.LastSeq == 3000)
require_True(t, ss.Msgs == 1000)
require_True(t, ss.Deleted.NumDeleted() == 2000)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 3000)
require_Equal(t, ss.Msgs, 1000)
require_Equal(t, ss.Deleted.NumDeleted(), 2000)
}

func TestNoRaceFilestoreBinaryStreamSnapshotEncodingLargeGaps(t *testing.T) {
Expand Down