Skip to content

Commit

Permalink
Warn of consumer state update failures.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 20, 2023
1 parent d2179e0 commit 83a68ba
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
23 changes: 20 additions & 3 deletions server/consumer.go
Expand Up @@ -2100,6 +2100,7 @@ func (o *consumer) ackWait(next time.Duration) time.Duration {
}

// Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check.
// Lock should be held.
func (o *consumer) checkRedelivered(slseq uint64) {
var lseq uint64
if mset := o.mset; mset != nil {
Expand All @@ -2114,7 +2115,13 @@ func (o *consumer) checkRedelivered(slseq uint64) {
}
}
if shouldUpdateState {
o.writeStoreStateUnlocked()
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
o.mu.Lock()
}
}
}

Expand Down Expand Up @@ -3882,7 +3889,13 @@ func (o *consumer) checkPending() {

// Update our state if needed.
if shouldUpdateState {
o.writeStoreStateUnlocked()
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.name(), name, err)
o.mu.Lock()
}
}
}

Expand Down Expand Up @@ -4169,9 +4182,13 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
}
}
}
// Grab some info in case of error below.
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.Unlock()

o.writeStoreState()
if err := o.writeStoreState(); err != nil && s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on write store state from purge: %v", acc, mset.name(), name, err)
}
}

func stopAndClearTimer(tp **time.Timer) {
Expand Down
9 changes: 8 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -4206,7 +4206,14 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
panic(err.Error())
}
o.store.Update(state)
if err = o.store.Update(state); err != nil {
o.mu.RLock()
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.RUnlock()
if s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
}
}
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
var ourID string
Expand Down

0 comments on commit 83a68ba

Please sign in to comment.