Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Compaction with compression, and added out-of-band compaction #4645

Merged
merged 2 commits into from Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 63 additions & 21 deletions server/filestore.go
Expand Up @@ -3491,14 +3491,13 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
} else if !isEmpty {
// Out of order delete.
mb.dmap.Insert(seq)
// Check if <25% utilization and minimum size met.
if mb.rbytes > compactMinimum && !isLastBlock {
// Remove the interior delete records
rbytes := mb.rbytes - uint64(mb.dmap.Size()*emptyRecordLen)
if rbytes>>2 > mb.bytes {
mb.compact()
fs.kickFlushStateLoop()
}
// Make simple check here similar to Compact(). If we can save 50% and over a certain threshold do inline.
// All other more thorough cleanup will happen in syncBlocks logic.
// Note that we do not have to store empty records for the deleted, so don't use to calculate.
// TODO(dlc) - This should not be inline, should kick the sync routine.
if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock {
mb.compact()
fs.kickFlushStateLoop()
}
}

Expand Down Expand Up @@ -3572,7 +3571,9 @@ func (mb *msgBlock) compact() {
}

buf := mb.cache.buf
nbuf := make([]byte, 0, len(buf))
nbuf := getMsgBlockBuf(len(buf))
// Recycle our nbuf when we are done.
defer recycleMsgBlockBuf(nbuf)

var le = binary.LittleEndian
var firstSet bool
Expand Down Expand Up @@ -3622,9 +3623,16 @@ func (mb *msgBlock) compact() {
}

// Handle compression
var err error
if nbuf, err = mb.cmp.Compress(nbuf); err != nil {
return
if mb.cmp != NoCompression {
cbuf, err := mb.cmp.Compress(nbuf)
if err != nil {
return
}
meta := &CompressionInfo{
Algorithm: mb.cmp,
OriginalSize: uint64(len(nbuf)),
}
nbuf = append(meta.MarshalMetadata(), cbuf...)
}

// Check for encryption.
Expand Down Expand Up @@ -4701,6 +4709,24 @@ func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) {
}
}

// ensureRawBytesLoaded lazily populates mb.rbytes (the raw on-disk size of
// the message block file) if it has not been determined yet.
// Returns an error only if the file's size could not be read.
// Lock should be held.
func (mb *msgBlock) ensureRawBytesLoaded() error {
	// Already loaded, nothing to do.
	if mb.rbytes > 0 {
		return nil
	}
	// os.Stat gives us the size directly without opening a file descriptor.
	fi, err := os.Stat(mb.mfn)
	if err != nil {
		return err
	}
	mb.rbytes = uint64(fi.Size())
	return nil
}

// Sync msg and index files as needed. This is called from a timer.
func (fs *fileStore) syncBlocks() {
fs.mu.RLock()
Expand All @@ -4709,8 +4735,10 @@ func (fs *fileStore) syncBlocks() {
return
}
blks := append([]*msgBlock(nil), fs.blks...)
lmb := fs.lmb
fs.mu.RUnlock()

var shouldWriteState bool
for _, mb := range blks {
// Do actual sync. Hold lock for consistency.
mb.mu.Lock()
Expand All @@ -4722,24 +4750,33 @@ func (fs *fileStore) syncBlocks() {
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
// Check if we should compact here as well.
// Do not compact last mb.
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes {
mb.compact()
shouldWriteState = true
}

// Check if we need to sync. We will not hold lock during actual sync.
var fn string
if mb.needSync {
needSync, fn := mb.needSync, mb.mfn
if needSync {
// Flush anything that may be pending.
if mb.pendingWriteSizeLocked() > 0 {
mb.flushPendingMsgsLocked()
}
fn = mb.mfn
mb.needSync = false
mb.flushPendingMsgsLocked()
}
mb.mu.Unlock()

// Check if we need to sync.
// This is done not holding any locks.
if fn != _EMPTY_ {
if needSync {
if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
fd.Sync()
canClear := fd.Sync() == nil
fd.Close()
// Only clear sync flag on success.
if canClear {
mb.mu.Lock()
mb.needSync = false
mb.mu.Unlock()
}
}
}
}
Expand All @@ -4750,6 +4787,11 @@ func (fs *fileStore) syncBlocks() {
syncAlways := fs.fcfg.SyncAlways
fs.mu.Unlock()

// Check if we should write out our state due to compaction of one or more msg blocks.
if shouldWriteState {
fs.writeFullState()
}
// Sync state file if we are not running with sync always.
if !syncAlways {
if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
fd.Sync()
Expand Down
69 changes: 69 additions & 0 deletions server/filestore_test.go
Expand Up @@ -6235,6 +6235,75 @@ func TestFileStoreFullStateMidBlockPastWAL(t *testing.T) {
})
}

// TestFileStoreCompactingBlocksOnSync verifies that the periodic syncBlocks
// pass compacts non-last message blocks whose raw size exceeds their
// reported (live) size, across all storage permutations.
func TestFileStoreCompactingBlocksOnSync(t *testing.T) {
	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
		fcfg.BlockSize = 1000 // 20 msgs per block.
		fcfg.SyncInterval = 100 * time.Millisecond
		scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1}

		// Key derivation for the cipher permutations; disabled when no cipher.
		keyFn := func(context []byte) ([]byte, error) {
			mac := hmac.New(sha256.New, []byte("dlc22"))
			if _, err := mac.Write(context); err != nil {
				return nil, err
			}
			return mac.Sum(nil), nil
		}
		if fcfg.Cipher == NoCipher {
			keyFn = nil
		}

		fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), keyFn, nil)
		require_NoError(t, err)
		defer fs.Stop()

		// A 19 byte payload yields a 50 byte internal record, so 20 msgs fill a block.
		payload := bytes.Repeat([]byte("Z"), 19)
		const subjects = "ABCDEFGHIJKLMNOPQRST"
		for _, subj := range subjects {
			fs.StoreMsg(string(subj), nil, payload)
		}
		require_Equal(t, fs.numMsgBlocks(), 1)

		total, reported, err := fs.Utilization()
		require_NoError(t, err)
		require_Equal(t, total, reported)

		// Re-store the interior subjects. With MaxMsgsPer: 1 each store removes
		// the original, leaving interior deletes in the first block. Since we
		// are small this should not kick in any inline compaction logic.
		for i := 1; i < 19; i++ {
			fs.StoreMsg(string(subjects[i]), nil, payload)
		}
		require_Equal(t, fs.numMsgBlocks(), 2)

		// Snapshot raw vs reported bytes for the first block under its lock.
		firstBlkUtil := func() (uint64, uint64) {
			fs.mu.RLock()
			fmb := fs.blks[0]
			fs.mu.RUnlock()
			fmb.mu.RLock()
			defer fmb.mu.RUnlock()
			return fmb.rbytes, fmb.bytes
		}

		total, reported = firstBlkUtil()
		require_Equal(t, reported, 100)
		// Raw bytes will be 1000, but due to compression could be less.
		if fcfg.Compression != NoCompression {
			require_True(t, total > reported)
		} else {
			require_Equal(t, total, 1000)
		}

		// The sync interval should eventually compact down to rbytes == 100.
		checkFor(t, time.Second, 100*time.Millisecond, func() error {
			if total, reported := firstBlkUtil(); total <= reported {
				return nil
			}
			return fmt.Errorf("Not compacted yet, raw %v vs reported %v",
				friendlyBytes(total), friendlyBytes(reported))
		})
	})
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down