[FIXED] Stream / KV lookups fail after decreasing history size. #4656

Merged
merged 1 commit on Oct 12, 2023
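
For context, the failure fixed here is the one exercised by the new TestJetStreamKVReductionInHistory test at the bottom of this diff: fill a KV bucket that keeps several revisions per key, lower the history afterwards, and key lookups can start failing. A condensed client-side sketch of that scenario (hypothetical example code, assuming a locally running nats-server with JetStream enabled and the nats.go client):

```go
package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Create a bucket that keeps several revisions per key and fill it up.
	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 4})
	if err != nil {
		log.Fatal(err)
	}
	for rev := 0; rev < 4; rev++ {
		for k := 0; k < 100; k++ {
			if _, err := kv.Put(fmt.Sprintf("key-%d", k), []byte("value")); err != nil {
				log.Fatal(err)
			}
		}
	}

	// Shrink the history by lowering MaxMsgsPerSubject on the backing stream.
	si, err := js.StreamInfo("KV_TEST")
	if err != nil {
		log.Fatal(err)
	}
	cfg := si.Config
	cfg.MaxMsgsPerSubject = 1
	if _, err := js.UpdateStream(&cfg); err != nil {
		log.Fatal(err)
	}

	// Before this fix, some lookups could now fail even though the latest
	// revision of every key is still present in the stream.
	if _, err := kv.Get("key-0"); err != nil {
		log.Fatal(err)
	}
}
```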
48 changes: 27 additions & 21 deletions server/filestore.go
@@ -486,7 +486,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim

// If we have max msgs per subject make sure that is also enforced.
if fs.cfg.MaxMsgsPer > 0 {
fs.enforceMsgPerSubjectLimit()
fs.enforceMsgPerSubjectLimit(false)
}

// Grab first sequence for check below while we have lock.
@@ -589,7 +589,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
}

if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
fs.enforceMsgPerSubjectLimit()
fs.enforceMsgPerSubjectLimit(true)
}
fs.mu.Unlock()

@@ -1907,13 +1907,10 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
for subj := range mb.fss {
fs.removePerSubject(subj)
}
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
for subj := range mb.fss {
fs.removePerSubject(subj)
for subj, ss := range mb.fss {
for i := uint64(0); i < ss.Msgs; i++ {
fs.removePerSubject(subj)
}
}
mb.dirtyCloseWithRemove(true)
deleted++
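
This change, and the matching one in Compact further down, fixes how the per-subject index is decremented when an entire message block is dropped: the old loop removed one count per subject, while the new loop removes one count for every message the block held. A toy model of that accounting (plain Go, not the server's actual psim/removePerSubject code) shows the difference:

```go
package main

import "fmt"

// dropBlock mimics, in spirit, the fixed loops above: when a whole block is
// removed, per-subject totals are decremented once per message the block held
// for each subject, not once per subject.
func dropBlock(totals map[string]uint64, blockMsgs map[string]uint64) {
	for subj, n := range blockMsgs {
		for i := uint64(0); i < n; i++ {
			if totals[subj] > 0 {
				totals[subj]--
			}
			if totals[subj] == 0 {
				delete(totals, subj)
			}
		}
	}
}

func main() {
	totals := map[string]uint64{"A": 10}         // 10 messages total for subject "A"
	dropBlock(totals, map[string]uint64{"A": 9}) // the dropped block held 9 of them
	fmt.Println(totals["A"])                     // prints 1; a once-per-subject removal would have left 9
}
```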
@@ -3190,14 +3187,16 @@ func (fs *fileStore) enforceBytesLimit() {
// We will make sure to go through all msg blocks etc. but in practice this
// will most likely only be the last one, so can take a more conservative approach.
// Lock should be held.
func (fs *fileStore) enforceMsgPerSubjectLimit() {
func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
maxMsgsPer := uint64(fs.cfg.MaxMsgsPer)

// We want to suppress callbacks from remove during this process
// We may want to suppress callbacks from remove during this process
// since these should have already been deleted and accounted for.
cb := fs.scb
fs.scb = nil
defer func() { fs.scb = cb }()
if !fireCallback {
cb := fs.scb
fs.scb = nil
defer func() { fs.scb = cb }()
}

var numMsgs uint64
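
The new fireCallback argument distinguishes the two call sites above: the recovery path in newFileStoreWithCreated passes false, keeping the old behavior of suppressing the storage-changed callback for removals that were already accounted for, while UpdateConfig passes true when MaxMsgsPer is lowered at runtime so the enforced removals are reported and stream/account usage is updated (which is what the KV history test below checks).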

@@ -3251,6 +3250,9 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
}
// Grab the ss entry for this subject in case sparse.
mb.mu.Lock()
if mb.cacheNotLoaded() {
mb.loadMsgsWithLock()
}
mb.ensurePerSubjectInfoLoaded()
ss := mb.fss[subj]
if ss != nil && ss.firstNeedsUpdate {
@@ -6227,8 +6229,10 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
bytes += mb.bytes
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
for subj := range mb.fss {
fs.removePerSubject(subj)
for subj, ss := range mb.fss {
for i := uint64(0); i < ss.Msgs; i++ {
fs.removePerSubject(subj)
}
}
// Now close.
mb.dirtyCloseWithRemove(true)
@@ -6671,7 +6675,12 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si

var le = binary.LittleEndian
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
li := int(mb.cache.idx[slot]&^hbit) - mb.cache.off
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
@@ -6681,10 +6690,7 @@
slen := int(le.Uint16(hdr[20:]))
if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < mb.first.seq || seq&ebit != 0 {
continue
}
if mb.dmap.Exists(seq) {
if seq < mb.first.seq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
ss.First = seq
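
This hunk is the core of the lookup fix referenced in the jetstream test comment below: when older revisions are removed, their slots in the block's cache index are overwritten with a delete marker (dbit), and the previous code treated such a slot as a live buffer offset while recalculating a subject's first sequence. Together with the cache load added to enforceMsgPerSubjectLimit above, skipping those slots is what keeps lookups working after a history reduction. A small self-contained sketch of the marker check (flag values are assumed for illustration; the real constants are defined in server/filestore.go):

```go
package main

import "fmt"

// Assumed flag values for illustration only. Each cache index slot packs a
// buffer offset plus flag bits; a slot whose message was deleted is
// overwritten with the delete marker.
const (
	hbit uint32 = 1 << 31 // flag bit cleared before the offset is used
	dbit uint32 = 1 << 30 // delete marker (assumed value)
)

// skipSlot mirrors the check added above: clear hbit, then compare what is
// left against the delete marker before treating it as a live buffer offset.
func skipSlot(slot uint32) bool {
	return slot&^hbit == dbit
}

func main() {
	fmt.Println(skipSlot(dbit))        // true: deleted entry, skip it
	fmt.Println(skipSlot(hbit | 4096)) // false: live entry at offset 4096
}
```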
30 changes: 30 additions & 0 deletions server/filestore_test.go
@@ -6304,6 +6304,36 @@ func TestFileStoreCompactingBlocksOnSync(t *testing.T) {
})
}

// Make sure a call to Compact() updates PSIM correctly.
func TestFileStoreCompactAndPSIMWhenDeletingBlocks(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 512},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

subj, msg := "A", bytes.Repeat([]byte("ABC"), 33) // ~100bytes

// Add in 10 As
for i := 0; i < 10; i++ {
fs.StoreMsg(subj, nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 4)

// Should leave 1.
n, err := fs.Compact(10)
require_NoError(t, err)
require_Equal(t, n, 9)
require_Equal(t, fs.numMsgBlocks(), 1)

fs.mu.RLock()
psi := fs.psim[subj]
fs.mu.RUnlock()

require_Equal(t, psi.total, 1)
require_Equal(t, psi.fblk, psi.lblk)
}
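
For reference, psi here is the stream's per-subject index entry (fs.psim): total is the number of messages recorded for the subject, and fblk/lblk track the first and last message block that still contain it, so after compacting down to a single block the two are expected to match.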

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
58 changes: 58 additions & 0 deletions server/jetstream_test.go
@@ -21931,3 +21931,61 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
},
})
}

// Make sure when we downgrade history to a smaller number that the account info
// is properly updated and all keys are still accessible.
// There was a bug in calculating the next first sequence that did not take the dbit slots into account.
func TestJetStreamKVReductionInHistory(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

startHistory := 4
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: uint8(startHistory)})
require_NoError(t, err)

numKeys, msg := 1000, bytes.Repeat([]byte("ABC"), 330) // ~1000bytes
for {
key := fmt.Sprintf("%X", rand.Intn(numKeys)+1)
_, err = kv.Put(key, msg)
require_NoError(t, err)
status, err := kv.Status()
require_NoError(t, err)
if status.Values() >= uint64(startHistory*numKeys) {
break
}
}
info, err := js.AccountInfo()
require_NoError(t, err)

checkAllKeys := func() {
// Make sure we can retrieve all of the keys.
keys, err := kv.Keys()
require_NoError(t, err)
require_Equal(t, len(keys), numKeys)
for _, key := range keys {
_, err := kv.Get(key)
require_NoError(t, err)
}
}

// Quick sanity check.
checkAllKeys()

si, err := js.StreamInfo("KV_TEST")
require_NoError(t, err)
// Adjust down to history of 1.
cfg := si.Config
cfg.MaxMsgsPerSubject = 1
_, err = js.UpdateStream(&cfg)
require_NoError(t, err)
// Make sure the accounting was updated.
ninfo, err := js.AccountInfo()
require_NoError(t, err)
require_True(t, info.Store > ninfo.Store)

// Make sure all keys still accessible.
checkAllKeys()
}
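
Both new tests live in the server package, so they can be run directly with something like `go test ./server -run 'TestFileStoreCompactAndPSIMWhenDeletingBlocks|TestJetStreamKVReductionInHistory'`.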