Various fixes and improvements to tombstone and buffer gaps. (#4553)
We fixed a few bugs in tombstone handling and formalized support for
holes in the underlying buffers. Based on customer data from the field,
we now also make use of holes during compaction.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Sep 18, 2023
2 parents 156e1a5 + acffa06 commit 216df81
Showing 2 changed files with 162 additions and 40 deletions.
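For orientation before the diff: a tombstone is an ordinary record whose sequence carries a flag bit and which has no subject or payload. Below is a minimal sketch of that header layout; the flag values, header size, and checksum width are assumptions inferred from the surrounding code, not copied from filestore.go.

package main

import (
	"encoding/binary"
	"fmt"
)

// Assumed values for illustration; the real constants live in server/filestore.go.
const (
	ebit           = 1 << 63        // erased-message flag (assumption)
	tbit           = 1 << 62        // tombstone flag (assumption)
	msgHdrSize     = 22             // 4B length + 8B seq + 8B ts + 2B subject len
	emptyRecordLen = msgHdrSize + 8 // header plus 8B checksum, no subject or body (assumption)
)

// encodeTombstoneHdr builds the header a tombstone record would carry:
// the sequence has tbit set and the subject length is zero.
func encodeTombstoneHdr(seq uint64, ts int64) [msgHdrSize]byte {
	var hdr [msgHdrSize]byte
	le := binary.LittleEndian
	le.PutUint32(hdr[0:], emptyRecordLen) // total record length
	le.PutUint64(hdr[4:], seq|tbit)       // sequence with tombstone bit
	le.PutUint64(hdr[12:], uint64(ts))    // timestamp
	le.PutUint16(hdr[20:], 0)             // no subject
	return hdr
}

func main() {
	hdr := encodeTombstoneHdr(42, 0)
	raw := binary.LittleEndian.Uint64(hdr[4:])
	fmt.Println(raw&tbit != 0, raw&^(tbit|ebit)) // true 42
}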
87 changes: 47 additions & 40 deletions server/filestore.go
@@ -1559,6 +1559,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
}

if numBlocks := readU64(); numBlocks > 0 {
lastIndex := int(numBlocks - 1)
fs.blks = make([]*msgBlock, 0, numBlocks)
for i := 0; i < int(numBlocks); i++ {
index, nbytes, fseq, fts, lseq, lts, numDeleted := uint32(readU64()), readU64(), readU64(), readI64(), readU64(), readI64(), readU64()
@@ -1578,7 +1579,13 @@ func (fs *fileStore) recoverFullState() (rerr error) {
mb.msgs -= numDeleted
bi += n
}
fs.addMsgBlock(mb)
// Only add the block in if it is not empty or it is the lmb.
if mb.msgs > 0 || i == lastIndex {
fs.addMsgBlock(mb)
} else {
// Mark dirty to clean up.
fs.dirty++
}
}
}
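The net effect of the hunk above, restated as a standalone sketch with a simplified block type; recoverFullState operates on *msgBlock, and presumably keeps the last block even when empty so the stream still has an lmb to write into.

package sketch

type blk struct{ msgs uint64 }

// keepBlocks mirrors the recovery filter: empty interior blocks are
// dropped and counted as dirty so the full state gets rewritten.
func keepBlocks(blks []*blk) (kept []*blk, dirty int) {
	lastIndex := len(blks) - 1
	for i, mb := range blks {
		if mb.msgs > 0 || i == lastIndex {
			kept = append(kept, mb) // still holds msgs, or is the lmb
		} else {
			dirty++ // empty interior block: drop it and clean up later
		}
	}
	return kept, dirty
}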

@@ -2905,10 +2912,6 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
return err
}

// Mark dirty here since we added in a new message.
// We do not kick the flusher, that happens on new msg block for write or Stop().
fs.dirty++

// Adjust top level tracking of per subject msg counts.
if len(subj) > 0 {
index := fs.lmb.index
@@ -3249,9 +3252,13 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
}
}

// Now write updated index for all affected msgBlks.
// Expire the cache if we can.
for mb := range blks {
mb.tryForceExpireCacheLocked()
mb.mu.Lock()
if mb.msgs > 0 {
mb.tryForceExpireCacheLocked()
}
mb.mu.Unlock()
}
}

@@ -3483,6 +3490,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// We will write a tombstone at the end.
var firstSeqNeedsUpdate bool
if isEmpty {
// This writes a tombstone iff mb == lmb, so no need to do so below.
fs.removeMsgBlock(mb)
firstSeqNeedsUpdate = seq == fs.state.FirstSeq
}
@@ -3498,11 +3506,12 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// Check if we need to write a deleted record tombstone.
// This is for user initiated removes or to hold the first seq
// when the last block is empty.
if !viaLimits || (isEmpty && isLastBlock) {

// If not removed via limits, and not an empty last block (that case writes the tombstone above), write a tombstone.
if !viaLimits && !(isEmpty && isLastBlock) {
if lmb := fs.lmb; sm != nil && lmb != nil {
lmb.writeTombstone(sm.seq, sm.ts)
}
fs.kickFlushStateLoop()
}

if cb := fs.scb; cb != nil {
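The rewritten condition packs three flags into one expression and is easy to misread; here is the same decision restated as a predicate (semantics inferred from the diff, not copied from filestore.go).

package sketch

// shouldWriteTombstone restates !viaLimits && !(isEmpty && isLastBlock).
func shouldWriteTombstone(viaLimits, isEmpty, isLastBlock bool) bool {
	if viaLimits {
		return false // limits-based removes never write tombstones
	}
	if isEmpty && isLastBlock {
		return false // removeMsgBlock already wrote one for the empty lmb
	}
	return true // user-initiated remove: record it with a tombstone
}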
@@ -3552,9 +3561,6 @@ func (mb *msgBlock) compact() {
return mb.dmap.Exists(seq)
}

// For skip msgs.
var smh [msgHdrSize]byte

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
return
@@ -3572,28 +3578,25 @@
seq := le.Uint64(hdr[4:])

if !isDeleted(seq) {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
// Do not set based on tombstone.
if !firstSet && seq&tbit == 0 {
firstSet = true
mb.first.seq = seq
}
} else if firstSet {
// This is an interior delete that we need to make sure we have a placeholder for.
le.PutUint32(smh[0:], emptyRecordLen)
le.PutUint64(smh[4:], seq|ebit)
le.PutUint64(smh[12:], 0)
le.PutUint16(smh[20:], 0)
nbuf = append(nbuf, smh[:]...)
mb.hh.Reset()
mb.hh.Write(smh[4:20])
checksum := mb.hh.Sum(nil)
nbuf = append(nbuf, checksum...)
}
// Always set last.
mb.last.seq = seq &^ ebit

// Check for tombstones.
if seq&tbit != 0 {
// If we are the last mb we should consider keeping these, unless the tombstone reflects a seq in this mb.
if mb == mb.fs.lmb && seq < mb.first.seq {
nbuf = append(nbuf, buf[index:index+rl]...)
}
} else {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
if !firstSet {
firstSet = true
mb.first.seq = seq
}
}
}
// Always set last as long as it is not a tombstone.
if seq&tbit == 0 {
mb.last.seq = seq &^ ebit
}
// Advance to next record.
index += rl
}
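Record by record, the compaction loop above now makes a three-way choice. A compressed sketch of that choice follows, with flag values assumed as in the earlier sketch and the tombstone comparison shown on the bare sequence.

package sketch

const (
	ebit = 1 << 63 // assumed flag values, as in the earlier sketch
	tbit = 1 << 62
)

// keepRecord reports whether compact copies a record into the new buffer.
// isLast stands in for mb == mb.fs.lmb, firstSeq for mb.first.seq.
func keepRecord(rawSeq, firstSeq uint64, isLast, isDeleted bool) bool {
	if rawSeq&tbit != 0 {
		// Tombstone: only useful in the last block, and only when it
		// references a sequence that now lives before this block.
		return isLast && rawSeq&^(tbit|ebit) < firstSeq
	}
	// Normal message: keep it unless it was interior-deleted.
	return !isDeleted
}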
@@ -4273,7 +4276,6 @@ func (mb *msgBlock) enableForWriting(fip bool) error {
}

// Helper function to place a delete tombstone.
// Lock should be held.
func (mb *msgBlock) writeTombstone(seq uint64, ts int64) error {
return mb.writeMsgRecord(emptyRecordLen, seq|tbit, _EMPTY_, nil, nil, ts, true)
}
@@ -4299,6 +4301,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// Check if we are tracking per subject for our simple state.
// Do this before changing the cache that would trigger a flush pending msgs call
// if we needed to regenerate the per subject info.
// Note that tombstones have no subject, so they will not trigger here.
if len(subj) > 0 && !mb.noTrack {
if err := mb.ensurePerSubjectInfoLoaded(); err != nil {
return err
@@ -4808,7 +4811,7 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock {
func (mb *msgBlock) indexCacheBuf(buf []byte) error {
var le = binary.LittleEndian

var fseq, pseq uint64
var fseq uint64
var idx []uint32
var index uint32

@@ -4865,14 +4868,13 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// not introduce latency for first message from a newly loaded block.
if seq >= mb.first.seq {
// Track that we do not have holes.
// Not expected but did see it in the field.
if pseq > 0 && seq != pseq+1 {
for dseq := pseq + 1; dseq < seq; dseq++ {
if slot := int(seq - mb.first.seq); slot != len(idx) {
// If we have a hole, fill it.
for dseq := mb.first.seq + uint64(len(idx)); dseq < seq; dseq++ {
idx = append(idx, dbit)
mb.dmap.Insert(dseq)
}
}
pseq = seq
// Add to our index.
idx = append(idx, index)
mb.cache.lrl = uint32(rl)
@@ -4881,6 +4883,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
fseq = seq
}

// Make sure our dmap has this entry if it was erased.
if erased {
mb.dmap.Insert(seq)
}

// Handle FSS inline here.
if slen > 0 && !mb.noTrack && !erased && !mb.dmap.Exists(seq) {
bsubj := buf[index+msgHdrSize : index+msgHdrSize+uint32(slen)]
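The hole-filling above is the core of the new gap support. Here is a standalone sketch of the same idea, with a plain map standing in for the avl.SequenceSet dmap and an assumed value for the dbit marker.

package sketch

// dbit marks a deleted slot in the cache index; the value is an assumption.
const dbit = 1 << 30

// fillHoles appends delete markers for every sequence between the end of
// idx (which holds one slot per sequence starting at firstSeq) and the
// sequence of the record about to be indexed.
func fillHoles(idx []uint32, dmap map[uint64]struct{}, firstSeq, seq uint64) []uint32 {
	for dseq := firstSeq + uint64(len(idx)); dseq < seq; dseq++ {
		idx = append(idx, dbit) // placeholder index entry
		dmap[dseq] = struct{}{} // record the hole in the delete map
	}
	return idx
}

With firstSeq 2, one existing slot, and the next record at seq 4, this fills seq 3 as a hole, which is what TestFileStoreMsgBlockHolesAndIndexing below asserts.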
115 changes: 115 additions & 0 deletions server/filestore_test.go
@@ -5981,6 +5981,121 @@ func TestFileStoreSelectBlockWithFirstSeqRemovals(t *testing.T) {
})
}

func TestFileStoreMsgBlockHolesAndIndexing(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1},
)
require_NoError(t, err)
defer fs.Stop()

// Grab the message block by hand and manipulate at that level.
mb := fs.getFirstBlock()
writeMsg := func(subj string, seq uint64) {
rl := fileStoreMsgSize(subj, nil, []byte(subj))
require_NoError(t, mb.writeMsgRecord(rl, seq, subj, nil, []byte(subj), time.Now().UnixNano(), true))
}
readMsg := func(seq uint64, expectedSubj string) {
// Clear the cache so we load back in from disk and need to properly process any holes.
ld, tombs, err := mb.rebuildState()
require_NoError(t, err)
require_Equal(t, ld, nil)
require_Equal(t, len(tombs), 0)
fs.rebuildState(nil)
sm, _, err := mb.fetchMsg(seq, nil)
require_NoError(t, err)
require_Equal(t, sm.subj, expectedSubj)
require_True(t, bytes.Equal(sm.buf, []byte(expectedSubj)))
}

writeMsg("A", 2)
require_Equal(t, mb.first.seq, 2)
require_Equal(t, mb.last.seq, 2)

writeMsg("B", 4)
require_Equal(t, mb.first.seq, 2)
require_Equal(t, mb.last.seq, 4)

writeMsg("C", 12)

readMsg(4, "B")
require_True(t, mb.dmap.Exists(3))

readMsg(12, "C")
readMsg(2, "A")

// Check that we report deleted for the right sequences.
checkDeleted := func(seq uint64) {
_, _, err = mb.fetchMsg(seq, nil)
require_Error(t, err, ErrStoreMsgNotFound, errDeletedMsg)
mb.mu.RLock()
shouldExist, exists := seq >= mb.first.seq, mb.dmap.Exists(seq)
mb.mu.RUnlock()
if shouldExist {
require_True(t, exists)
}
}
checkDeleted(1)
checkDeleted(3)
for seq := 5; seq < 12; seq++ {
checkDeleted(uint64(seq))
}
}

func TestFileStoreMsgBlockCompactionAndHoles(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1},
)
require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("Z"), 1024)
for _, subj := range []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"} {
fs.StoreMsg(subj, nil, msg)
}
// Leave the first and last msgs but delete the rest.
for seq := uint64(2); seq < 10; seq++ {
fs.RemoveMsg(seq)
}
require_Equal(t, fs.numMsgBlocks(), 1)
mb := fs.getFirstBlock()
require_NotNil(t, mb)

_, ub, _ := fs.Utilization()

// Do compaction; it should remove all excess now.
mb.mu.Lock()
mb.compact()
mb.mu.Unlock()

ta, ua, _ := fs.Utilization()
require_Equal(t, ub, ua)
require_Equal(t, ta, ua)
}

func TestFileStoreRemoveLastNoDoubleTombstones(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1},
)
require_NoError(t, err)
defer fs.Stop()

fs.StoreMsg("A", nil, []byte("hello"))
fs.mu.Lock()
fs.removeMsgViaLimits(1)
fs.mu.Unlock()

require_Equal(t, fs.numMsgBlocks(), 1)
mb := fs.getFirstBlock()
require_NotNil(t, mb)
mb.loadMsgs()
rbytes, _, err := fs.Utilization()
require_NoError(t, err)
require_Equal(t, rbytes, emptyRecordLen)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////