Skip to content

Commit

Permalink
Merge pull request #3910 from nats-io/store-speedups
Browse files Browse the repository at this point in the history
[FIXED] JetStream Store fixes and speedups.
  • Loading branch information
derekcollison committed Feb 26, 2023
2 parents 87a1846 + daacbf5 commit 65127d6
Show file tree
Hide file tree
Showing 7 changed files with 944 additions and 180 deletions.
18 changes: 4 additions & 14 deletions server/consumer.go
Expand Up @@ -3501,21 +3501,11 @@ func (o *consumer) streamNumPendingLocked() uint64 {
func (o *consumer) streamNumPending() uint64 {
if o.mset == nil || o.mset.store == nil {
o.npc, o.npf = 0, 0
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
o.npc, o.npf = 0, 0
for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) {
if o.sseq <= ss.Last {
o.npc++
if ss.Last > o.npf {
// Set our num pending sequence floor.
o.npf = ss.Last
}
}
}
} else {
ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject)
// Set our num pending and sequence floor.
o.npc, o.npf = int64(ss.Msgs), ss.Last
isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject
// Set our num pending and valid sequence floor.
npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject)
o.npc, o.npf = int64(npc), npf
}

return o.numPending()
Expand Down

0 comments on commit 65127d6

Please sign in to comment.