Skip to content

Commit

Permalink
Improve consumer create performance.
Browse files Browse the repository at this point in the history
In cases where we had a large subject space, a filestore with many msg blocks, and a filtered consumer with a wildcard filtered subject, creating a consumer could take more memory and time then we wanted.
This improvement works when the consumer is DeliverAll and we used the upper layer in memory psim structure to scan but only in memory and avoid a file read for each msg block.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 23, 2023
1 parent f16a7d8 commit 019b2c8
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 18 deletions.
105 changes: 88 additions & 17 deletions server/filestore.go
Expand Up @@ -27,6 +27,7 @@ import (
"fmt"
"hash"
"io"
"math"
"net"
"os"
"path/filepath"
Expand Down Expand Up @@ -1662,41 +1663,110 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, seq uint64) (t
// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
fs.mu.RLock()
defer fs.mu.RUnlock()

lseq := fs.state.LastSeq
if sseq < fs.state.FirstSeq {
sseq = fs.state.FirstSeq
}
fs.mu.RUnlock()

// Returned state.
var ss SimpleState

// If past the end no results.
if sseq > lseq {
return ss
}

wc := subjectHasWildcard(subj)
// If we want all msgs that match we can shortcircuit.
// TODO(dlc) - This can be extended for all cases but would
// need to be careful on total msgs calculations etc.
if sseq == fs.state.FirstSeq {
fs.numFilteredPending(subj, &ss)
} else {
wc := subjectHasWildcard(subj)
// Tracking subject state.
// TODO(dlc) - Optimize for 2.10 with avl tree and no atomics per block.
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if sseq > atomic.LoadUint64(&mb.last.seq) {
continue
}
t, f, l := mb.filteredPending(subj, wc, sseq)
ss.Msgs += t
if ss.First == 0 || (f > 0 && f < ss.First) {
ss.First = f
}
if l > ss.Last {
ss.Last = l
}
}
}

// Tracking subject state.
fs.mu.RLock()
// TODO(dlc) - Optimize for 2.10 with avl tree and no atomics per block.
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if sseq > atomic.LoadUint64(&mb.last.seq) {
continue
return ss
}

// Optimized way for getting all num pending matching a filter subject.
// Lock should be held.
func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
isAll := filter == _EMPTY_ || filter == fwcs

// If isAll we do not need to do anything special to calculate the first and last and total.
if isAll {
ss.First = fs.state.FirstSeq
ss.Last = fs.state.LastSeq
ss.Msgs = fs.state.Msgs
return
}

tsa := [32]string{}
fsa := [32]string{}
fts := tokenizeSubjectIntoSlice(fsa[:0], filter)

start, stop := uint32(math.MaxUint32), uint32(0)
for subj, psi := range fs.psim {
if isAll {
ss.Msgs += psi.total
} else {
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
if isSubsetMatchTokenized(tts, fts) {
ss.Msgs += psi.total
// Keep track of start and stop indexes for this subject.
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
}
}
t, f, l := mb.filteredPending(subj, wc, sseq)
ss.Msgs += t
if ss.First == 0 || (f > 0 && f < ss.First) {
}
// If not collecting all we do need to figure out the first and last sequences.
if !isAll {
wc := subjectHasWildcard(filter)
// Do start
if mb := fs.bim[start]; mb != nil {
_, f, _ := mb.filteredPending(filter, wc, 0)
ss.First = f
} else {
// This is a miss. This can happen since psi.fblk is lazy, but should be very rare.
for i := start + 1; i <= stop; i++ {
mb := fs.bim[i]
if mb == nil {
continue
}
if _, f, _ := mb.filteredPending(filter, wc, 0); f > 0 {
ss.First = f
break
}
}
}
if l > ss.Last {
// Now last
if mb := fs.bim[stop]; mb != nil {
_, _, l := mb.filteredPending(filter, wc, 0)
ss.Last = l
}
}
fs.mu.RUnlock()

return ss
}

// SubjectsState returns a map of SimpleState for all matching subjects.
Expand All @@ -1709,7 +1779,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
}

start, stop := fs.blks[0], fs.lmb
// We can short circuit if not a wildcard using psim for start and stop.
// We can short circuit if not a wildcard using psim1 for start and stop.
if !subjectHasWildcard(subject) {
info := fs.psim[subject]
if info == nil {
Expand Down Expand Up @@ -5439,6 +5509,7 @@ func (mb *msgBlock) loadPerSubjectInfo() ([]byte, error) {
if err != nil {
return nil, err
}

if len(buf) < minFileSize || checkHeader(buf) != nil {
return nil, errors.New("short fss state")
}
Expand Down
56 changes: 55 additions & 1 deletion server/norace_test.go
@@ -1,4 +1,4 @@
// Copyright 2018-2022 The NATS Authors
// Copyright 2018-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -6354,3 +6354,57 @@ func TestNoRaceJetStreamKVAccountWithServerRestarts(t *testing.T) {
require_NoError(t, err)
require_True(t, si.State.NumSubjects == uint64(nsubjs))
}

// Test for consumer create when the subject cardinality is high and the
// consumer is filtered with a wildcard that forces linear scans.
// We have an optimization to use in memory structures in filestore to speed up.
// Only if asking to scan all (DeliverAll).
func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"events.>"},
})
require_NoError(t, err)

n := 500_000
msg := bytes.Repeat([]byte("X"), 8*1024)

for i := 0; i < n; i++ {
subj := fmt.Sprintf("events.%d", rand.Intn(100_000))
js.PublishAsync(subj, msg)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
}

// Should stay under 5ms now, but for Travis variability say 25ms.
threshold := 25 * time.Millisecond

start := time.Now()
_, err = js.PullSubscribe("events.*", "dlc")
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > threshold {
t.Fatalf("Consumer create took longer than expected, %v vs %v", elapsed, threshold)
}

start = time.Now()
_, err = js.PullSubscribe("events.99999", "xxx")
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > threshold {
t.Fatalf("Consumer create took longer than expected, %v vs %v", elapsed, threshold)
}

start = time.Now()
_, err = js.PullSubscribe(">", "zzz")
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > threshold {
t.Fatalf("Consumer create took longer than expected, %v vs %v", elapsed, threshold)
}
}

0 comments on commit 019b2c8

Please sign in to comment.