Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various fixes and improvements to tombstone and buffer gaps. #4553

Merged
merged 1 commit into from Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 47 additions & 40 deletions server/filestore.go
Expand Up @@ -1559,6 +1559,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
}

if numBlocks := readU64(); numBlocks > 0 {
lastIndex := int(numBlocks - 1)
fs.blks = make([]*msgBlock, 0, numBlocks)
for i := 0; i < int(numBlocks); i++ {
index, nbytes, fseq, fts, lseq, lts, numDeleted := uint32(readU64()), readU64(), readU64(), readI64(), readU64(), readI64(), readU64()
Expand All @@ -1578,7 +1579,13 @@ func (fs *fileStore) recoverFullState() (rerr error) {
mb.msgs -= numDeleted
bi += n
}
fs.addMsgBlock(mb)
// Only add in if not empty or the lmb.
if mb.msgs > 0 || i == lastIndex {
fs.addMsgBlock(mb)
} else {
// Mark dirty to cleanup.
fs.dirty++
}
}
}

Expand Down Expand Up @@ -2905,10 +2912,6 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
return err
}

// Mark dirty here since we added in a new message.
// We do not kick the flusher, that happens on new msg block for write or Stop().
fs.dirty++

// Adjust top level tracking of per subject msg counts.
if len(subj) > 0 {
index := fs.lmb.index
Expand Down Expand Up @@ -3249,9 +3252,13 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
}
}

// Now write updated index for all affected msgBlks.
// Expire the cache if we can.
for mb := range blks {
mb.tryForceExpireCacheLocked()
mb.mu.Lock()
if mb.msgs > 0 {
mb.tryForceExpireCacheLocked()
}
mb.mu.Unlock()
}
}

Expand Down Expand Up @@ -3483,6 +3490,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// We will write a tombstone at the end.
var firstSeqNeedsUpdate bool
if isEmpty {
// This writes tombstone iff mb == lmb, so no need to do below.
fs.removeMsgBlock(mb)
firstSeqNeedsUpdate = seq == fs.state.FirstSeq
}
Expand All @@ -3498,11 +3506,12 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// Check if we need to write a deleted record tombstone.
// This is for user initiated removes or to hold the first seq
// when the last block is empty.
if !viaLimits || (isEmpty && isLastBlock) {

// If not via limits and not empty and last (empty writes tombstone above if last) write tombstone.
if !viaLimits && !(isEmpty && isLastBlock) {
if lmb := fs.lmb; sm != nil && lmb != nil {
lmb.writeTombstone(sm.seq, sm.ts)
}
fs.kickFlushStateLoop()
}

if cb := fs.scb; cb != nil {
Expand Down Expand Up @@ -3552,9 +3561,6 @@ func (mb *msgBlock) compact() {
return mb.dmap.Exists(seq)
}

// For skip msgs.
var smh [msgHdrSize]byte

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
return
Expand All @@ -3572,28 +3578,25 @@ func (mb *msgBlock) compact() {
seq := le.Uint64(hdr[4:])

if !isDeleted(seq) {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
// Do not set based on tombstone.
if !firstSet && seq&tbit == 0 {
firstSet = true
mb.first.seq = seq
}
} else if firstSet {
// This is an interior delete that we need to make sure we have a placeholder for.
le.PutUint32(smh[0:], emptyRecordLen)
le.PutUint64(smh[4:], seq|ebit)
le.PutUint64(smh[12:], 0)
le.PutUint16(smh[20:], 0)
nbuf = append(nbuf, smh[:]...)
mb.hh.Reset()
mb.hh.Write(smh[4:20])
checksum := mb.hh.Sum(nil)
nbuf = append(nbuf, checksum...)
}
// Always set last.
mb.last.seq = seq &^ ebit

// Check for tombstones.
if seq&tbit != 0 {
// If we are last mb we should consider to keep these unless the tombstone reflects a seq in this mb.
if mb == mb.fs.lmb && seq < mb.first.seq {
nbuf = append(nbuf, buf[index:index+rl]...)
}
} else {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
if !firstSet {
firstSet = true
mb.first.seq = seq
}
}
}
// Always set last as long as not a tombstone.
if seq&tbit == 0 {
mb.last.seq = seq &^ ebit
}
// Advance to next record.
index += rl
}
Expand Down Expand Up @@ -4273,7 +4276,6 @@ func (mb *msgBlock) enableForWriting(fip bool) error {
}

// Helper function to place a delete tombstone.
// Lock should be held.
func (mb *msgBlock) writeTombstone(seq uint64, ts int64) error {
return mb.writeMsgRecord(emptyRecordLen, seq|tbit, _EMPTY_, nil, nil, ts, true)
}
Expand All @@ -4299,6 +4301,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// Check if we are tracking per subject for our simple state.
// Do this before changing the cache that would trigger a flush pending msgs call
// if we needed to regenerate the per subject info.
// Note that tombstones have no subject so will not trigger here.
if len(subj) > 0 && !mb.noTrack {
if err := mb.ensurePerSubjectInfoLoaded(); err != nil {
return err
Expand Down Expand Up @@ -4808,7 +4811,7 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock {
func (mb *msgBlock) indexCacheBuf(buf []byte) error {
var le = binary.LittleEndian

var fseq, pseq uint64
var fseq uint64
var idx []uint32
var index uint32

Expand Down Expand Up @@ -4865,14 +4868,13 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// not introduce latency for first message from a newly loaded block.
if seq >= mb.first.seq {
// Track that we do not have holes.
// Not expected but did see it in the field.
if pseq > 0 && seq != pseq+1 {
for dseq := pseq + 1; dseq < seq; dseq++ {
if slot := int(seq - mb.first.seq); slot != len(idx) {
// If we have a hole fill it.
for dseq := mb.first.seq + uint64(len(idx)); dseq < seq; dseq++ {
idx = append(idx, dbit)
mb.dmap.Insert(dseq)
}
}
pseq = seq
// Add to our index.
idx = append(idx, index)
mb.cache.lrl = uint32(rl)
Expand All @@ -4881,6 +4883,11 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
fseq = seq
}

// Make sure our dmap has this entry if it was erased.
if erased {
mb.dmap.Insert(seq)
}

// Handle FSS inline here.
if slen > 0 && !mb.noTrack && !erased && !mb.dmap.Exists(seq) {
bsubj := buf[index+msgHdrSize : index+msgHdrSize+uint32(slen)]
Expand Down
115 changes: 115 additions & 0 deletions server/filestore_test.go
Expand Up @@ -5981,6 +5981,121 @@ func TestFileStoreSelectBlockWithFirstSeqRemovals(t *testing.T) {
})
}

func TestFileStoreMsgBlockHolesAndIndexing(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1},
)
require_NoError(t, err)
defer fs.Stop()

// Grab the message block by hand and manipulate at that level.
mb := fs.getFirstBlock()
writeMsg := func(subj string, seq uint64) {
rl := fileStoreMsgSize(subj, nil, []byte(subj))
require_NoError(t, mb.writeMsgRecord(rl, seq, subj, nil, []byte(subj), time.Now().UnixNano(), true))
}
readMsg := func(seq uint64, expectedSubj string) {
// Clear cache so we load back in from disk and need to properly process anyholes.
ld, tombs, err := mb.rebuildState()
require_NoError(t, err)
require_Equal(t, ld, nil)
require_Equal(t, len(tombs), 0)
fs.rebuildState(nil)
sm, _, err := mb.fetchMsg(seq, nil)
require_NoError(t, err)
require_Equal(t, sm.subj, expectedSubj)
require_True(t, bytes.Equal(sm.buf, []byte(expectedSubj)))
}

writeMsg("A", 2)
require_Equal(t, mb.first.seq, 2)
require_Equal(t, mb.last.seq, 2)

writeMsg("B", 4)
require_Equal(t, mb.first.seq, 2)
require_Equal(t, mb.last.seq, 4)

writeMsg("C", 12)

readMsg(4, "B")
require_True(t, mb.dmap.Exists(3))

readMsg(12, "C")
readMsg(2, "A")

// Check that we get deleted for the right ones etc.
checkDeleted := func(seq uint64) {
_, _, err = mb.fetchMsg(seq, nil)
require_Error(t, err, ErrStoreMsgNotFound, errDeletedMsg)
mb.mu.RLock()
shouldExist, exists := seq >= mb.first.seq, mb.dmap.Exists(seq)
mb.mu.RUnlock()
if shouldExist {
require_True(t, exists)
}
}
checkDeleted(1)
checkDeleted(3)
for seq := 5; seq < 12; seq++ {
checkDeleted(uint64(seq))
}
}

func TestFileStoreMsgBlockCompactionAndHoles(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1},
)
require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("Z"), 1024)
for _, subj := range []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"} {
fs.StoreMsg(subj, nil, msg)
}
// Leave first one but delete the rest.
for seq := uint64(2); seq < 10; seq++ {
fs.RemoveMsg(seq)
}
require_Equal(t, fs.numMsgBlocks(), 1)
mb := fs.getFirstBlock()
require_NotNil(t, mb)

_, ub, _ := fs.Utilization()

// Do compaction, should remove all excess now.
mb.mu.Lock()
mb.compact()
mb.mu.Unlock()

ta, ua, _ := fs.Utilization()
require_Equal(t, ub, ua)
require_Equal(t, ta, ua)
}

func TestFileStoreRemoveLastNoDoubleTombstones(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1},
)
require_NoError(t, err)
defer fs.Stop()

fs.StoreMsg("A", nil, []byte("hello"))
fs.mu.Lock()
fs.removeMsgViaLimits(1)
fs.mu.Unlock()

require_Equal(t, fs.numMsgBlocks(), 1)
mb := fs.getFirstBlock()
require_NotNil(t, mb)
mb.loadMsgs()
rbytes, _, err := fs.Utilization()
require_NoError(t, err)
require_Equal(t, rbytes, emptyRecordLen)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down