Skip to content

Commit

Permalink
Use AVL tree for consumer redeliver map (#4071)
Browse files Browse the repository at this point in the history
This also uses the AVL tree from #4070 in place of the map that
currently tracks whether or not a sequence is present in the consumer
redeliveries.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Apr 18, 2023
2 parents f3e0d6b + 57d888e commit 1a329c7
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions server/consumer.go
Expand Up @@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"

"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nuid"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -274,7 +275,7 @@ type consumer struct {
pending map[uint64]*Pending
ptmr *time.Timer
rdq []uint64
rdqi map[uint64]struct{}
rdqi avl.SequenceSet
rdc map[uint64]uint64
maxdc uint64
waiting *waitQueue
Expand Down Expand Up @@ -1077,7 +1078,8 @@ func (o *consumer) setLeader(isLeader bool) {
mset.setConsumerAsLeader(o)

o.mu.Lock()
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()

// Restore our saved state. During non-leader status we just update our underlying store.
o.readStoredState(lseq)
Expand Down Expand Up @@ -1200,7 +1202,8 @@ func (o *consumer) setLeader(isLeader bool) {
}
// Make sure to clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
// ok if they are nil, we protect inside unsubscribe()
o.unsubscribe(o.ackSub)
Expand Down Expand Up @@ -4055,12 +4058,9 @@ func (o *consumer) didNotDeliver(seq uint64) {

// Lock should be held.
func (o *consumer) addToRedeliverQueue(seqs ...uint64) {
if o.rdqi == nil {
o.rdqi = make(map[uint64]struct{})
}
o.rdq = append(o.rdq, seqs...)
for _, seq := range seqs {
o.rdqi[seq] = struct{}{}
o.rdqi.Insert(seq)
}
}

Expand All @@ -4075,10 +4075,11 @@ func (o *consumer) getNextToRedeliver() uint64 {
}
seq := o.rdq[0]
if len(o.rdq) == 1 {
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
} else {
o.rdq = append(o.rdq[:0], o.rdq[1:]...)
delete(o.rdqi, seq)
o.rdqi.Delete(seq)
}
return seq
}
Expand All @@ -4087,11 +4088,7 @@ func (o *consumer) getNextToRedeliver() uint64 {
// FIXME(dlc) - This is O(n) but should be fast with small redeliver size.
// Lock should be held.
func (o *consumer) onRedeliverQueue(seq uint64) bool {
if o.rdqi == nil {
return false
}
_, ok := o.rdqi[seq]
return ok
return o.rdqi.Exists(seq)
}

// Remove a sequence from the redelivery queue.
Expand All @@ -4103,10 +4100,11 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool {
for i, rseq := range o.rdq {
if rseq == seq {
if len(o.rdq) == 1 {
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
} else {
o.rdq = append(o.rdq[:i], o.rdq[i+1:]...)
delete(o.rdqi, seq)
o.rdqi.Delete(seq)
}
return true
}
Expand Down Expand Up @@ -4218,7 +4216,8 @@ func (o *consumer) checkPending() {
} else {
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
}

Expand Down Expand Up @@ -4553,7 +4552,8 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
// We need to remove all those being queued for redelivery under o.rdq
if len(o.rdq) > 0 {
rdq := o.rdq
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
for _, sseq := range rdq {
if sseq >= o.sseq {
o.addToRedeliverQueue(sseq)
Expand Down

0 comments on commit 1a329c7

Please sign in to comment.