Skip to content

Commit

Permalink
Merge pull request #3892 from nats-io/consumer-fixes
Browse files Browse the repository at this point in the history
[FIXED] Consumer fixes and improvements on state management.
  • Loading branch information
derekcollison committed Feb 21, 2023
2 parents 53cf663 + 3c64d07 commit 18b5aca
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 37 deletions.
105 changes: 78 additions & 27 deletions server/consumer.go
Expand Up @@ -1750,14 +1750,13 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
// Used to process a working update to delay redelivery.
func (o *consumer) progressUpdate(seq uint64) {
o.mu.Lock()
if len(o.pending) > 0 {
if p, ok := o.pending[seq]; ok {
p.Timestamp = time.Now().UnixNano()
// Update store system.
o.updateDelivered(p.Sequence, seq, 1, p.Timestamp)
}
defer o.mu.Unlock()

if p, ok := o.pending[seq]; ok {
p.Timestamp = time.Now().UnixNano()
// Update store system.
o.updateDelivered(p.Sequence, seq, 1, p.Timestamp)
}
o.mu.Unlock()
}

// Lock should be held.
Expand Down Expand Up @@ -1984,10 +1983,8 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
return
}
// If we are explicit ack make sure this is still on our pending list.
if len(o.pending) > 0 {
if _, ok := o.pending[sseq]; !ok {
return
}
if _, ok := o.pending[sseq]; !ok {
return
}

// Deliver an advisory
Expand Down Expand Up @@ -2103,6 +2100,7 @@ func (o *consumer) ackWait(next time.Duration) time.Duration {
}

// Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check.
// Lock should be held.
func (o *consumer) checkRedelivered(slseq uint64) {
var lseq uint64
if mset := o.mset; mset != nil {
Expand All @@ -2117,7 +2115,13 @@ func (o *consumer) checkRedelivered(slseq uint64) {
}
}
if shouldUpdateState {
o.writeStoreStateUnlocked()
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
o.mu.Lock()
}
}
}

Expand Down Expand Up @@ -2156,15 +2160,19 @@ func (o *consumer) applyState(state *ConsumerState) {
o.rdc = state.Redelivered

// Setup tracking timer if we have restored pending.
if len(o.pending) > 0 && o.ptmr == nil {
if len(o.pending) > 0 {
// This is on startup or leader change. We want to check pending
// sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s
delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond
// If normal is lower than this just use that.
if o.cfg.AckWait < delay {
delay = o.ackWait(0)
}
o.ptmr = time.AfterFunc(delay, o.checkPending)
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(delay)
}
}
}

Expand Down Expand Up @@ -3680,6 +3688,7 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
}
if p, ok := o.pending[sseq]; ok {
p.Timestamp = time.Now().UnixNano()
p.Sequence = dseq
} else {
o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()}
}
Expand Down Expand Up @@ -3805,7 +3814,7 @@ func (o *consumer) checkPending() {
fseq := state.FirstSeq

// Since we can update timestamps, we have to review all pending.
// We may want to unlock here or warn if list is big.
// We will now bail if we see an ack pending in bound to us via o.awl.
var expired []uint64
check := len(o.pending) > 1024
for seq, p := range o.pending {
Expand All @@ -3814,11 +3823,18 @@ func (o *consumer) checkPending() {
return
}
// Check if these are no longer valid.
if seq < fseq {
if seq < fseq || seq <= o.asflr {
delete(o.pending, seq)
delete(o.rdc, seq)
o.removeFromRedeliverQueue(seq)
shouldUpdateState = true
// Check if we need to move ack floors.
if seq > o.asflr {
o.asflr = seq
}
if p.Sequence > o.adflr {
o.adflr = p.Sequence
}
continue
}
elapsed, deadline := now-p.Timestamp, ttl
Expand Down Expand Up @@ -3865,13 +3881,21 @@ func (o *consumer) checkPending() {
if len(o.pending) > 0 {
o.ptmr.Reset(o.ackWait(time.Duration(next)))
} else {
o.ptmr.Stop()
o.ptmr = nil
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.pending = nil
}

// Update our state if needed.
if shouldUpdateState {
o.writeStoreStateUnlocked()
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.name(), name, err)
o.mu.Lock()
}
}
}

Expand Down Expand Up @@ -4124,13 +4148,29 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
if o.sseq < sseq {
o.sseq = sseq
}

if o.asflr < sseq {
o.asflr = sseq - 1
if o.dseq > 0 {
o.adflr = o.dseq - 1

// We need to remove those no longer relevant from pending.
for seq, p := range o.pending {
if seq <= o.asflr {
if p.Sequence > o.adflr {
o.adflr = p.Sequence
if o.adflr > o.dseq {
o.dseq = o.adflr
}
}
delete(o.pending, seq)
delete(o.rdc, seq)
// rdq handled below.
}
}
}
o.pending = nil
// This means we can reset everything at this point.
if len(o.pending) == 0 {
o.pending, o.rdc = nil, nil
}

// We need to remove all those being queued for redelivery under o.rdq
if len(o.rdq) > 0 {
Expand All @@ -4142,9 +4182,13 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
}
}
}
// Grab some info in case of error below.
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.Unlock()

o.writeStoreState()
if err := o.writeStoreState(); err != nil && s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on write store state from purge: %v", acc, mset.name(), name, err)
}
}

func stopAndClearTimer(tp **time.Timer) {
Expand Down Expand Up @@ -4238,6 +4282,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
if dflag {
ca = o.ca
}
sigSub := o.sigSub
o.mu.Unlock()

if c != nil {
Expand All @@ -4251,10 +4296,16 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
a.sl.clearNotification(delivery, qgroup, o.inch)
}

mset.mu.Lock()
mset.removeConsumer(o)
rp := mset.cfg.Retention
mset.mu.Unlock()
var rp RetentionPolicy
if mset != nil {
if sigSub != nil {
mset.removeConsumerAsLeader(o)
}
mset.mu.Lock()
mset.removeConsumer(o)
rp = mset.cfg.Retention
mset.mu.Unlock()
}

// We need to optionally remove all messages since we are interest based retention.
// We will do this consistently on all replicas. Note that if in clustered mode the
Expand Down
18 changes: 9 additions & 9 deletions server/filestore.go
Expand Up @@ -6223,8 +6223,9 @@ func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) {
return
}
// TODO(dlc) - if we error should start failing upwards.
o.writeState(buf)
lastWrite = time.Now()
if err := o.writeState(buf); err == nil {
lastWrite = time.Now()
}
case <-qch:
return
}
Expand Down Expand Up @@ -6414,6 +6415,11 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
o.mu.Lock()
defer o.mu.Unlock()

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
return nil
}

// Sanity checks.
if state.AckFloor.Consumer > state.Delivered.Consumer {
return fmt.Errorf("bad ack floor for consumer")
Expand All @@ -6429,8 +6435,6 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
pending = make(map[uint64]*Pending, len(state.Pending))
for seq, p := range state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
}
for seq := range pending {
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
}
Expand All @@ -6443,15 +6447,11 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
}
}

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
return fmt.Errorf("old update ignored")
}

o.state.Delivered = state.Delivered
o.state.AckFloor = state.AckFloor
o.state.Pending = pending
o.state.Redelivered = redelivered

o.kickFlusher()

return nil
Expand Down
9 changes: 8 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -4206,7 +4206,14 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
panic(err.Error())
}
o.store.Update(state)
if err = o.store.Update(state); err != nil {
o.mu.RLock()
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.RUnlock()
if s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
}
}
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
var ourID string
Expand Down

0 comments on commit 18b5aca

Please sign in to comment.