Skip to content

Commit

Permalink
[IMPROVED] Allow 2.10 tombstones to be skipped and allow us to recove…
Browse files Browse the repository at this point in the history
…r on downgrade (#4452)

Also fixed small bug that could set bad first sequence for subject
tracking info.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Aug 30, 2023
2 parents d6e7106 + 8841432 commit 774987c
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 19 deletions.
62 changes: 44 additions & 18 deletions server/filestore.go
Expand Up @@ -1081,6 +1081,12 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
seq := le.Uint64(hdr[4:])
ts := int64(le.Uint64(hdr[12:]))

// Check if this is a delete tombstone.
if seq&tbit != 0 {
index += rl
continue
}

// This is an old erased message, or a new one that we can track.
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
seq = seq &^ ebit
Expand Down Expand Up @@ -2392,6 +2398,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
}
}
}
} else if mb := fs.selectMsgBlock(fseq); mb != nil {
// If we are here we could not remove fseq from above, so rebuild.
var ld *LostStreamData
if ld, _ = mb.rebuildState(); ld != nil {
fs.rebuildStateLocked(ld)
}
}
}

Expand Down Expand Up @@ -3048,7 +3060,8 @@ func (mb *msgBlock) compact() {
if !isDeleted(seq) {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
if !firstSet {
// Do not set based on tombstone.
if !firstSet && seq&tbit == 0 {
firstSet = true
mb.first.seq = seq
}
Expand Down Expand Up @@ -3811,20 +3824,25 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// Update write through cache.
// Write to msg record.
mb.cache.buf = append(mb.cache.buf, checksum...)
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
mb.cache.lrl = uint32(rl)
if mb.cache.fseq == 0 {
mb.cache.fseq = seq
}

// Set cache timestamp for last store.
mb.lwts = ts
// Decide if we write index info if flushing in place.
writeIndex := ts-mb.lwits > wiThresh

// Accounting
mb.updateAccounting(seq, ts, rl)
// Only update index and do accounting if not a delete tombstone.
if seq&tbit == 0 {
// Strip ebit if set.
seq = seq &^ ebit
if mb.cache.fseq == 0 {
mb.cache.fseq = seq
}
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
// Accounting
mb.updateAccounting(seq, ts, rl)
}

fch, werr := mb.fch, mb.werr

Expand Down Expand Up @@ -3927,7 +3945,7 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) {
seq = seq &^ ebit
}

if mb.first.seq == 0 || mb.first.ts == 0 {
if mb.first.seq == 0 || mb.first.ts == 0 && seq >= mb.first.seq {
mb.first.seq = seq
mb.first.ts = ts
}
Expand Down Expand Up @@ -4112,6 +4130,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
return errCorruptState
}

// Check for tombstones which we can skip in terms of indexing.
if seq&tbit != 0 {
index += rl
continue
}

// Clear erase bit.
seq = seq &^ ebit

Expand Down Expand Up @@ -4467,15 +4491,17 @@ var (
errNoMainKey = errors.New("encrypted store encountered with no main key")
)

// Used for marking messages that have had their checksums checked.
// Used to signal a message record with headers.
const hbit = 1 << 31

// Used for marking erased messages sequences.
const ebit = 1 << 63

// Used to mark a bad index as deleted.
const dbit = 1 << 30
const (
// Used for marking messages that have had their checksums checked.
// Used to signal a message record with headers.
hbit = 1 << 31
// Used for marking erased messages sequences.
ebit = 1 << 63
// Used for marking tombstone sequences.
tbit = 1 << 62
// Used to mark a bad index as deleted.
dbit = 1 << 30
)

// Will do a lookup from cache.
// Lock should be held.
Expand Down
96 changes: 95 additions & 1 deletion server/filestore_test.go
Expand Up @@ -5501,6 +5501,7 @@ func TestFileStoreNumPendingLargeNumBlks(t *testing.T) {
}
fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

subj, msg := "zzz", bytes.Repeat([]byte("X"), 100)
numMsgs := 10_000
Expand Down Expand Up @@ -5554,6 +5555,7 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) {
prf,
)
require_NoError(t, err)
defer fs.Stop()

subj, msg := "zzz", bytes.Repeat([]byte("X"), 100)
numMsgs := 100
Expand All @@ -5572,9 +5574,10 @@ func TestFileStoreRestoreEncryptedWithNoKeyFuncFails(t *testing.T) {
require_Error(t, err, errNoMainKey)
}

func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
func TestFileStoreRecalculateFirstForSubjBug(t *testing.T) {
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

fs.StoreMsg("foo", nil, nil) // 1
fs.StoreMsg("bar", nil, nil) // 2
Expand Down Expand Up @@ -5607,6 +5610,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) {
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("A"), 19)
for i := 0; i < 5; i++ {
Expand All @@ -5623,3 +5627,93 @@ func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) {
require_NoError(t, err)
require_True(t, n == 3)
}

// This is for 2.10 delete tombstones and backward compatibility if a user downgrades to 2.9.x
// TODO(dlc) - Can remove once merged into 2.10 codebase.
func TestFileStoreTombstoneBackwardCompatibility(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

// We will test scenarios where tombstones are embedded in a filestore from a 2.10 system.
msgA := bytes.Repeat([]byte("A"), 22)
msgZ := bytes.Repeat([]byte("Z"), 22)

fs.StoreMsg("A", nil, msgA)
fs.StoreMsg("B", nil, msgZ)

mb := fs.getFirstBlock()
require_True(t, mb != nil)

// >= 2.10 tombstone
mb.writeMsgRecord(emptyRecordLen, 2|tbit, _EMPTY_, nil, nil, time.Now().UnixNano(), true)

// Put a real message behind it.
fs.StoreMsg("C", nil, msgA)

checkState := func() {
state := fs.State()
require_True(t, state.Msgs == 3)
require_True(t, state.FirstSeq == 1)
require_True(t, state.LastSeq == 3)
require_True(t, state.NumSubjects == 3)

sm, err := fs.LoadMsg(2, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgZ))
require_True(t, sm.subj == "B")

sm, err = fs.LoadMsg(3, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgA))
require_True(t, sm.subj == "C")
}

checkState()
fs.Stop()

// Make sure we are good on recreate.
fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

checkState()

// Now we will purge, place tombstone first, then add messages and check.
_, err = fs.Purge()
require_NoError(t, err)

// >= 2.10 tombstone
mb.writeMsgRecord(emptyRecordLen, 22|tbit, _EMPTY_, nil, nil, time.Now().UnixNano(), true)

fs.StoreMsg("A", nil, msgA) // seq 4
fs.StoreMsg("B", nil, msgZ) // seq 5

checkPurgeState := func() {
state := fs.State()
require_True(t, state.Msgs == 2)
require_True(t, state.FirstSeq == 4)
require_True(t, state.LastSeq == 5)
require_True(t, state.NumSubjects == 2)

sm, err := fs.LoadMsg(4, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgA))
require_True(t, sm.subj == "A")

sm, err = fs.LoadMsg(5, nil)
require_NoError(t, err)
require_True(t, bytes.Equal(sm.msg, msgZ))
require_True(t, sm.subj == "B")
}

checkPurgeState()

// Make sure we are good on recreate.
fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

checkPurgeState()
}

0 comments on commit 774987c

Please sign in to comment.