Skip to content

Commit

Permalink
Merge pull request #3901 from nats-io/consumer-create-perf
Browse files Browse the repository at this point in the history
[IMPROVED] Consumer create performance in some circumstances.
  • Loading branch information
derekcollison committed Feb 23, 2023
2 parents f16a7d8 + 2972c11 commit 7aa407a
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 17 deletions.
103 changes: 87 additions & 16 deletions server/filestore.go
Expand Up @@ -27,6 +27,7 @@ import (
"fmt"
"hash"
"io"
"math"
"net"
"os"
"path/filepath"
Expand Down Expand Up @@ -1662,41 +1663,110 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, seq uint64) (t
// FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence.
func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
fs.mu.RLock()
defer fs.mu.RUnlock()

lseq := fs.state.LastSeq
if sseq < fs.state.FirstSeq {
sseq = fs.state.FirstSeq
}
fs.mu.RUnlock()

// Returned state.
var ss SimpleState

// If past the end no results.
if sseq > lseq {
return ss
}

wc := subjectHasWildcard(subj)
// If we want all msgs that match we can shortcircuit.
// TODO(dlc) - This can be extended for all cases but would
// need to be careful on total msgs calculations etc.
if sseq == fs.state.FirstSeq {
fs.numFilteredPending(subj, &ss)
} else {
wc := subjectHasWildcard(subj)
// Tracking subject state.
// TODO(dlc) - Optimize for 2.10 with avl tree and no atomics per block.
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if sseq > atomic.LoadUint64(&mb.last.seq) {
continue
}
t, f, l := mb.filteredPending(subj, wc, sseq)
ss.Msgs += t
if ss.First == 0 || (f > 0 && f < ss.First) {
ss.First = f
}
if l > ss.Last {
ss.Last = l
}
}
}

// Tracking subject state.
fs.mu.RLock()
// TODO(dlc) - Optimize for 2.10 with avl tree and no atomics per block.
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if sseq > atomic.LoadUint64(&mb.last.seq) {
continue
return ss
}

// Optimized way for getting all num pending matching a filter subject.
// Lock should be held.
func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
isAll := filter == _EMPTY_ || filter == fwcs

// If isAll we do not need to do anything special to calculate the first and last and total.
if isAll {
ss.First = fs.state.FirstSeq
ss.Last = fs.state.LastSeq
ss.Msgs = fs.state.Msgs
return
}

tsa := [32]string{}
fsa := [32]string{}
fts := tokenizeSubjectIntoSlice(fsa[:0], filter)

start, stop := uint32(math.MaxUint32), uint32(0)
for subj, psi := range fs.psim {
if isAll {
ss.Msgs += psi.total
} else {
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
if isSubsetMatchTokenized(tts, fts) {
ss.Msgs += psi.total
// Keep track of start and stop indexes for this subject.
if psi.fblk < start {
start = psi.fblk
}
if psi.lblk > stop {
stop = psi.lblk
}
}
}
t, f, l := mb.filteredPending(subj, wc, sseq)
ss.Msgs += t
if ss.First == 0 || (f > 0 && f < ss.First) {
}
// If not collecting all we do need to figure out the first and last sequences.
if !isAll {
wc := subjectHasWildcard(filter)
// Do start
if mb := fs.bim[start]; mb != nil {
_, f, _ := mb.filteredPending(filter, wc, 0)
ss.First = f
} else {
// This is a miss. This can happen since psi.fblk is lazy, but should be very rare.
for i := start + 1; i <= stop; i++ {
mb := fs.bim[i]
if mb == nil {
continue
}
if _, f, _ := mb.filteredPending(filter, wc, 0); f > 0 {
ss.First = f
break
}
}
}
if l > ss.Last {
// Now last
if mb := fs.bim[stop]; mb != nil {
_, _, l := mb.filteredPending(filter, wc, 0)
ss.Last = l
}
}
fs.mu.RUnlock()

return ss
}

// SubjectsState returns a map of SimpleState for all matching subjects.
Expand Down Expand Up @@ -5439,6 +5509,7 @@ func (mb *msgBlock) loadPerSubjectInfo() ([]byte, error) {
if err != nil {
return nil, err
}

if len(buf) < minFileSize || checkHeader(buf) != nil {
return nil, errors.New("short fss state")
}
Expand Down
56 changes: 55 additions & 1 deletion server/norace_test.go
@@ -1,4 +1,4 @@
// Copyright 2018-2022 The NATS Authors
// Copyright 2018-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -6354,3 +6354,57 @@ func TestNoRaceJetStreamKVAccountWithServerRestarts(t *testing.T) {
require_NoError(t, err)
require_True(t, si.State.NumSubjects == uint64(nsubjs))
}

// TestNoRaceJetStreamConsumerCreateTimeNumPending checks that creating a
// filtered consumer stays fast when the stream holds many distinct subjects.
// It publishes a large number of messages across random "events.<n>" subjects
// and then times consumer creation for a partial wildcard filter, a literal
// subject filter and the full wildcard filter.
func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"events.>"},
	})
	require_NoError(t, err)

	// Load the stream with 8KiB payloads spread over up to 100k subjects.
	const numMsgs = 500_000
	payload := bytes.Repeat([]byte("X"), 8*1024)
	for sent := 0; sent < numMsgs; sent++ {
		js.PublishAsync(fmt.Sprintf("events.%d", rand.Intn(100_000)), payload)
	}
	// Wait for the async publishes to complete, but for no more than 5 seconds.
	// Either way we carry on with whatever has been stored.
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
	}

	// Should stay under 5ms now, but for Travis variability say 25ms.
	threshold := 25 * time.Millisecond

	// Each case creates a pull consumer with the given filter and durable name.
	cases := []struct {
		filter  string
		durable string
	}{
		{"events.*", "dlc"},
		{"events.99999", "xxx"},
		{">", "zzz"},
	}
	for _, c := range cases {
		began := time.Now()
		_, err = js.PullSubscribe(c.filter, c.durable)
		require_NoError(t, err)
		if elapsed := time.Since(began); elapsed > threshold {
			t.Fatalf("Consumer create took longer than expected, %v vs %v", elapsed, threshold)
		}
	}
}

0 comments on commit 7aa407a

Please sign in to comment.