Skip to content

Commit

Permalink
[FIXED] Max msgs per subject config update to lower values (#4446)
Browse files Browse the repository at this point in the history
We were not recalculating first correctly since we were not considering
seq < mb.first.seq.

Signed-off-by: Derek Collison <derek@nats.io>

Resolves #4445
  • Loading branch information
derekcollison committed Aug 29, 2023
2 parents f9a2efd + 8865c2a commit abf5e0b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 15 deletions.
27 changes: 12 additions & 15 deletions server/filestore.go
Expand Up @@ -451,7 +451,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
fs.ageChk = nil
}

if cfg.MaxMsgsPer > 0 && cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
fs.enforceMsgPerSubjectLimit()
}
fs.mu.Unlock()
Expand Down Expand Up @@ -4806,20 +4806,17 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
start = fs.state.FirstSeq
}

// TODO(dlc) - If num blocks gets large maybe use selectMsgBlock but have it return index b/c
// we need to keep walking if no match found in first mb.
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if start > atomic.LoadUint64(&mb.last.seq) {
continue
}
if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil {
if expireOk && mb != fs.lmb {
mb.tryForceExpireCache()
if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
for i := bi; i < len(fs.blks); i++ {
mb := fs.blks[i]
if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil {
if expireOk && mb != fs.lmb {
mb.tryForceExpireCache()
}
return sm, sm.seq, nil
} else if err != ErrStoreMsgNotFound {
return nil, 0, err
}
return sm, sm.seq, nil
} else if err != ErrStoreMsgNotFound {
return nil, 0, err
}
}

Expand Down Expand Up @@ -5948,7 +5945,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
slen := int(le.Uint16(hdr[20:]))
if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq&ebit != 0 {
if seq < mb.first.seq || seq&ebit != 0 {
continue
}
if len(mb.dmap) > 0 {
Expand Down
61 changes: 61 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -20435,3 +20435,64 @@ func TestJetStreamUsageSyncDeadlock(t *testing.T) {

sendStreamMsg(t, nc, "foo", "hello")
}

// https://github.com/nats-io/nats.go/issues/1382
// https://github.com/nats-io/nats-server/issues/4445
func TestJetStreamChangeMaxMessagesPerSubject(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"one.>"},
MaxMsgsPerSubject: 5,
})
require_NoError(t, err)

for i := 0; i < 10; i++ {
sendStreamMsg(t, nc, "one.data", "data")
}

expectMsgs := func(num int32) error {
t.Helper()

var msgs atomic.Int32
sub, err := js.Subscribe("one.>", func(msg *nats.Msg) {
msgs.Add(1)
msg.Ack()
})
require_NoError(t, err)
defer sub.Unsubscribe()

checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
if nm := msgs.Load(); nm != num {
return fmt.Errorf("expected to get %v messages, got %v instead", num, nm)
}
return nil
})
return nil
}

require_NoError(t, expectMsgs(5))

js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"one.>"},
MaxMsgsPerSubject: 3,
})

info, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, info.Config.MaxMsgsPerSubject == 3)
require_True(t, info.State.Msgs == 3)

require_NoError(t, expectMsgs(3))

for i := 0; i < 10; i++ {
sendStreamMsg(t, nc, "one.data", "data")
}

require_NoError(t, expectMsgs(3))
}

0 comments on commit abf5e0b

Please sign in to comment.