
Commit

[FIXED] Compaction with compression and added out of band compaction (#4645)

This will also reclaim more space for streams with lots of interior
deletes.


Signed-off-by: Derek Collison <derek@nats.io>
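
To make the space accounting concrete: a block's rbytes is what it occupies on disk, while bytes counts only live messages, so interior deletes widen the gap between the two until compaction rewrites the block. A minimal, self-contained sketch of that arithmetic (not code from this commit; recordLen matches the 50-byte records used in the new test below):

package main

import "fmt"

func main() {
	// A block holding 20 fixed-size records, 18 of which were
	// deleted out of order (interior deletes).
	const recordLen = 50
	total, deleted := uint64(20), uint64(18)
	rbytes := total * recordLen            // raw bytes on disk: 1000
	bytes := (total - deleted) * recordLen // live bytes: 100
	fmt.Printf("reclaimable: %d of %d bytes\n", rbytes-bytes, rbytes)
}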
derekcollison committed Oct 11, 2023
2 parents 9a55118 + 842d600 commit 94545f3
Showing 2 changed files with 132 additions and 21 deletions.
84 changes: 63 additions & 21 deletions server/filestore.go
@@ -3491,14 +3491,13 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
 	} else if !isEmpty {
 		// Out of order delete.
 		mb.dmap.Insert(seq)
-		// Check if <25% utilization and minimum size met.
-		if mb.rbytes > compactMinimum && !isLastBlock {
-			// Remove the interior delete records
-			rbytes := mb.rbytes - uint64(mb.dmap.Size()*emptyRecordLen)
-			if rbytes>>2 > mb.bytes {
-				mb.compact()
-				fs.kickFlushStateLoop()
-			}
+		// Make simple check here similar to Compact(). If we can save 50% and over a certain threshold do inline.
+		// All other more thorough cleanup will happen in syncBlocks logic.
+		// Note that we do not have to store empty records for the deleted, so don't use to calculate.
+		// TODO(dlc) - This should not be inline, should kick the sync routine.
+		if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock {
+			mb.compact()
+			fs.kickFlushStateLoop()
 		}
 	}
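
The replaced check and its replacement draw the line differently: the old code compacted when live bytes fell below roughly 25% of the raw size (after discounting empty tombstone records), while the new one fires as soon as at least half the raw bytes are reclaimable, leaving deeper cleanup to syncBlocks. A standalone comparison of the two conditions; the emptyRecordLen value here is an illustrative stand-in, not the real constant:

package main

import "fmt"

func main() {
	const emptyRecordLen = 22 // stand-in for the filestore constant
	rbytes, bytes := uint64(10000), uint64(4000)
	dmapSize := 100 // number of interior-delete tombstones

	// Old inline check: <25% utilization of the adjusted raw size.
	adjusted := rbytes - uint64(dmapSize*emptyRecordLen)
	oldCheck := adjusted>>2 > bytes

	// New inline check: at least 50% of raw bytes reclaimable.
	newCheck := bytes*2 < rbytes

	fmt.Println(oldCheck, newCheck) // false true: the new check fires earlier here
}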

@@ -3572,7 +3571,9 @@ func (mb *msgBlock) compact() {
 	}
 
 	buf := mb.cache.buf
-	nbuf := make([]byte, 0, len(buf))
+	nbuf := getMsgBlockBuf(len(buf))
+	// Recycle our nbuf when we are done.
+	defer recycleMsgBlockBuf(nbuf)
 
 	var le = binary.LittleEndian
 	var firstSet bool
@@ -3622,9 +3623,16 @@ func (mb *msgBlock) compact() {
 	}
 
 	// Handle compression
-	var err error
-	if nbuf, err = mb.cmp.Compress(nbuf); err != nil {
-		return
+	if mb.cmp != NoCompression {
+		cbuf, err := mb.cmp.Compress(nbuf)
+		if err != nil {
+			return
+		}
+		meta := &CompressionInfo{
+			Algorithm:    mb.cmp,
+			OriginalSize: uint64(len(nbuf)),
+		}
+		nbuf = append(meta.MarshalMetadata(), cbuf...)
 	}
 
 	// Check for encryption.
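
With compression enabled, the compacted buffer is now framed as a marshaled CompressionInfo header followed by the compressed payload, which is what decompressIfNeeded can later peel apart. A rough sketch of that framing under an assumed byte layout (the real MarshalMetadata format is defined elsewhere in the server package):

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeFrame prepends a toy metadata header (algorithm id plus the
// uncompressed size) to an already-compressed body, mirroring the
// append(meta.MarshalMetadata(), cbuf...) step above.
func encodeFrame(algo byte, originalSize uint64, compressed []byte) []byte {
	hdr := make([]byte, 0, 1+8)
	hdr = append(hdr, algo)
	hdr = binary.LittleEndian.AppendUint64(hdr, originalSize)
	return append(hdr, compressed...)
}

func main() {
	body := []byte("compressed-bytes")
	framed := encodeFrame(2, 1024, body)
	fmt.Println(len(framed)) // 9 bytes of header + the body
}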
@@ -4701,6 +4709,24 @@ func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) {
 	}
 }
 
+// Lock should be held.
+func (mb *msgBlock) ensureRawBytesLoaded() error {
+	if mb.rbytes > 0 {
+		return nil
+	}
+	f, err := os.Open(mb.mfn)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	if fi, err := f.Stat(); fi != nil && err == nil {
+		mb.rbytes = uint64(fi.Size())
+	} else {
+		return err
+	}
+	return nil
+}
+
 // Sync msg and index files as needed. This is called from a timer.
 func (fs *fileStore) syncBlocks() {
 	fs.mu.RLock()
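
ensureRawBytesLoaded lazily fills in mb.rbytes from the block file's size when the block has not been loaded, so the compaction check in syncBlocks can compare raw versus live bytes without reading the whole block. A simplified standalone equivalent using os.Stat (the commit itself opens the file and calls Stat on the handle):

package main

import (
	"fmt"
	"os"
)

// rawFileSize returns a file's size on disk, the quantity that
// ensureRawBytesLoaded records into mb.rbytes.
func rawFileSize(path string) (uint64, error) {
	fi, err := os.Stat(path)
	if err != nil {
		return 0, err
	}
	return uint64(fi.Size()), nil
}

func main() {
	if n, err := rawFileSize("1.blk"); err == nil {
		fmt.Println("rbytes:", n)
	}
}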
@@ -4709,8 +4735,10 @@
 		return
 	}
 	blks := append([]*msgBlock(nil), fs.blks...)
+	lmb := fs.lmb
 	fs.mu.RUnlock()
 
+	var shouldWriteState bool
 	for _, mb := range blks {
 		// Do actual sync. Hold lock for consistency.
 		mb.mu.Lock()
@@ -4722,24 +4750,33 @@
 		if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
 			mb.dirtyCloseWithRemove(false)
 		}
+		// Check if we should compact here as well.
+		// Do not compact last mb.
+		if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes {
+			mb.compact()
+			shouldWriteState = true
+		}
 
 		// Check if we need to sync. We will not hold lock during actual sync.
-		var fn string
-		if mb.needSync {
+		needSync, fn := mb.needSync, mb.mfn
+		if needSync {
 			// Flush anything that may be pending.
-			if mb.pendingWriteSizeLocked() > 0 {
-				mb.flushPendingMsgsLocked()
-			}
-			fn = mb.mfn
-			mb.needSync = false
+			mb.flushPendingMsgsLocked()
 		}
 		mb.mu.Unlock()
 
 		// Check if we need to sync.
 		// This is done not holding any locks.
-		if fn != _EMPTY_ {
+		if needSync {
 			if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
-				fd.Sync()
+				canClear := fd.Sync() == nil
 				fd.Close()
+				// Only clear sync flag on success.
+				if canClear {
+					mb.mu.Lock()
+					mb.needSync = false
+					mb.mu.Unlock()
+				}
 			}
 		}
 	}
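
The sync path now records whether fsync actually succeeded and clears needSync only in that case, so a failed sync is retried on the next timer pass instead of being silently forgotten. A compact sketch of the pattern, with 0640 standing in for defaultFilePerms:

package main

import (
	"fmt"
	"os"
)

// syncFile fsyncs a file outside any locks and reports success, so the
// caller can decide whether it is safe to clear its dirty flag.
func syncFile(path string) bool {
	fd, err := os.OpenFile(path, os.O_RDWR, 0640)
	if err != nil {
		return false
	}
	defer fd.Close()
	return fd.Sync() == nil
}

func main() {
	if syncFile("1.blk") {
		fmt.Println("synced; safe to clear the needSync flag under the lock")
	}
}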
@@ -4750,6 +4787,11 @@
 	syncAlways := fs.fcfg.SyncAlways
 	fs.mu.Unlock()
 
+	// Check if we should write out our state due to compaction of one or more msg blocks.
+	if shouldWriteState {
+		fs.writeFullState()
+	}
+	// Sync state file if we are not running with sync always.
 	if !syncAlways {
 		if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
 			fd.Sync()
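
Because any number of blocks may be compacted in one pass, the loop only sets shouldWriteState and the comparatively expensive writeFullState runs at most once afterwards. The shape of that batching, with made-up numbers in place of real blocks:

package main

import "fmt"

func main() {
	rbytes := []uint64{1200, 800, 2000} // hypothetical raw sizes
	live := []uint64{400, 800, 500}     // hypothetical live bytes

	shouldWriteState := false
	for i := range rbytes {
		if live[i] < rbytes[i] { // stand-in for the compaction condition
			// compact block i here ...
			shouldWriteState = true
		}
	}
	if shouldWriteState {
		fmt.Println("writing full state once")
	}
}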
69 changes: 69 additions & 0 deletions server/filestore_test.go
@@ -6235,6 +6235,75 @@ func TestFileStoreFullStateMidBlockPastWAL(t *testing.T) {
 	})
 }
 
+func TestFileStoreCompactingBlocksOnSync(t *testing.T) {
+	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
+		fcfg.BlockSize = 1000 // 20 msgs per block.
+		fcfg.SyncInterval = 100 * time.Millisecond
+		scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage, MaxMsgsPer: 1}
+
+		prf := func(context []byte) ([]byte, error) {
+			h := hmac.New(sha256.New, []byte("dlc22"))
+			if _, err := h.Write(context); err != nil {
+				return nil, err
+			}
+			return h.Sum(nil), nil
+		}
+		if fcfg.Cipher == NoCipher {
+			prf = nil
+		}
+
+		fs, err := newFileStoreWithCreated(fcfg, scfg, time.Now(), prf, nil)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		// This yields an internal record length of 50 bytes. So 20 msgs per blk.
+		msg := bytes.Repeat([]byte("Z"), 19)
+		subjects := "ABCDEFGHIJKLMNOPQRST"
+		for _, subj := range subjects {
+			fs.StoreMsg(string(subj), nil, msg)
+		}
+		require_Equal(t, fs.numMsgBlocks(), 1)
+		total, reported, err := fs.Utilization()
+		require_NoError(t, err)
+
+		require_Equal(t, total, reported)
+
+		// Now start removing, since we are small this should not kick in any inline logic.
+		// Remove all interior messages, leave 1 and 20. So write B-S
+		for i := 1; i < 19; i++ {
+			fs.StoreMsg(string(subjects[i]), nil, msg)
+		}
+		require_Equal(t, fs.numMsgBlocks(), 2)
+
+		blkUtil := func() (uint64, uint64) {
+			fs.mu.RLock()
+			fmb := fs.blks[0]
+			fs.mu.RUnlock()
+			fmb.mu.RLock()
+			defer fmb.mu.RUnlock()
+			return fmb.rbytes, fmb.bytes
+		}
+
+		total, reported = blkUtil()
+		require_Equal(t, reported, 100)
+		// Raw bytes will be 1000, but due to compression could be less.
+		if fcfg.Compression != NoCompression {
+			require_True(t, total > reported)
+		} else {
+			require_Equal(t, total, 1000)
+		}
+
+		// Make sure the sync interval when kicked in compacts down to rbytes == 100.
+		checkFor(t, time.Second, 100*time.Millisecond, func() error {
+			if total, reported := blkUtil(); total <= reported {
+				return nil
+			}
+			return fmt.Errorf("Not compacted yet, raw %v vs reported %v",
+				friendlyBytes(total), friendlyBytes(reported))
+		})
+	})
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // Benchmarks
 ///////////////////////////////////////////////////////////////////////////
