Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] #4043 #4048

Merged
merged 1 commit into from Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
96 changes: 72 additions & 24 deletions server/filestore.go
Expand Up @@ -1358,7 +1358,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
// Update fss
// Make sure we have fss loaded.
mb.removeSeqPerSubject(sm.subj, seq, nil)
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
}
// Make sure we have a proper next first sequence.
Expand Down Expand Up @@ -1482,6 +1482,9 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = mb.last.seq + 1
for _, subj := range subs {
ss := mb.fss[subj]
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
}
Expand Down Expand Up @@ -1587,6 +1590,9 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
var havePartial bool
for subj, ss := range mb.fss {
if isAll || isMatch(subj) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
update(ss)
} else if sseq <= ss.Last {
Expand Down Expand Up @@ -1784,6 +1790,9 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.ensurePerSubjectInfoLoaded()
for subj, ss := range mb.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
Expand Down Expand Up @@ -1883,6 +1892,9 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
seen[subj] = true
}
} else {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
t += ss.Msgs
} else if sseq <= ss.Last {
Expand Down Expand Up @@ -2474,6 +2486,9 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
return ss.First, nil
}
}
Expand Down Expand Up @@ -2548,6 +2563,9 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss := mb.fss[subj]
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
if ss == nil {
continue
Expand Down Expand Up @@ -2717,7 +2735,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
mb.ensurePerSubjectInfoLoaded()

// If we are tracking multiple subjects here make sure we update that accounting.
mb.removeSeqPerSubject(sm.subj, seq, &smv)
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)

if secure {
Expand Down Expand Up @@ -5094,7 +5112,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
bytes += rl
}
// FSS updates.
mb.removeSeqPerSubject(sm.subj, seq, &smv)
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)

// Check for first message.
Expand Down Expand Up @@ -5302,7 +5320,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
purged++
}
// Update fss
smb.removeSeqPerSubject(sm.subj, mseq, &smv)
smb.removeSeqPerSubject(sm.subj, mseq)
fs.removePerSubject(sm.subj)
}
}
Expand Down Expand Up @@ -5678,7 +5696,7 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {

// Remove a seq from the fss and select new first.
// Lock should be held.
func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg) {
func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
mb.ensurePerSubjectInfoLoaded()
ss := mb.fss[subj]
if ss == nil {
Expand All @@ -5692,37 +5710,64 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg)
}

ss.Msgs--
if seq != ss.First {
return
}

// Only one left.
if ss.Msgs == 1 {
if seq != ss.First {
if seq == ss.Last {
ss.Last = ss.First
mb.fssNeedsWrite = true // Mark dirty
} else {
ss.First = ss.Last
mb.fssNeedsWrite = true // Mark dirty
}
mb.fssNeedsWrite = true // Mark dirty
return
}

// Recalculate first.
// TODO(dlc) - Might want to optimize this.
if seq == ss.First {
var smv StoreMsg
if smp == nil {
smp = &smv
}
for tseq := seq + 1; tseq <= ss.Last; tseq++ {
if sm, _ := mb.cacheLookup(tseq, smp); sm != nil {
if sm.subj == subj {
ss.First = tseq
mb.fssNeedsWrite = true // Mark dirty
return
// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}

// Will recalulate the first sequence for this subject in this block.
// Will avoid slower path message lookups and scan the cache directly instead.
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
// Need to make sure messages are loaded.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
}
// Mark first as updated.
ss.firstNeedsUpdate = false
startSeq++

startSlot := int(startSeq - mb.cache.fseq)
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
return
}

var le = binary.LittleEndian
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
li := int(mb.cache.idx[slot]&^hbit) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq&ebit != 0 {
continue
}
if len(mb.dmap) > 0 {
if _, ok := mb.dmap[seq]; ok {
continue
}
}
ss.First = seq
mb.fssNeedsWrite = true // Mark dirty
return
}
}
}
Expand Down Expand Up @@ -5951,6 +5996,9 @@ func (mb *msgBlock) writePerSubjectInfo() error {
n := binary.PutUvarint(scratch[0:], uint64(len(mb.fss)))
b.Write(scratch[0:n])
for subj, ss := range mb.fss {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
n := binary.PutUvarint(scratch[0:], uint64(len(subj)))
b.Write(scratch[0:n])
b.WriteString(subj)
Expand Down
48 changes: 35 additions & 13 deletions server/memstore.go
@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -79,9 +79,9 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
// If the value is smaller we need to enforce that.
if ms.maxp != 0 && ms.maxp < maxp {
lm := uint64(ms.maxp)
for _, ss := range ms.fss {
for subj, ss := range ms.fss {
if ss.Msgs > lm {
ms.enforcePerSubjectLimit(ss)
ms.enforcePerSubjectLimit(subj, ss)
}
}
}
Expand Down Expand Up @@ -125,6 +125,9 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
return ErrMaxBytes
}
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < uint64(len(msg)+len(hdr)) {
return ErrMaxBytes
Expand Down Expand Up @@ -176,7 +179,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
ss.Last = seq
// Check per subject limits.
if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) {
ms.enforcePerSubjectLimit(ss)
ms.enforcePerSubjectLimit(subj, ss)
}
} else {
ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
Expand Down Expand Up @@ -358,6 +361,9 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
// We will track start and end sequences as we go.
for subj, fss := range ms.fss {
if isMatch(subj) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, fss.First, fss)
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
Expand Down Expand Up @@ -452,6 +458,9 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
fss := make(map[string]SimpleState)
for subj, ss := range ms.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
Expand Down Expand Up @@ -503,11 +512,14 @@ func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool)

// Will check the msg limit for this tracked subject.
// Lock should be held.
func (ms *memStore) enforcePerSubjectLimit(ss *SimpleState) {
func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
if ms.maxp <= 0 {
return
}
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if !ms.removeMsg(ss.First, false) {
break
}
Expand Down Expand Up @@ -899,6 +911,9 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
if ss == nil {
continue
}
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss.First < fseq {
fseq = ss.First
}
Expand Down Expand Up @@ -981,19 +996,26 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
return
}
ss.Msgs--
if seq != ss.First {
return
}

// If we know we only have 1 msg left don't need to search for next first.
if ss.Msgs == 1 {
ss.First = ss.Last
return
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
}
} else {
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}
// TODO(dlc) - Might want to optimize this longer term.
for tseq := seq + 1; tseq <= ss.Last; tseq++ {
}

// Will recalulate the first sequence for this subject in this block.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
for tseq := startSeq + 1; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
break
ss.firstNeedsUpdate = false
return
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions server/store.go
Expand Up @@ -156,6 +156,9 @@ type SimpleState struct {
Msgs uint64 `json:"messages"`
First uint64 `json:"first_seq"`
Last uint64 `json:"last_seq"`

// Internal usage for when the first needs to be updated before use.
firstNeedsUpdate bool
}

// LostStreamData indicates msgs that have been lost.
Expand Down