Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use AVL tree for consumer redeliver map #4071

Merged
merged 1 commit into from Apr 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 18 additions & 18 deletions server/consumer.go
Expand Up @@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"

"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nuid"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -274,7 +275,7 @@ type consumer struct {
pending map[uint64]*Pending
ptmr *time.Timer
rdq []uint64
rdqi map[uint64]struct{}
rdqi avl.SequenceSet
rdc map[uint64]uint64
maxdc uint64
waiting *waitQueue
Expand Down Expand Up @@ -1077,7 +1078,8 @@ func (o *consumer) setLeader(isLeader bool) {
mset.setConsumerAsLeader(o)

o.mu.Lock()
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()

// Restore our saved state. During non-leader status we just update our underlying store.
o.readStoredState(lseq)
Expand Down Expand Up @@ -1200,7 +1202,8 @@ func (o *consumer) setLeader(isLeader bool) {
}
// Make sure to clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
// ok if they are nil, we protect inside unsubscribe()
o.unsubscribe(o.ackSub)
Expand Down Expand Up @@ -4055,12 +4058,9 @@ func (o *consumer) didNotDeliver(seq uint64) {

// Lock should be held.
func (o *consumer) addToRedeliverQueue(seqs ...uint64) {
if o.rdqi == nil {
o.rdqi = make(map[uint64]struct{})
}
o.rdq = append(o.rdq, seqs...)
for _, seq := range seqs {
o.rdqi[seq] = struct{}{}
o.rdqi.Insert(seq)
}
}

Expand All @@ -4075,10 +4075,11 @@ func (o *consumer) getNextToRedeliver() uint64 {
}
seq := o.rdq[0]
if len(o.rdq) == 1 {
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
} else {
o.rdq = append(o.rdq[:0], o.rdq[1:]...)
delete(o.rdqi, seq)
o.rdqi.Delete(seq)
}
return seq
}
Expand All @@ -4087,11 +4088,7 @@ func (o *consumer) getNextToRedeliver() uint64 {
// FIXME(dlc) - This is O(n) but should be fast with small redeliver size.
// Lock should be held.
func (o *consumer) onRedeliverQueue(seq uint64) bool {
if o.rdqi == nil {
return false
}
_, ok := o.rdqi[seq]
return ok
return o.rdqi.Exists(seq)
}

// Remove a sequence from the redelivery queue.
Expand All @@ -4103,10 +4100,11 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool {
for i, rseq := range o.rdq {
if rseq == seq {
if len(o.rdq) == 1 {
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
} else {
o.rdq = append(o.rdq[:i], o.rdq[i+1:]...)
delete(o.rdqi, seq)
o.rdqi.Delete(seq)
}
return true
}
Expand Down Expand Up @@ -4218,7 +4216,8 @@ func (o *consumer) checkPending() {
} else {
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
}

Expand Down Expand Up @@ -4553,7 +4552,8 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
// We need to remove all those being queued for redelivery under o.rdq
if len(o.rdq) > 0 {
rdq := o.rdq
o.rdq, o.rdqi = nil, nil
o.rdq = nil
o.rdqi.Empty()
for _, sseq := range rdq {
if sseq >= o.sseq {
o.addToRedeliverQueue(sseq)
Expand Down