[IMPROVED] Consumer create time when stream has large number of blks. (#4269)

When creating a consumer on a stream with a very large number of msg
blks, calculating numPending could be slow.

This aims to optimize it a bit; more work remains to be done for streams
with a very large (> 200k) number of msg blks.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Jun 23, 2023
2 parents 3183f6e + 855e1bb commit 70d8069
Showing 2 changed files with 71 additions and 12 deletions.
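
For context before the diff: the expensive part of consumer creation was NumPending walking per-subject state in every msg blk. The first fast path added below is plain sequence arithmetic: when a stream has no interior deletes, the range [FirstSeq, LastSeq] is contiguous, so the count pending from a start sequence sseq is simply LastSeq - sseq + 1. A minimal standalone sketch of that shortcut (streamState and numPendingFast are illustrative names, not part of the server code):

package main

import "fmt"

// streamState is a hypothetical stand-in for the fields of fs.state
// that the fast path reads: FirstSeq, LastSeq and total Msgs.
type streamState struct {
	FirstSeq, LastSeq, Msgs uint64
}

// numPendingFast mirrors the contiguous-range shortcut: it returns
// (pending, true) when no interior deletes exist, else (0, false) to
// signal that the slower per-block scan is required.
func numPendingFast(s streamState, sseq uint64) (uint64, bool) {
	// Contiguous iff every sequence in [FirstSeq, LastSeq] is present.
	if s.LastSeq-s.FirstSeq+1 != s.Msgs {
		return 0, false
	}
	if sseq == 0 {
		return s.Msgs, true
	}
	// Assumes sseq <= LastSeq, as in the callers' usage.
	return s.LastSeq - sseq + 1, true
}

func main() {
	s := streamState{FirstSeq: 1, LastSeq: 10_000, Msgs: 10_000}
	n, ok := numPendingFast(s, 4000)
	fmt.Println(n, ok) // 6001 true, matching the test expectation below
}
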
41 changes: 29 additions & 12 deletions server/filestore.go
@@ -1868,12 +1868,24 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 		seqStart, _ = fs.selectMsgBlockWithIndex(sseq)
 	}
 
-	tsa := [32]string{}
-	fsa := [32]string{}
+	var tsa, fsa [32]string
 	fts := tokenizeSubjectIntoSlice(fsa[:0], filter)
 	isAll := filter == _EMPTY_ || filter == fwcs
 	wc := subjectHasWildcard(filter)
 
+	// See if filter was provided but its the only subject.
+	if !isAll && !wc && len(fs.psim) == 1 && fs.psim[filter] != nil {
+		isAll = true
+	}
+
+	// If we are isAll and have no deleted we can do a simpler calculation.
+	if isAll && (fs.state.LastSeq-fs.state.FirstSeq+1) == fs.state.Msgs {
+		if sseq == 0 {
+			return fs.state.Msgs, validThrough
+		}
+		return fs.state.LastSeq - sseq + 1, validThrough
+	}
+
 	isMatch := func(subj string) bool {
 		if isAll {
 			return true
@@ -1900,6 +1912,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 			var t uint64
 			if isAll && sseq <= mb.first.seq {
 				if lastPerSubject {
+					mb.ensurePerSubjectInfoLoaded()
 					for subj := range mb.fss {
 						if !seen[subj] {
 							total++
@@ -2023,16 +2036,20 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
 		mb.mu.Lock()
 		// Check if we should include all of this block in adjusting. If so work with metadata.
 		if sseq > mb.last.seq {
-			// We need to adjust for all matches in this block.
-			// We will scan fss state vs messages themselves.
-			// Make sure we have fss loaded.
-			mb.ensurePerSubjectInfoLoaded()
-			for subj, ss := range mb.fss {
-				if isMatch(subj) {
-					if lastPerSubject {
-						adjust++
-					} else {
-						adjust += ss.Msgs
+			if isAll && !lastPerSubject {
+				adjust += mb.msgs
+			} else {
+				// We need to adjust for all matches in this block.
+				// We will scan fss state vs messages themselves.
+				// Make sure we have fss loaded.
+				mb.ensurePerSubjectInfoLoaded()
+				for subj, ss := range mb.fss {
+					if isMatch(subj) {
+						if lastPerSubject {
+							adjust++
+						} else {
+							adjust += ss.Msgs
+						}
 					}
 				}
 			}
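
The second hunk above applies the same idea per block: when the filter matches everything (isAll) and we are not counting last-per-subject, a block that sits entirely below the start sequence contributes exactly mb.msgs, so its per-subject (fss) state never has to be loaded. A rough sketch of that shape, using a hypothetical blk stand-in for a msg block:

package main

import "fmt"

// blk is a hypothetical stand-in for a msg block's cheap in-memory
// metadata plus its (expensively loaded) per-subject counts.
type blk struct {
	lastSeq, msgs uint64
	perSubject    map[string]uint64 // fss analogue; lazily loaded in the real store
}

// adjustFor returns the number of messages in b that precede sseq,
// mirroring the fast path: whole-block metadata when possible,
// a per-subject scan only when the filter or mode demands it.
func adjustFor(b *blk, sseq uint64, isAll, lastPerSubject bool, match func(string) bool) uint64 {
	if sseq <= b.lastSeq {
		return 0 // block not fully below sseq; handled by the caller's scan
	}
	if isAll && !lastPerSubject {
		return b.msgs // metadata alone answers; no per-subject load needed
	}
	var adjust uint64
	for subj, n := range b.perSubject {
		if match(subj) {
			if lastPerSubject {
				adjust++
			} else {
				adjust += n
			}
		}
	}
	return adjust
}

func main() {
	b := &blk{lastSeq: 100, msgs: 100, perSubject: map[string]uint64{"zzz": 100}}
	fmt.Println(adjustFor(b, 500, true, false, nil)) // 100, without touching perSubject
}

The win is that mb.msgs lives in block metadata that is always resident, while fss may need to be read from disk per block; with hundreds of thousands of blks that difference dominates consumer create time.
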
42 changes: 42 additions & 0 deletions server/filestore_test.go
@@ -5457,3 +5457,45 @@ func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) {
 		}
 	})
 }
+
+func TestFileStoreNumPendingLargeNumBlks(t *testing.T) {
+	// No need for all permutations here.
+	storeDir := t.TempDir()
+	fcfg := FileStoreConfig{
+		StoreDir:  storeDir,
+		BlockSize: 128, // Small on purpose to create alot of blks.
+	}
+	fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage})
+	require_NoError(t, err)
+
+	subj, msg := "zzz", bytes.Repeat([]byte("X"), 100)
+	numMsgs := 10_000
+
+	for i := 0; i < numMsgs; i++ {
+		fs.StoreMsg(subj, nil, msg)
+	}
+
+	start := time.Now()
+	total, _ := fs.NumPending(4000, "zzz", false)
+	require_True(t, time.Since(start) < 5*time.Millisecond)
+	require_True(t, total == 6001)
+
+	start = time.Now()
+	total, _ = fs.NumPending(6000, "zzz", false)
+	require_True(t, time.Since(start) < 5*time.Millisecond)
+	require_True(t, total == 4001)
+
+	// Now delete a message in first half and second half.
+	fs.RemoveMsg(1000)
+	fs.RemoveMsg(9000)
+
+	start = time.Now()
+	total, _ = fs.NumPending(4000, "zzz", false)
+	require_True(t, time.Since(start) < 50*time.Millisecond)
+	require_True(t, total == 6000)
+
+	start = time.Now()
+	total, _ = fs.NumPending(6000, "zzz", false)
+	require_True(t, time.Since(start) < 50*time.Millisecond)
+	require_True(t, total == 4000)
+}
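
The wall-clock assertions in the test above can be flaky on loaded CI machines; a benchmark variant gives a steadier signal. A sketch under the assumption that it lives in the same package and reuses the helpers already shown (this benchmark is not part of the commit):

func BenchmarkFileStoreNumPendingLargeNumBlks(b *testing.B) {
	// Same setup as the test: tiny blocks so 10k msgs span many blks.
	fcfg := FileStoreConfig{StoreDir: b.TempDir(), BlockSize: 128}
	fs, err := newFileStore(fcfg, StreamConfig{Name: "zzz", Subjects: []string{"zzz"}, Storage: FileStorage})
	if err != nil {
		b.Fatal(err)
	}
	subj, msg := "zzz", bytes.Repeat([]byte("X"), 100)
	for i := 0; i < 10_000; i++ {
		fs.StoreMsg(subj, nil, msg)
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		fs.NumPending(4000, "zzz", false)
	}
}

Run with go test -bench=FileStoreNumPendingLargeNumBlks -run='^$' ./server from the repository root.
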
