Skip to content

Commit

Permalink
[FIXED] #4043 (#4048)
Browse files Browse the repository at this point in the history
Improve performance of storing msgs when multiple subjects exist with
multiple messages and we have store limits that are being hit.

This will not, per se, fix performance degradation when per-subject
limits are being hit constantly across a vast array of keys. That will
be addressed more fully in 2.10.

Resolves #4043 

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 14, 2023
2 parents 3602b38 + b597485 commit 3f8fc5c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 37 deletions.
96 changes: 72 additions & 24 deletions server/filestore.go
Expand Up @@ -1371,7 +1371,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
// Update fss
// Make sure we have fss loaded.
mb.removeSeqPerSubject(sm.subj, seq, nil)
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
}
// Make sure we have a proper next first sequence.
Expand Down Expand Up @@ -1495,6 +1495,9 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = mb.last.seq + 1
for _, subj := range subs {
ss := mb.fss[subj]
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
}
Expand Down Expand Up @@ -1600,6 +1603,9 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
var havePartial bool
for subj, ss := range mb.fss {
if isAll || isMatch(subj) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
update(ss)
} else if sseq <= ss.Last {
Expand Down Expand Up @@ -1797,6 +1803,9 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.ensurePerSubjectInfoLoaded()
for subj, ss := range mb.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
Expand Down Expand Up @@ -1896,6 +1905,9 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
seen[subj] = true
}
} else {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
t += ss.Msgs
} else if sseq <= ss.Last {
Expand Down Expand Up @@ -2487,6 +2499,9 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
return ss.First, nil
}
}
Expand Down Expand Up @@ -2561,6 +2576,9 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss := mb.fss[subj]
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
if ss == nil {
continue
Expand Down Expand Up @@ -2730,7 +2748,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
mb.ensurePerSubjectInfoLoaded()

// If we are tracking multiple subjects here make sure we update that accounting.
mb.removeSeqPerSubject(sm.subj, seq, &smv)
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)

if secure {
Expand Down Expand Up @@ -5107,7 +5125,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
bytes += rl
}
// FSS updates.
mb.removeSeqPerSubject(sm.subj, seq, &smv)
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)

// Check for first message.
Expand Down Expand Up @@ -5315,7 +5333,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
purged++
}
// Update fss
smb.removeSeqPerSubject(sm.subj, mseq, &smv)
smb.removeSeqPerSubject(sm.subj, mseq)
fs.removePerSubject(sm.subj)
}
}
Expand Down Expand Up @@ -5691,7 +5709,7 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {

// Remove a seq from the fss and select new first.
// Lock should be held.
func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg) {
func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
mb.ensurePerSubjectInfoLoaded()
ss := mb.fss[subj]
if ss == nil {
Expand All @@ -5705,37 +5723,64 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64, smp *StoreMsg)
}

ss.Msgs--
if seq != ss.First {
return
}

// Only one left.
if ss.Msgs == 1 {
if seq != ss.First {
if seq == ss.Last {
ss.Last = ss.First
mb.fssNeedsWrite = true // Mark dirty
} else {
ss.First = ss.Last
mb.fssNeedsWrite = true // Mark dirty
}
mb.fssNeedsWrite = true // Mark dirty
return
}

// Recalculate first.
// TODO(dlc) - Might want to optimize this.
if seq == ss.First {
var smv StoreMsg
if smp == nil {
smp = &smv
}
for tseq := seq + 1; tseq <= ss.Last; tseq++ {
if sm, _ := mb.cacheLookup(tseq, smp); sm != nil {
if sm.subj == subj {
ss.First = tseq
mb.fssNeedsWrite = true // Mark dirty
return
// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}

// recalculateFirstForSubj recalculates the first sequence for subj in this
// block and stores it in ss.First, clearing ss.firstNeedsUpdate.
//
// It avoids the slower per-message lookup path by scanning the block cache
// index and raw buffer directly, starting at the slot for startSeq+1.
// startSeq is expected to be the stale first sequence (callers pass ss.First).
//
// If the cache is not loaded and cannot be loaded, the function returns
// without touching ss, so firstNeedsUpdate stays set. If the scan starts past
// the end of the index, or an index entry points outside the cached buffer,
// ss.First falls back to ss.Last. If no live message for subj is found,
// ss.First is left unchanged.
//
// NOTE(review): presumably mb's lock must be held by the caller, since this
// calls loadMsgsWithLock and mutates block state - confirm against callers.
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
	// Need to make sure messages are loaded.
	if mb.cacheNotLoaded() {
		if err := mb.loadMsgsWithLock(); err != nil {
			return
		}
	}
	// Mark first as updated.
	ss.firstNeedsUpdate = false
	startSeq++

	// Translate the starting sequence into a slot in the cache index. The
	// comparison is done on uint64 values so that a start sequence below the
	// cache's first sequence clamps to slot 0 instead of wrapping around to a
	// negative slot (which would panic when indexing below).
	startSlot := 0
	if startSeq > mb.cache.fseq {
		if delta := startSeq - mb.cache.fseq; delta < uint64(len(mb.cache.idx)) {
			startSlot = int(delta)
		} else {
			startSlot = len(mb.cache.idx)
		}
	}
	if startSlot >= len(mb.cache.idx) {
		ss.First = ss.Last
		return
	}

	var le = binary.LittleEndian
	for slot := startSlot; slot < len(mb.cache.idx); slot++ {
		// Index entries are offsets with hbit masked off, relative to cache.off.
		li := int(mb.cache.idx[slot]&^hbit) - mb.cache.off
		if li < 0 || li >= len(mb.cache.buf) {
			// Entry points outside the cached buffer, fall back to last.
			ss.First = ss.Last
			return
		}
		buf := mb.cache.buf[li:]
		hdr := buf[:msgHdrSize]
		// Subject length is a uint16 at header offset 20, the subject bytes
		// follow the header directly.
		slen := int(le.Uint16(hdr[20:]))
		if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
			// Sequence is a uint64 at header offset 4.
			seq := le.Uint64(hdr[4:])
			// Skip records flagged with ebit (presumably erased - confirm).
			if seq&ebit != 0 {
				continue
			}
			// Skip sequences recorded in the block's delete map.
			if len(mb.dmap) > 0 {
				if _, ok := mb.dmap[seq]; ok {
					continue
				}
			}
			ss.First = seq
			mb.fssNeedsWrite = true // Mark dirty
			return
		}
	}
}
Expand Down Expand Up @@ -5964,6 +6009,9 @@ func (mb *msgBlock) writePerSubjectInfo() error {
n := binary.PutUvarint(scratch[0:], uint64(len(mb.fss)))
b.Write(scratch[0:n])
for subj, ss := range mb.fss {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
n := binary.PutUvarint(scratch[0:], uint64(len(subj)))
b.Write(scratch[0:n])
b.WriteString(subj)
Expand Down
48 changes: 35 additions & 13 deletions server/memstore.go
@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -79,9 +79,9 @@ func (ms *memStore) UpdateConfig(cfg *StreamConfig) error {
// If the value is smaller we need to enforce that.
if ms.maxp != 0 && ms.maxp < maxp {
lm := uint64(ms.maxp)
for _, ss := range ms.fss {
for subj, ss := range ms.fss {
if ss.Msgs > lm {
ms.enforcePerSubjectLimit(ss)
ms.enforcePerSubjectLimit(subj, ss)
}
}
}
Expand Down Expand Up @@ -125,6 +125,9 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
return ErrMaxBytes
}
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < uint64(len(msg)+len(hdr)) {
return ErrMaxBytes
Expand Down Expand Up @@ -176,7 +179,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
ss.Last = seq
// Check per subject limits.
if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) {
ms.enforcePerSubjectLimit(ss)
ms.enforcePerSubjectLimit(subj, ss)
}
} else {
ms.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
Expand Down Expand Up @@ -358,6 +361,9 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
// We will track start and end sequences as we go.
for subj, fss := range ms.fss {
if isMatch(subj) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, fss.First, fss)
}
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
Expand Down Expand Up @@ -452,6 +458,9 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
fss := make(map[string]SimpleState)
for subj, ss := range ms.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
Expand Down Expand Up @@ -503,11 +512,14 @@ func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool)

// Will check the msg limit for this tracked subject.
// Lock should be held.
func (ms *memStore) enforcePerSubjectLimit(ss *SimpleState) {
func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
if ms.maxp <= 0 {
return
}
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if !ms.removeMsg(ss.First, false) {
break
}
Expand Down Expand Up @@ -899,6 +911,9 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
if ss == nil {
continue
}
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss.First < fseq {
fseq = ss.First
}
Expand Down Expand Up @@ -981,19 +996,26 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
return
}
ss.Msgs--
if seq != ss.First {
return
}

// If we know we only have 1 msg left don't need to search for next first.
if ss.Msgs == 1 {
ss.First = ss.Last
return
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
}
} else {
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}
// TODO(dlc) - Might want to optimize this longer term.
for tseq := seq + 1; tseq <= ss.Last; tseq++ {
}

// recalculateFirstForSubj recalculates the first sequence for subj in the
// memory store and records it in ss.First.
//
// The scan starts at startSeq+1 (callers pass the stale ss.First) and stops
// at ss.Last. When a message for subj is found, ss.First is updated and
// ss.firstNeedsUpdate is cleared so later callers do not rescan and skip past
// the freshly computed first sequence. If nothing is found, ss is left
// untouched and the flag stays set.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
	for tseq := startSeq + 1; tseq <= ss.Last; tseq++ {
		if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
			ss.First = tseq
			// Clear the flag before returning, First is now accurate.
			ss.firstNeedsUpdate = false
			return
		}
	}
}
Expand Down
3 changes: 3 additions & 0 deletions server/store.go
Expand Up @@ -156,6 +156,9 @@ type SimpleState struct {
Msgs uint64 `json:"messages"`
First uint64 `json:"first_seq"`
Last uint64 `json:"last_seq"`

// Internal usage for when the first needs to be updated before use.
firstNeedsUpdate bool
}

// LostStreamData indicates msgs that have been lost.
Expand Down

0 comments on commit 3f8fc5c

Please sign in to comment.