[FIXED] Stream / KV lookups fail after decreasing history size. (#4656)
Fixed a bug where the next first sequence for a subject was not selected
correctly because newly written dbit (delete marker) entries were not being
skipped; see the sketch below. This could result in lookups failing, e.g.
after lowering max msgs per subject.
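
A minimal, self-contained Go sketch of the pattern behind this fix (not the
actual filestore code): the constant values and the firstLiveSlot helper are
illustrative assumptions; only the skip-on-delete-marker check mirrors the
recalculateFirstForSubj change in the diff below.

package main

import "fmt"

// Assumed flag values for illustration; the real filestore defines its own.
const (
    hbit = 1 << 31 // entry has headers
    dbit = 1 << 30 // entry is a delete marker (tombstone)
)

// firstLiveSlot is a hypothetical helper: scan a cache index from startSlot
// and return the first slot that is not a delete marker. Skipping dbit
// entries is the step the original code was missing.
func firstLiveSlot(idx []uint32, startSlot int) (int, bool) {
    for slot := startSlot; slot < len(idx); slot++ {
        bi := idx[slot] &^ hbit // clear the header bit before inspecting
        if bi == dbit {
            continue // delete marker, skip it
        }
        return slot, true
    }
    return 0, false
}

func main() {
    // Slots 0 and 1 were deleted, e.g. by lowering max msgs per subject.
    idx := []uint32{dbit, dbit, 128 | hbit, 256}
    if slot, ok := firstLiveSlot(idx, 0); ok {
        fmt.Println("first live slot:", slot) // prints 2
    }
}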

Also fixed a bug where the per-subject index (psim) was not properly updated
during Compact when an entire block was thrown away and a subject had more
than one message in it; a second sketch follows.
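
A similarly hedged sketch of the accounting fix, using plain counters instead
of the real psim structures: when an entire block is dropped, the stream-wide
per-subject count must be decremented once per message held in that block,
not once per subject.

package main

import "fmt"

// blockSubjStats is an assumed, simplified stand-in for a block's
// per-subject state (the diff's mb.fss entries expose a Msgs count).
type blockSubjStats struct{ Msgs uint64 }

// dropBlock adjusts a simplified stream-wide per-subject index when a whole
// block is removed. Decrementing once per message is the fix; the old code
// decremented only once per subject.
func dropBlock(psim map[string]uint64, blockFSS map[string]blockSubjStats) {
    for subj, ss := range blockFSS {
        for i := uint64(0); i < ss.Msgs; i++ {
            if psim[subj] <= 1 {
                delete(psim, subj)
            } else {
                psim[subj]--
            }
        }
    }
}

func main() {
    psim := map[string]uint64{"A": 10}                 // stream-wide: 10 msgs on "A"
    block := map[string]blockSubjStats{"A": {Msgs: 9}} // dropped block holds 9 of them
    dropBlock(psim, block)
    fmt.Println(psim["A"]) // 1, matching what the new Compact/PSIM test expects
}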

Signed-off-by: Derek Collison <derek@nats.io>

Resolves: #4643
derekcollison committed Oct 12, 2023
2 parents 1e8f6bf + b7b40b0 commit 444a47e
Showing 3 changed files with 115 additions and 21 deletions.
48 changes: 27 additions & 21 deletions server/filestore.go
@@ -486,7 +486,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim

    // If we have max msgs per subject make sure the is also enforced.
    if fs.cfg.MaxMsgsPer > 0 {
-       fs.enforceMsgPerSubjectLimit()
+       fs.enforceMsgPerSubjectLimit(false)
    }

    // Grab first sequence for check below while we have lock.
@@ -589,7 +589,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
    }

    if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
-       fs.enforceMsgPerSubjectLimit()
+       fs.enforceMsgPerSubjectLimit(true)
    }
    fs.mu.Unlock()

@@ -1907,13 +1907,10 @@ func (fs *fileStore) expireMsgsOnRecover() {
        }
        // Make sure we do subject cleanup as well.
        mb.ensurePerSubjectInfoLoaded()
-       for subj := range mb.fss {
-           fs.removePerSubject(subj)
-       }
-       // Make sure we do subject cleanup as well.
-       mb.ensurePerSubjectInfoLoaded()
-       for subj := range mb.fss {
-           fs.removePerSubject(subj)
+       for subj, ss := range mb.fss {
+           for i := uint64(0); i < ss.Msgs; i++ {
+               fs.removePerSubject(subj)
+           }
        }
        mb.dirtyCloseWithRemove(true)
        deleted++
@@ -3190,14 +3187,16 @@ func (fs *fileStore) enforceBytesLimit() {
// We will make sure to go through all msg blocks etc. but in practice this
// will most likely only be the last one, so can take a more conservative approach.
// Lock should be held.
- func (fs *fileStore) enforceMsgPerSubjectLimit() {
+ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
    maxMsgsPer := uint64(fs.cfg.MaxMsgsPer)

-   // We want to suppress callbacks from remove during this process
+   // We may want to suppress callbacks from remove during this process
    // since these should have already been deleted and accounted for.
-   cb := fs.scb
-   fs.scb = nil
-   defer func() { fs.scb = cb }()
+   if !fireCallback {
+       cb := fs.scb
+       fs.scb = nil
+       defer func() { fs.scb = cb }()
+   }

    var numMsgs uint64

@@ -3251,6 +3250,9 @@
        }
        // Grab the ss entry for this subject in case sparse.
        mb.mu.Lock()
+       if mb.cacheNotLoaded() {
+           mb.loadMsgsWithLock()
+       }
        mb.ensurePerSubjectInfoLoaded()
        ss := mb.fss[subj]
        if ss != nil && ss.firstNeedsUpdate {
@@ -6227,8 +6229,10 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
        bytes += mb.bytes
        // Make sure we do subject cleanup as well.
        mb.ensurePerSubjectInfoLoaded()
-       for subj := range mb.fss {
-           fs.removePerSubject(subj)
+       for subj, ss := range mb.fss {
+           for i := uint64(0); i < ss.Msgs; i++ {
+               fs.removePerSubject(subj)
+           }
        }
        // Now close.
        mb.dirtyCloseWithRemove(true)
@@ -6671,7 +6675,12 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si

    var le = binary.LittleEndian
    for slot := startSlot; slot < len(mb.cache.idx); slot++ {
-       li := int(mb.cache.idx[slot]&^hbit) - mb.cache.off
+       bi := mb.cache.idx[slot] &^ hbit
+       if bi == dbit {
+           // delete marker so skip.
+           continue
+       }
+       li := int(bi) - mb.cache.off
        if li >= len(mb.cache.buf) {
            ss.First = ss.Last
            return
@@ -6681,10 +6690,7 @@
        slen := int(le.Uint16(hdr[20:]))
        if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
            seq := le.Uint64(hdr[4:])
-           if seq < mb.first.seq || seq&ebit != 0 {
-               continue
-           }
-           if mb.dmap.Exists(seq) {
+           if seq < mb.first.seq || seq&ebit != 0 || mb.dmap.Exists(seq) {
                continue
            }
            ss.First = seq
30 changes: 30 additions & 0 deletions server/filestore_test.go
@@ -6304,6 +6304,36 @@ func TestFileStoreCompactingBlocksOnSync(t *testing.T) {
    })
}

// Make sure a call to Compact() updates PSIM correctly.
func TestFileStoreCompactAndPSIMWhenDeletingBlocks(t *testing.T) {
    fs, err := newFileStore(
        FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 512},
        StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
    require_NoError(t, err)
    defer fs.Stop()

    subj, msg := "A", bytes.Repeat([]byte("ABC"), 33) // ~100bytes

    // Add in 10 As
    for i := 0; i < 10; i++ {
        fs.StoreMsg(subj, nil, msg)
    }
    require_Equal(t, fs.numMsgBlocks(), 4)

    // Should leave 1.
    n, err := fs.Compact(10)
    require_NoError(t, err)
    require_Equal(t, n, 9)
    require_Equal(t, fs.numMsgBlocks(), 1)

    fs.mu.RLock()
    psi := fs.psim[subj]
    fs.mu.RUnlock()

    require_Equal(t, psi.total, 1)
    require_Equal(t, psi.fblk, psi.lblk)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
58 changes: 58 additions & 0 deletions server/jetstream_test.go
@@ -21931,3 +21931,61 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
        },
    })
}

// Make sure when we downgrade history to a smaller number that the account info
// is properly updated and all keys are still accessible.
// There was a bug calculating next first that was not taking into account the dbit slots.
func TestJetStreamKVReductionInHistory(t *testing.T) {
    s := RunBasicJetStreamServer(t)
    defer s.Shutdown()

    nc, js := jsClientConnect(t, s)
    defer nc.Close()

    startHistory := 4
    kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: uint8(startHistory)})
    require_NoError(t, err)

    numKeys, msg := 1000, bytes.Repeat([]byte("ABC"), 330) // ~1000bytes
    for {
        key := fmt.Sprintf("%X", rand.Intn(numKeys)+1)
        _, err = kv.Put(key, msg)
        require_NoError(t, err)
        status, err := kv.Status()
        require_NoError(t, err)
        if status.Values() >= uint64(startHistory*numKeys) {
            break
        }
    }
    info, err := js.AccountInfo()
    require_NoError(t, err)

    checkAllKeys := func() {
        // Make sure we can retrieve all of the keys.
        keys, err := kv.Keys()
        require_NoError(t, err)
        require_Equal(t, len(keys), numKeys)
        for _, key := range keys {
            _, err := kv.Get(key)
            require_NoError(t, err)
        }
    }

    // Quick sanity check.
    checkAllKeys()

    si, err := js.StreamInfo("KV_TEST")
    require_NoError(t, err)
    // Adjust down to history of 1.
    cfg := si.Config
    cfg.MaxMsgsPerSubject = 1
    _, err = js.UpdateStream(&cfg)
    require_NoError(t, err)
    // Make sure the accounting was updated.
    ninfo, err := js.AccountInfo()
    require_NoError(t, err)
    require_True(t, info.Store > ninfo.Store)

    // Make sure all keys still accessible.
    checkAllKeys()
}
