Skip to content

Commit

Permalink
Added optimized store NumPending() call.
Browse files Browse the repository at this point in the history
Optimized and fixed a bug in filestore filteredPending().
Optimized memstore FilteredState().

Added comprehensive tests for NumPending() and FilteredState().

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 26, 2023
1 parent 24c2f3b commit daacbf5
Show file tree
Hide file tree
Showing 6 changed files with 760 additions and 216 deletions.
18 changes: 4 additions & 14 deletions server/consumer.go
Expand Up @@ -3501,21 +3501,11 @@ func (o *consumer) streamNumPendingLocked() uint64 {
func (o *consumer) streamNumPending() uint64 {
if o.mset == nil || o.mset.store == nil {
o.npc, o.npf = 0, 0
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
o.npc, o.npf = 0, 0
for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) {
if o.sseq <= ss.Last {
o.npc++
if ss.Last > o.npf {
// Set our num pending sequence floor.
o.npf = ss.Last
}
}
}
} else {
ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject)
// Set our num pending and sequence floor.
o.npc, o.npf = int64(ss.Msgs), ss.Last
isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject
// Set our num pending and valid sequence floor.
npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject)
o.npc, o.npf = int64(npc), npf
}

return o.numPending()
Expand Down

0 comments on commit daacbf5

Please sign in to comment.