[IMPROVED] ConsumerInfo request processing. #3877

Merged 1 commit on Feb 17, 2023
93 changes: 70 additions & 23 deletions server/consumer.go
@@ -242,8 +242,8 @@ type consumer struct {
dseq uint64
adflr uint64
asflr uint64
npc uint64
npcm uint64
npc int64
npf uint64
dsubj string
qgroup string
lss *lastSeqSkipList
@@ -983,6 +983,9 @@ func (o *consumer) setLeader(isLeader bool) {
s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq
mset.mu.RUnlock()

// Register as a leader with our parent stream.
mset.setConsumerAsLeader(o)

o.mu.Lock()
o.rdq, o.rdqi = nil, nil

@@ -1075,9 +1078,6 @@ func (o *consumer) setLeader(isLeader bool) {
// Snapshot initial info.
o.infoWithSnap(true)

// Register as a leader with our parent stream.
mset.setConsumerAsLeader(o)

// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)

@@ -1617,6 +1617,9 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// Allowed but considered no-op, [Description, SampleFrequency, MaxWaiting, HeadersOnly]
o.cfg = *cfg

// Re-calculate num pending on update.
o.streamNumPending()

return nil
}

@@ -2259,7 +2262,7 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
},
NumAckPending: len(o.pending),
NumRedelivered: len(o.rdc),
NumPending: o.streamNumPending(),
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
}
// Adjust active based on non-zero etc. Also make UTC here.
@@ -3262,6 +3265,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {

// On error either wait or return.
if err != nil || pmsg == nil {
// On EOF we can optionally fast sync num pending state.
if err == ErrStoreEOF {
o.checkNumPendingOnEOF()
}
if err == ErrStoreMsgNotFound || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
goto waitForMsgs
} else {
@@ -3271,7 +3278,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}

// Update our cached num pending here first.
if dc == 1 && o.npcm > 0 {
if dc == 1 {
o.npc--
}
// Pre-calculate ackReply
@@ -3298,7 +3305,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
} else {
// We will redo this one.
o.sseq--
if dc == 1 && o.npcm > 0 {
if dc == 1 {
o.npc++
}
pmsg.returnToPool()
@@ -3425,39 +3432,77 @@ func (o *consumer) setMaxPendingBytes(limit int) {
}
}

// Does some sanity checks to see if we should re-calculate.
// Since there is a race when decrementing while there is contention at the beginning of the stream.
// The race is that getNextMsg skips a deleted msg and then the decStreamPending call fires.
// This does some quick sanity checks to see if we should re-calculate num pending.
// Lock should be held.
func (o *consumer) numPending() uint64 {
if o.npcm == 0 {
o.streamNumPending()
func (o *consumer) checkNumPending() uint64 {
if o.mset != nil {
var state StreamState
o.mset.store.FastState(&state)
if o.sseq > state.LastSeq && o.npc != 0 || o.npc > int64(state.Msgs) {
// Re-calculate.
o.streamNumPending()
}
}
// This can wrap based on possibly having a dec before the inc. Account for that here.
if o.npc&(1<<63) != 0 {
return o.numPending()
}

// Lock should be held.
func (o *consumer) numPending() uint64 {
if o.npc < 0 {
return 0
}
return o.npc
return uint64(o.npc)
}

// This will do a quick sanity check on num pending when we encounter
// an EOF in the loop and gather.
// Lock should be held.
func (o *consumer) checkNumPendingOnEOF() {
if o.mset == nil {
return
}
var state StreamState
o.mset.store.FastState(&state)
if o.sseq > state.LastSeq && o.npc != 0 {
// We know here we can reset our running state for num pending.
o.npc, o.npf = 0, state.LastSeq
}
}

// Call into streamNumPending after acquiring the consumer lock.
func (o *consumer) streamNumPendingLocked() uint64 {
o.mu.Lock()
defer o.mu.Unlock()
return o.streamNumPending()
}

// Will force a recalculation of num pending from the stream store.
// Depends on delivery policy; for last per subject we calculate differently.
// Lock should be held.
func (o *consumer) streamNumPending() uint64 {
if o.mset == nil || o.mset.store == nil {
o.npc, o.npcm = 0, 0
o.npc, o.npf = 0, 0
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
o.npc, o.npcm = 0, 0
o.npc, o.npf = 0, 0
for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) {
if o.sseq <= ss.Last {
o.npc++
if ss.Last > o.npcm {
o.npcm = ss.Last
if ss.Last > o.npf {
// Set our num pending sequence floor.
o.npf = ss.Last
}
}
}
} else {
ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject)
o.npc, o.npcm = ss.Msgs, ss.Last
// Set our num pending and sequence floor.
o.npc, o.npf = int64(ss.Msgs), ss.Last
}
return o.npc

return o.numPending()
}

func convertToHeadersOnly(pmsg *jsPubMsg) {
@@ -4330,10 +4375,11 @@ func (o *consumer) requestNextMsgSubject() string {

func (o *consumer) decStreamPending(sseq uint64, subj string) {
o.mu.Lock()
// Update our cached num pending. Only do so if we think deliverMsg has not done so.
if sseq > o.npcm && sseq >= o.sseq && o.isFilteredMatch(subj) {
// Update our cached num pending only if we think deliverMsg has not done so.
if sseq >= o.sseq && o.isFilteredMatch(subj) {
o.npc--
}

// Check if this message was pending.
p, wasPending := o.pending[sseq]
var rdc uint64 = 1
@@ -4346,6 +4392,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
// TODO(dlc) - we could do a term here instead with a reason to generate the advisory.
if wasPending {
// We could have the lock for the stream, so do this in a go routine.
// TODO(dlc) - We should do this with ipq vs naked go routines.
go o.processTerm(sseq, p.Sequence, rdc)
}
}
@@ -4387,7 +4434,7 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s
if o.mset == nil {
return
}
if seq > o.npcm {
if seq > o.npf {
o.npc++
}
if seq < o.sseq {
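Reviewer note on the consumer.go changes above: the unsigned npc/npcm pair becomes a signed counter (npc int64) plus a sequence floor (npf). The sketch below is a minimal, hypothetical model of that counter-plus-floor bookkeeping, not code from this diff; the names and demo values are illustrative. A decrement can race ahead of its increment, so the counter is signed and clamped at zero when reported, and only sequences above the floor may increment it.

package main

import "fmt"

// pendingTracker is a stripped-down stand-in for the o.npc/o.npf bookkeeping.
type pendingTracker struct {
    npc  int64  // cached num pending; signed so a racing decrement cannot wrap
    npf  uint64 // sequence floor recorded when num pending was last recalculated
    sseq uint64 // next stream sequence the consumer will deliver
}

// onInteriorDelete mirrors decStreamPending (subject filtering omitted):
// only messages the consumer has not yet delivered are decremented.
func (p *pendingTracker) onInteriorDelete(seq uint64) {
    if seq >= p.sseq {
        p.npc--
    }
}

// onStreamSignal mirrors processStreamSignal: only sequences above the floor count.
func (p *pendingTracker) onStreamSignal(seq uint64) {
    if seq > p.npf {
        p.npc++
    }
}

// numPending mirrors the new numPending: clamp the signed counter at zero.
func (p *pendingTracker) numPending() uint64 {
    if p.npc < 0 {
        return 0
    }
    return uint64(p.npc)
}

func main() {
    p := &pendingTracker{sseq: 1}
    p.onStreamSignal(1)         // new message at sequence 1
    p.onInteriorDelete(1)       // removed before delivery
    p.onInteriorDelete(1)       // a racing extra decrement drives the counter negative...
    fmt.Println(p.numPending()) // ...but the clamp still reports 0
}

With the previous uint64 counter that extra decrement would wrap to a huge value, which is what the removed o.npc&(1<<63) check guarded against; the signed counter makes the clamp in numPending straightforward.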
1 change: 1 addition & 0 deletions server/filestore.go
@@ -1679,6 +1679,7 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {

// Tracking subject state.
fs.mu.RLock()
// TODO(dlc) - Optimize for 2.10 with avl tree and no atomics per block.
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if sseq > atomic.LoadUint64(&mb.last.seq) {
56 changes: 56 additions & 0 deletions server/jetstream_test.go
@@ -19486,3 +19486,59 @@ func TestJetStreamPurgeExAndAccounting(t *testing.T) {
}
}
}

func TestJetStreamRollup(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

const STREAM = "S"
const SUBJ = "S.*"

js.AddStream(&nats.StreamConfig{
Name: STREAM,
Subjects: []string{SUBJ},
AllowRollup: true,
})

for i := 1; i <= 10; i++ {
sendStreamMsg(t, nc, "S.A", fmt.Sprintf("%v", i))
sendStreamMsg(t, nc, "S.B", fmt.Sprintf("%v", i))
}

sinfo, err := js.StreamInfo(STREAM)
require_NoError(t, err)
require_True(t, sinfo.State.Msgs == 20)

cinfo, err := js.AddConsumer(STREAM, &nats.ConsumerConfig{
Durable: "DUR-A",
FilterSubject: "S.A",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
require_True(t, cinfo.NumPending == 10)

m := nats.NewMsg("S.A")
m.Header.Set(JSMsgRollup, JSMsgRollupSubject)

_, err = js.PublishMsg(m)
require_NoError(t, err)

cinfo, err = js.ConsumerInfo("S", "DUR-A")
require_NoError(t, err)
require_True(t, cinfo.NumPending == 1)

sinfo, err = js.StreamInfo(STREAM)
require_NoError(t, err)
require_True(t, sinfo.State.Msgs == 11)

cinfo, err = js.AddConsumer(STREAM, &nats.ConsumerConfig{
Durable: "DUR-B",
FilterSubject: "S.B",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
require_True(t, cinfo.NumPending == 10)
}
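For context on the rollup test above: the server-side constants JSMsgRollup and JSMsgRollupSubject correspond to the public Nats-Rollup header with the value "sub" (roll up per subject). Below is a minimal client-side sketch of the same publish with the header spelled out; the header mapping and the connection URL are assumptions, not part of this diff.

package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Replace all prior messages on subject "S.A" with this single message.
    m := nats.NewMsg("S.A")
    m.Header.Set("Nats-Rollup", "sub")
    if _, err := js.PublishMsg(m); err != nil {
        log.Fatal(err)
    }
}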
73 changes: 73 additions & 0 deletions server/norace_test.go
@@ -6200,3 +6200,76 @@ func TestNoRaceFileStoreStreamMaxAgePerformance(t *testing.T) {
fmt.Printf("Took %v to store %d\n", elapsed, num)
fmt.Printf("%.0f msgs/sec\n", float64(num)/elapsed.Seconds())
}

// ConsumerInfo seems to be called quite a bit more than we had anticipated.
// Under certain circumstances, since we reset num pending, this can be very costly.
// We will use the fast path to alleviate that performance bottleneck but also make
// sure we are still being accurate.
func TestNoRaceJetStreamClusterConsumerInfoSpeed(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

c.waitOnLeader()
server := c.randomNonLeader()

nc, js := jsClientConnect(t, server)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"events.>"},
Replicas: 3,
})
require_NoError(t, err)

// The issue is compounded when we have lots of different subjects captured
// by a terminal fwc. The consumer will have a terminal pwc.
// Here make all subjects unique.

sub, err := js.PullSubscribe("events.*", "DLC")
require_NoError(t, err)

toSend := 250_000
for i := 0; i < toSend; i++ {
subj := fmt.Sprintf("events.%d", i+1)
js.PublishAsync(subj, []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

checkNumPending := func(expected int) {
t.Helper()
start := time.Now()
ci, err := js.ConsumerInfo("TEST", "DLC")
require_NoError(t, err)
// Make sure these are fast now.
if elapsed := time.Since(start); elapsed > 5*time.Millisecond {
t.Fatalf("ConsumerInfo took too long: %v", elapsed)
}
// Make sure pending == expected.
if ci.NumPending != uint64(expected) {
t.Fatalf("Expected %d NumPending, got %d", expected, ci.NumPending)
}
}
// Make sure in simple case it is correct.
checkNumPending(toSend)

// Do a few acks.
toAck := 25
for _, m := range fetchMsgs(t, sub, 25, time.Second) {
err = m.AckSync()
require_NoError(t, err)
}
checkNumPending(toSend - toAck)

// Now do a purge such that we only keep so many.
// We want to make sure we do the right thing here and have correct calculations.
toKeep := 100_000
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: uint64(toKeep)})
require_NoError(t, err)

checkNumPending(toKeep)
}
10 changes: 9 additions & 1 deletion server/stream.go
@@ -3321,13 +3321,21 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
func (mset *stream) storeUpdates(md, bd int64, seq uint64, subj string) {
// If we have a single negative update then we will process our consumers for stream pending.
// Purge and Store handled separately inside individual calls.
if md == -1 && seq > 0 {
if md == -1 && seq > 0 && subj != _EMPTY_ {
// We use our consumer list mutex here instead of the main stream lock since it may be held already.
mset.clsMu.RLock()
// TODO(dlc) - Do sublist like signaling so we do not have to match?
for _, o := range mset.cList {
o.decStreamPending(seq, subj)
}
mset.clsMu.RUnlock()
} else if md < 0 {
// For batch decrements we need to force consumers to re-calculate num pending.
mset.clsMu.RLock()
for _, o := range mset.cList {
o.streamNumPendingLocked()
}
mset.clsMu.RUnlock()
}

if mset.jsa != nil {
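Reviewer note on the stream.go hunk above: a single interior delete that carries a subject lets each consumer apply the cheap cached decrement (decStreamPending), while a batch removal (md < 0 with no single subject, e.g. a purge or rollup) forces each consumer to recalculate num pending from the store. The sketch below is a condensed, hypothetical model of that dispatch, not nats-server code; the types and the stand-in recalculation are illustrative.

package main

import "fmt"

// consumerLike captures just the two reactions the hunk dispatches to.
type consumerLike interface {
    decStreamPending(seq uint64, subj string)
    streamNumPendingLocked() uint64
}

// onStoreUpdate mirrors the shape of stream.storeUpdates for negative message deltas.
func onStoreUpdate(consumers []consumerLike, md int64, seq uint64, subj string) {
    switch {
    case md == -1 && seq > 0 && subj != "":
        // One message removed from the interior: targeted decrement per consumer.
        for _, o := range consumers {
            o.decStreamPending(seq, subj)
        }
    case md < 0:
        // Batch removal: force a full num pending recalculation per consumer.
        for _, o := range consumers {
            o.streamNumPendingLocked()
        }
    }
}

// fakeConsumer is a toy stand-in; a real consumer would recount from the store.
type fakeConsumer struct{ pending int64 }

func (f *fakeConsumer) decStreamPending(seq uint64, subj string) { f.pending-- }
func (f *fakeConsumer) streamNumPendingLocked() uint64 {
    f.pending = 0 // stand-in for recomputing num pending from the stream store
    return uint64(f.pending)
}

func main() {
    c := &fakeConsumer{pending: 10}
    onStoreUpdate([]consumerLike{c}, -1, 5, "S.A") // single delete: cheap decrement
    onStoreUpdate([]consumerLike{c}, -9, 0, "")    // purge of 9 msgs: full recalculation
    fmt.Println(c.pending)
}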