Merge pull request #3877 from nats-io/consumer-info-optimization
[IMPROVED] ConsumerInfo request processing.
derekcollison committed Feb 17, 2023
2 parents b91143a + 11b0f21 commit 52e5995
Showing 5 changed files with 209 additions and 24 deletions.
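In short, this commit replaces the consumer's unsigned num-pending cache (npc uint64, npcm uint64) with a signed counter plus a sequence floor (npc int64, npf uint64) and adds a cheap sanity check (checkNumPending) so that a ConsumerInfo request only falls back to a full store scan (streamNumPending) when the cached value is clearly inconsistent with the stream's fast state. The following is a minimal, standalone Go sketch of that idea using simplified stand-in types; it is an illustration of the caching scheme, not the server code itself.

package main

import "fmt"

// Stand-ins for the real server types; the actual fields and methods live in
// server/consumer.go (npc, npf, checkNumPending, streamNumPending).
type streamState struct {
	Msgs    uint64
	LastSeq uint64
}

type consumer struct {
	sseq uint64 // next stream sequence to deliver
	npc  int64  // cached num pending; signed because a decrement can race ahead of the increment
	npf  uint64 // num pending sequence floor
}

// checkNumPending mirrors the fast path added in this commit: trust the cached
// counter unless it is clearly inconsistent with the stream's fast state, and
// only then pay for a full recalculation.
func (o *consumer) checkNumPending(state streamState, recalc func() (int64, uint64)) uint64 {
	if (o.sseq > state.LastSeq && o.npc != 0) || o.npc > int64(state.Msgs) {
		o.npc, o.npf = recalc() // expensive path: scan the store
	}
	if o.npc < 0 {
		return 0
	}
	return uint64(o.npc)
}

func main() {
	c := &consumer{sseq: 126, npc: 25, npf: 125}
	st := streamState{Msgs: 150, LastSeq: 150}
	// The cached value is consistent with the stream state, so no recalculation happens.
	fmt.Println(c.checkNumPending(st, func() (int64, uint64) { return 25, 150 }))
}

In the real server the recalculation goes through streamNumPending, which consults the store's SubjectsState or FilteredState depending on the deliver policy, as shown in the consumer.go diff below.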
93 changes: 70 additions & 23 deletions server/consumer.go
@@ -242,8 +242,8 @@ type consumer struct {
dseq uint64
adflr uint64
asflr uint64
npc uint64
npcm uint64
npc int64
npf uint64
dsubj string
qgroup string
lss *lastSeqSkipList
@@ -983,6 +983,9 @@ func (o *consumer) setLeader(isLeader bool) {
s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq
mset.mu.RUnlock()

// Register as a leader with our parent stream.
mset.setConsumerAsLeader(o)

o.mu.Lock()
o.rdq, o.rdqi = nil, nil

@@ -1075,9 +1078,6 @@ func (o *consumer) setLeader(isLeader bool) {
// Snapshot initial info.
o.infoWithSnap(true)

// Register as a leader with our parent stream.
mset.setConsumerAsLeader(o)

// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)

@@ -1617,6 +1617,9 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// Allowed but considered no-op, [Description, SampleFrequency, MaxWaiting, HeadersOnly]
o.cfg = *cfg

// Re-calculate num pending on update.
o.streamNumPending()

return nil
}

@@ -2259,7 +2262,7 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
},
NumAckPending: len(o.pending),
NumRedelivered: len(o.rdc),
NumPending: o.streamNumPending(),
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
}
// Adjust active based on non-zero etc. Also make UTC here.
@@ -3262,6 +3265,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {

// On error either wait or return.
if err != nil || pmsg == nil {
// On EOF we can optionally fast sync num pending state.
if err == ErrStoreEOF {
o.checkNumPendingOnEOF()
}
if err == ErrStoreMsgNotFound || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
goto waitForMsgs
} else {
@@ -3271,7 +3278,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}

// Update our cached num pending here first.
if dc == 1 && o.npcm > 0 {
if dc == 1 {
o.npc--
}
// Pre-calculate ackReply
@@ -3298,7 +3305,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
} else {
// We will redo this one.
o.sseq--
if dc == 1 && o.npcm > 0 {
if dc == 1 {
o.npc++
}
pmsg.returnToPool()
@@ -3425,39 +3432,77 @@ func (o *consumer) setMaxPendingBytes(limit int) {
}
}

// Does some sanity checks to see if we should re-calculate.
// Since there is a race when decrementing when there is contention at the beginning of the stream.
// The race is a getNextMsg skips a deleted msg, and then the decStreamPending call fires.
// This does some quick sanity checks to see if we should re-calculate num pending.
// Lock should be held.
func (o *consumer) numPending() uint64 {
if o.npcm == 0 {
o.streamNumPending()
func (o *consumer) checkNumPending() uint64 {
if o.mset != nil {
var state StreamState
o.mset.store.FastState(&state)
if o.sseq > state.LastSeq && o.npc != 0 || o.npc > int64(state.Msgs) {
// Re-calculate.
o.streamNumPending()
}
}
// This can wrap based on possibly having a dec before the inc. Account for that here.
if o.npc&(1<<63) != 0 {
return o.numPending()
}

// Lock should be held.
func (o *consumer) numPending() uint64 {
if o.npc < 0 {
return 0
}
return o.npc
return uint64(o.npc)
}

// This will do a quick sanity check on num pending when we encounter
// an EOF in the loop and gather.
// Lock should be held.
func (o *consumer) checkNumPendingOnEOF() {
if o.mset == nil {
return
}
var state StreamState
o.mset.store.FastState(&state)
if o.sseq > state.LastSeq && o.npc != 0 {
// We know here we can reset our running state for num pending.
o.npc, o.npf = 0, state.LastSeq
}
}

// Call into streamNumPending after acquiring the consumer lock.
func (o *consumer) streamNumPendingLocked() uint64 {
o.mu.Lock()
defer o.mu.Unlock()
return o.streamNumPending()
}

// Will force a set from the stream store of num pending.
// Depends on delivery policy, for last per subject we calculate differently.
// Lock should be held.
func (o *consumer) streamNumPending() uint64 {
if o.mset == nil || o.mset.store == nil {
o.npc, o.npcm = 0, 0
o.npc, o.npf = 0, 0
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
o.npc, o.npcm = 0, 0
o.npc, o.npf = 0, 0
for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) {
if o.sseq <= ss.Last {
o.npc++
if ss.Last > o.npcm {
o.npcm = ss.Last
if ss.Last > o.npf {
// Set our num pending sequence floor.
o.npf = ss.Last
}
}
}
} else {
ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject)
o.npc, o.npcm = ss.Msgs, ss.Last
// Set our num pending and sequence floor.
o.npc, o.npf = int64(ss.Msgs), ss.Last
}
return o.npc

return o.numPending()
}

func convertToHeadersOnly(pmsg *jsPubMsg) {
@@ -4330,10 +4375,11 @@ func (o *consumer) requestNextMsgSubject() string {

func (o *consumer) decStreamPending(sseq uint64, subj string) {
o.mu.Lock()
// Update our cached num pending. Only do so if we think deliverMsg has not done so.
if sseq > o.npcm && sseq >= o.sseq && o.isFilteredMatch(subj) {
// Update our cached num pending only if we think deliverMsg has not done so.
if sseq >= o.sseq && o.isFilteredMatch(subj) {
o.npc--
}

// Check if this message was pending.
p, wasPending := o.pending[sseq]
var rdc uint64 = 1
@@ -4346,6 +4392,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
// TODO(dlc) - we could do a term here instead with a reason to generate the advisory.
if wasPending {
// We could have lock for stream so do this in a go routine.
// TODO(dlc) - We should do this with ipq vs naked go routines.
go o.processTerm(sseq, p.Sequence, rdc)
}
}
@@ -4387,7 +4434,7 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s
if o.mset == nil {
return
}
if seq > o.npcm {
if seq > o.npf {
o.npc++
}
if seq < o.sseq {
1 change: 1 addition & 0 deletions server/filestore.go
@@ -1679,6 +1679,7 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {

// Tracking subject state.
fs.mu.RLock()
// TODO(dlc) - Optimize for 2.10 with avl tree and no atomics per block.
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if sseq > atomic.LoadUint64(&mb.last.seq) {
56 changes: 56 additions & 0 deletions server/jetstream_test.go
@@ -19486,3 +19486,59 @@ func TestJetStreamPurgeExAndAccounting(t *testing.T) {
}
}
}

func TestJetStreamRollup(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

const STREAM = "S"
const SUBJ = "S.*"

js.AddStream(&nats.StreamConfig{
Name: STREAM,
Subjects: []string{SUBJ},
AllowRollup: true,
})

for i := 1; i <= 10; i++ {
sendStreamMsg(t, nc, "S.A", fmt.Sprintf("%v", i))
sendStreamMsg(t, nc, "S.B", fmt.Sprintf("%v", i))
}

sinfo, err := js.StreamInfo(STREAM)
require_NoError(t, err)
require_True(t, sinfo.State.Msgs == 20)

cinfo, err := js.AddConsumer(STREAM, &nats.ConsumerConfig{
Durable: "DUR-A",
FilterSubject: "S.A",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
require_True(t, cinfo.NumPending == 10)

m := nats.NewMsg("S.A")
m.Header.Set(JSMsgRollup, JSMsgRollupSubject)

_, err = js.PublishMsg(m)
require_NoError(t, err)

cinfo, err = js.ConsumerInfo("S", "DUR-A")
require_NoError(t, err)
require_True(t, cinfo.NumPending == 1)

sinfo, err = js.StreamInfo(STREAM)
require_NoError(t, err)
require_True(t, sinfo.State.Msgs == 11)

cinfo, err = js.AddConsumer(STREAM, &nats.ConsumerConfig{
Durable: "DUR-B",
FilterSubject: "S.B",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
require_True(t, cinfo.NumPending == 10)
}
73 changes: 73 additions & 0 deletions server/norace_test.go
@@ -6200,3 +6200,76 @@ func TestNoRaceFileStoreStreamMaxAgePerformance(t *testing.T) {
fmt.Printf("Took %v to store %d\n", elapsed, num)
fmt.Printf("%.0f msgs/sec\n", float64(num)/elapsed.Seconds())
}

// ConsumerInfo seems to be called quite a bit more than we had anticipated.
// Under certain circumstances, since we reset num pending, this can be very costly.
// We will use the fast path to alleviate that performance bottleneck but also make
// sure we are still being accurate.
func TestNoRaceJetStreamClusterConsumerInfoSpeed(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

c.waitOnLeader()
server := c.randomNonLeader()

nc, js := jsClientConnect(t, server)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"events.>"},
Replicas: 3,
})
require_NoError(t, err)

// The issue is compounded when we have lots of different subjects captured
// by a terminal fwc. The consumer will have a terminal pwc.
// Here make all subjects unique.

sub, err := js.PullSubscribe("events.*", "DLC")
require_NoError(t, err)

toSend := 250_000
for i := 0; i < toSend; i++ {
subj := fmt.Sprintf("events.%d", i+1)
js.PublishAsync(subj, []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

checkNumPending := func(expected int) {
t.Helper()
start := time.Now()
ci, err := js.ConsumerInfo("TEST", "DLC")
require_NoError(t, err)
// Make sure these are fast now.
if elapsed := time.Since(start); elapsed > 5*time.Millisecond {
t.Fatalf("ConsumerInfo took too long: %v", elapsed)
}
// Make sure pending == expected.
if ci.NumPending != uint64(expected) {
t.Fatalf("Expected %d NumPending, got %d", expected, ci.NumPending)
}
}
// Make sure in simple case it is correct.
checkNumPending(toSend)

// Do a few acks.
toAck := 25
for _, m := range fetchMsgs(t, sub, 25, time.Second) {
err = m.AckSync()
require_NoError(t, err)
}
checkNumPending(toSend - toAck)

// Now do a purge such that we only keep so many.
// We want to make sure we do the right thing here and have correct calculations.
toKeep := 100_000
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: uint64(toKeep)})
require_NoError(t, err)

checkNumPending(toKeep)
}
10 changes: 9 additions & 1 deletion server/stream.go
@@ -3321,13 +3321,21 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
func (mset *stream) storeUpdates(md, bd int64, seq uint64, subj string) {
// If we have a single negative update then we will process our consumers for stream pending.
// Purge and Store handled separately inside individual calls.
if md == -1 && seq > 0 {
if md == -1 && seq > 0 && subj != _EMPTY_ {
// We use our consumer list mutex here instead of the main stream lock since it may be held already.
mset.clsMu.RLock()
// TODO(dlc) - Do sublist like signaling so we do not have to match?
for _, o := range mset.cList {
o.decStreamPending(seq, subj)
}
mset.clsMu.RUnlock()
} else if md < 0 {
// Batch decrements we need to force consumers to re-calculate num pending.
mset.clsMu.RLock()
for _, o := range mset.cList {
o.streamNumPendingLocked()
}
mset.clsMu.RUnlock()
}

if mset.jsa != nil {