Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Consumer fixes and improvements on state management. #3892

Merged
merged 4 commits into from Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 78 additions & 27 deletions server/consumer.go
Expand Up @@ -1750,14 +1750,13 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
// Used to process a working update to delay redelivery.
func (o *consumer) progressUpdate(seq uint64) {
o.mu.Lock()
if len(o.pending) > 0 {
if p, ok := o.pending[seq]; ok {
p.Timestamp = time.Now().UnixNano()
// Update store system.
o.updateDelivered(p.Sequence, seq, 1, p.Timestamp)
}
defer o.mu.Unlock()

if p, ok := o.pending[seq]; ok {
p.Timestamp = time.Now().UnixNano()
// Update store system.
o.updateDelivered(p.Sequence, seq, 1, p.Timestamp)
}
o.mu.Unlock()
}

// Lock should be held.
Expand Down Expand Up @@ -1984,10 +1983,8 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
return
}
// If we are explicit ack make sure this is still on our pending list.
if len(o.pending) > 0 {
if _, ok := o.pending[sseq]; !ok {
return
}
if _, ok := o.pending[sseq]; !ok {
return
}

// Deliver an advisory
Expand Down Expand Up @@ -2103,6 +2100,7 @@ func (o *consumer) ackWait(next time.Duration) time.Duration {
}

// Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check.
// Lock should be held.
func (o *consumer) checkRedelivered(slseq uint64) {
var lseq uint64
if mset := o.mset; mset != nil {
Expand All @@ -2117,7 +2115,13 @@ func (o *consumer) checkRedelivered(slseq uint64) {
}
}
if shouldUpdateState {
o.writeStoreStateUnlocked()
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
o.mu.Lock()
}
}
}

Expand Down Expand Up @@ -2156,15 +2160,19 @@ func (o *consumer) applyState(state *ConsumerState) {
o.rdc = state.Redelivered

// Setup tracking timer if we have restored pending.
if len(o.pending) > 0 && o.ptmr == nil {
if len(o.pending) > 0 {
// This is on startup or leader change. We want to check pending
// sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s
delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond
// If normal is lower than this just use that.
if o.cfg.AckWait < delay {
delay = o.ackWait(0)
}
o.ptmr = time.AfterFunc(delay, o.checkPending)
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(delay)
}
}
}

Expand Down Expand Up @@ -3680,6 +3688,7 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
}
if p, ok := o.pending[sseq]; ok {
p.Timestamp = time.Now().UnixNano()
p.Sequence = dseq
} else {
o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()}
}
Expand Down Expand Up @@ -3805,7 +3814,7 @@ func (o *consumer) checkPending() {
fseq := state.FirstSeq

// Since we can update timestamps, we have to review all pending.
// We may want to unlock here or warn if list is big.
// We will now bail if we see an ack pending in bound to us via o.awl.
var expired []uint64
check := len(o.pending) > 1024
for seq, p := range o.pending {
Expand All @@ -3814,11 +3823,18 @@ func (o *consumer) checkPending() {
return
}
// Check if these are no longer valid.
if seq < fseq {
if seq < fseq || seq <= o.asflr {
delete(o.pending, seq)
delete(o.rdc, seq)
o.removeFromRedeliverQueue(seq)
shouldUpdateState = true
// Check if we need to move ack floors.
if seq > o.asflr {
o.asflr = seq
}
if p.Sequence > o.adflr {
o.adflr = p.Sequence
}
continue
}
elapsed, deadline := now-p.Timestamp, ttl
Expand Down Expand Up @@ -3865,13 +3881,21 @@ func (o *consumer) checkPending() {
if len(o.pending) > 0 {
o.ptmr.Reset(o.ackWait(time.Duration(next)))
} else {
o.ptmr.Stop()
o.ptmr = nil
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.pending = nil
}

// Update our state if needed.
if shouldUpdateState {
o.writeStoreStateUnlocked()
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.name(), name, err)
o.mu.Lock()
}
}
}

Expand Down Expand Up @@ -4124,13 +4148,29 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
if o.sseq < sseq {
o.sseq = sseq
}

if o.asflr < sseq {
o.asflr = sseq - 1
if o.dseq > 0 {
o.adflr = o.dseq - 1

// We need to remove those no longer relevant from pending.
for seq, p := range o.pending {
if seq <= o.asflr {
if p.Sequence > o.adflr {
o.adflr = p.Sequence
if o.adflr > o.dseq {
o.dseq = o.adflr
}
}
delete(o.pending, seq)
delete(o.rdc, seq)
// rdq handled below.
}
}
}
o.pending = nil
// This means we can reset everything at this point.
if len(o.pending) == 0 {
o.pending, o.rdc = nil, nil
}

// We need to remove all those being queued for redelivery under o.rdq
if len(o.rdq) > 0 {
Expand All @@ -4142,9 +4182,13 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
}
}
}
// Grab some info in case of error below.
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.Unlock()

o.writeStoreState()
if err := o.writeStoreState(); err != nil && s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on write store state from purge: %v", acc, mset.name(), name, err)
}
}

func stopAndClearTimer(tp **time.Timer) {
Expand Down Expand Up @@ -4238,6 +4282,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
if dflag {
ca = o.ca
}
sigSub := o.sigSub
o.mu.Unlock()

if c != nil {
Expand All @@ -4251,10 +4296,16 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
a.sl.clearNotification(delivery, qgroup, o.inch)
}

mset.mu.Lock()
mset.removeConsumer(o)
rp := mset.cfg.Retention
mset.mu.Unlock()
var rp RetentionPolicy
if mset != nil {
if sigSub != nil {
mset.removeConsumerAsLeader(o)
}
mset.mu.Lock()
mset.removeConsumer(o)
rp = mset.cfg.Retention
mset.mu.Unlock()
}

// We need to optionally remove all messages since we are interest based retention.
// We will do this consistently on all replicas. Note that if in clustered mode the
Expand Down
18 changes: 9 additions & 9 deletions server/filestore.go
Expand Up @@ -6223,8 +6223,9 @@ func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) {
return
}
// TODO(dlc) - if we error should start failing upwards.
o.writeState(buf)
lastWrite = time.Now()
if err := o.writeState(buf); err == nil {
lastWrite = time.Now()
}
case <-qch:
return
}
Expand Down Expand Up @@ -6414,6 +6415,11 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
o.mu.Lock()
defer o.mu.Unlock()

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
return nil
}

// Sanity checks.
if state.AckFloor.Consumer > state.Delivered.Consumer {
return fmt.Errorf("bad ack floor for consumer")
Expand All @@ -6429,8 +6435,6 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
pending = make(map[uint64]*Pending, len(state.Pending))
for seq, p := range state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
}
for seq := range pending {
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
}
Expand All @@ -6443,15 +6447,11 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
}
}

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
return fmt.Errorf("old update ignored")
}

o.state.Delivered = state.Delivered
o.state.AckFloor = state.AckFloor
o.state.Pending = pending
o.state.Redelivered = redelivered

o.kickFlusher()

return nil
Expand Down
9 changes: 8 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -4206,7 +4206,14 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
panic(err.Error())
}
o.store.Update(state)
if err = o.store.Update(state); err != nil {
o.mu.RLock()
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.RUnlock()
if s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
}
}
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
var ourID string
Expand Down