Skip to content

Commit

Permalink
Merge pull request #3860 from nats-io/compact-subjects-fix
Browse files Browse the repository at this point in the history
Make sure we adjust per subject info when doing a Compact().
  • Loading branch information
derekcollison committed Feb 10, 2023
2 parents 72bce6c + 0da2a15 commit 7afddb3
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 2 deletions.
7 changes: 7 additions & 0 deletions server/filestore.go
Expand Up @@ -3125,6 +3125,7 @@ func (fs *fileStore) expireMsgs() {
fs.mu.RLock()
minAge := time.Now().UnixNano() - int64(fs.cfg.MaxAge)
fs.mu.RUnlock()

for sm, _ = fs.msgForSeq(0, &smv); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0, &smv) {
fs.removeMsg(sm.seq, false, true)
}
Expand Down Expand Up @@ -4859,6 +4860,12 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
mb.mu.Lock()
purged += mb.msgs
bytes += mb.bytes
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
for subj := range mb.fss {
fs.removePerSubject(subj)
}
// Now close.
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
deleted++
Expand Down
32 changes: 31 additions & 1 deletion server/filestore_test.go
@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -5197,3 +5197,33 @@ func TestFileStoreStreamTruncateResetMultiBlock(t *testing.T) {
require_True(t, state.NumDeleted == 0)
})
}

func TestFileStoreStreamCompactMultiBlockSubjectInfo(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 128

fs, err := newFileStore(
fcfg,
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()

for i := 0; i < 1000; i++ {
subj := fmt.Sprintf("foo.%d", i)
_, _, err := fs.StoreMsg(subj, nil, []byte("Hello World"))
require_NoError(t, err)
}
require_True(t, fs.numMsgBlocks() == 500)

// Compact such that we know we throw blocks away from the beginning.
deleted, err := fs.Compact(501)
require_NoError(t, err)
require_True(t, deleted == 500)
require_True(t, fs.numMsgBlocks() == 250)

// Make sure we adjusted for subjects etc.
state := fs.State()
require_True(t, state.NumSubjects == 500)
})
}
1 change: 1 addition & 0 deletions server/memstore.go
Expand Up @@ -541,6 +541,7 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
delete(ms.msgs, seq)
ms.removeSeqPerSubject(sm.subj, seq)
}
}
ms.state.Msgs -= purged
Expand Down
27 changes: 26 additions & 1 deletion server/memstore_test.go
@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -495,3 +495,28 @@ func TestMemStoreStreamTruncateReset(t *testing.T) {
require_True(t, state.NumSubjects == 1)
require_True(t, state.NumDeleted == 0)
}

func TestMemStoreStreamCompactMultiBlockSubjectInfo(t *testing.T) {
cfg := &StreamConfig{
Name: "TEST",
Storage: MemoryStorage,
Subjects: []string{"foo.*"},
}
ms, err := newMemStore(cfg)
require_NoError(t, err)

for i := 0; i < 1000; i++ {
subj := fmt.Sprintf("foo.%d", i)
_, _, err := ms.StoreMsg(subj, nil, []byte("Hello World"))
require_NoError(t, err)
}

// Compact such that we know we throw blocks away from the beginning.
deleted, err := ms.Compact(501)
require_NoError(t, err)
require_True(t, deleted == 500)

// Make sure we adjusted for subjects etc.
state := ms.State()
require_True(t, state.NumSubjects == 500)
}

0 comments on commit 7afddb3

Please sign in to comment.