Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Allow 2.10 tombstones to be skipped and allow us to recover on downgrade #4452

Merged
merged 1 commit into from Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 44 additions & 18 deletions server/filestore.go
Expand Up @@ -1081,6 +1081,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
seq := le.Uint64(hdr[4:])
ts := int64(le.Uint64(hdr[12:]))

// Check if this is a delete tombstone.
if seq&tbit != 0 {
index += rl
continue
}

// This is an old erased message, or a new one that we can track.
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
seq = seq &^ ebit
Expand Down Expand Up @@ -2392,6 +2398,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
}
}
}
} else if mb := fs.selectMsgBlock(fseq); mb != nil {
// If we are here we could not remove fseq from above, so rebuild.
var ld *LostStreamData
if ld, _ = mb.rebuildState(); ld != nil {
fs.rebuildStateLocked(ld)
}
}
}

Expand Down Expand Up @@ -3048,7 +3060,8 @@ func (mb *msgBlock) compact() {
if !isDeleted(seq) {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
if !firstSet {
// Do not set based on tombstone.
if !firstSet && seq&tbit == 0 {
firstSet = true
mb.first.seq = seq
}
Expand Down Expand Up @@ -3811,20 +3824,25 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// Update write through cache.
// Write to msg record.
mb.cache.buf = append(mb.cache.buf, checksum...)
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
mb.cache.lrl = uint32(rl)
if mb.cache.fseq == 0 {
mb.cache.fseq = seq
}

// Set cache timestamp for last store.
mb.lwts = ts
// Decide if we write index info if flushing in place.
writeIndex := ts-mb.lwits > wiThresh

// Accounting
mb.updateAccounting(seq, ts, rl)
// Only update index and do accounting if not a delete tombstone.
if seq&tbit == 0 {
// Strip ebit if set.
seq = seq &^ ebit
if mb.cache.fseq == 0 {
mb.cache.fseq = seq
}
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
// Accounting
mb.updateAccounting(seq, ts, rl)
}

fch, werr := mb.fch, mb.werr

Expand Down Expand Up @@ -3927,7 +3945,7 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) {
seq = seq &^ ebit
}

if mb.first.seq == 0 || mb.first.ts == 0 {
if mb.first.seq == 0 || mb.first.ts == 0 && seq >= mb.first.seq {
mb.first.seq = seq
mb.first.ts = ts
}
Expand Down Expand Up @@ -4112,6 +4130,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
return errCorruptState
}

// Check for tombstones which we can skip in terms of indexing.
if seq&tbit != 0 {
index += rl
continue
}

// Clear erase bit.
seq = seq &^ ebit

Expand Down Expand Up @@ -4467,15 +4491,17 @@ var (
errNoMainKey = errors.New("encrypted store encountered with no main key")
)

// Used for marking messages that have had their checksums checked.
// Used to signal a message record with headers.
const hbit = 1 << 31

// Used for marking erased messages sequences.
const ebit = 1 << 63

// Used to mark a bad index as deleted.
const dbit = 1 << 30
const (
// Used for marking messages that have had their checksums checked.
// Used to signal a message record with headers.
hbit = 1 << 31
// Used for marking erased messages sequences.
ebit = 1 << 63
// Used for marking tombstone sequences.
tbit = 1 << 62
// Used to mark a bad index as deleted.
dbit = 1 << 30
)

// Will do a lookup from cache.
// Lock should be held.
Expand Down
96 changes: 95 additions & 1 deletion server/filestore_test.go
Expand Up @@ -5501,6 +5501,7 @@ func TestFileStoreNumPendingLargeNumBlks(t *testing.T) {
}
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

subj, msg := "zzz", bytes.Repeat([]byte("X"), 100)
numMsgs := 10_000
Expand Down Expand Up @@ -5554,6 +5555,7 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) {
prf,
)
require_NoError(t, err)
defer fs.Stop()

subj, msg := "zzz", bytes.Repeat([]byte("X"), 100)
numMsgs := 100
Expand All @@ -5572,9 +5574,10 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) {
require_Error(t, err, errNoMainKey)
}

func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
func TestFileStoreRecalculateFirstForSubjBug(t *testing.T) {
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

fs.StoreMsg("foo", nil, nil) // 1
fs.StoreMsg("bar", nil, nil) // 2
Expand Down Expand Up @@ -5607,6 +5610,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) {
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("A"), 19)
for i := 0; i < 5; i++ {
Expand All @@ -5623,3 +5627,93 @@ func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) {
require_NoError(t, err)
require_True(t, n == 3)
}

// This is for 2.10 delete tombstones and backward compatibility if a user downgrades to 2.9.x
// TODO(dlc) - Can remove once merged into 2.10 codebase.
func TestFileStoreTombstoneBackwardCompatibility(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

// We will test scenarios where tombstones are embedded in a filestore from a 2.10 system.
msgA := bytes.Repeat([]byte("A"), 22)
msgZ := bytes.Repeat([]byte("Z"), 22)

fs.StoreMsg("A", nil, msgA)
fs.StoreMsg("B", nil, msgZ)

mb := fs.getFirstBlock()
require_True(t, mb != nil)

// >= 2.10 tombstone
mb.writeMsgRecord(emptyRecordLen, 2|tbit, _EMPTY_, nil, nil, time.Now().UnixNano(), true)

// Put a real message behind it.
fs.StoreMsg("C", nil, msgA)

checkState := func() {
state := fs.State()
require_True(t, state.Msgs == 3)
require_True(t, state.FirstSeq == 1)
require_True(t, state.LastSeq == 3)
require_True(t, state.NumSubjects == 3)

sm, err := fs.LoadMsg(2, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgZ))
require_True(t, sm.subj == "B")

sm, err = fs.LoadMsg(3, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgA))
require_True(t, sm.subj == "C")
}

checkState()
fs.Stop()

// Make sure we are good on recreate.
fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

checkState()

// Now we will purge, place tombstone first, then add messages and check.
_, err = fs.Purge()
require_NoError(t, err)

// >= 2.10 tombstone
mb.writeMsgRecord(emptyRecordLen, 22|tbit, _EMPTY_, nil, nil, time.Now().UnixNano(), true)

fs.StoreMsg("A", nil, msgA) // seq 4
fs.StoreMsg("B", nil, msgZ) // seq 5

checkPurgeState := func() {
state := fs.State()
require_True(t, state.Msgs == 2)
require_True(t, state.FirstSeq == 4)
require_True(t, state.LastSeq == 5)
require_True(t, state.NumSubjects == 2)

sm, err := fs.LoadMsg(4, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgA))
require_True(t, sm.subj == "A")

sm, err = fs.LoadMsg(5, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgZ))
require_True(t, sm.subj == "B")
}

checkPurgeState()

// Make sure we are good on recreate.
fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

checkPurgeState()
}