Skip to content

Commit

Permalink
Warn of consumer state update failures.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Feb 21, 2023
1 parent d2179e0 commit 3c64d07
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
23 changes: 20 additions & 3 deletions server/consumer.go
Expand Up @@ -2100,6 +2100,7 @@ func (o *consumer) ackWait(next time.Duration) time.Duration {
}

// Due to bug in calculation of sequences on restoring redelivered let's do quick sanity check.
// Lock should be held.
func (o *consumer) checkRedelivered(slseq uint64) {
var lseq uint64
if mset := o.mset; mset != nil {
Expand All @@ -2114,7 +2115,13 @@ func (o *consumer) checkRedelivered(slseq uint64) {
}
}
if shouldUpdateState {
o.writeStoreStateUnlocked()
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
o.mu.Lock()
}
}
}

Expand Down Expand Up @@ -3882,7 +3889,13 @@ func (o *consumer) checkPending() {

// Update our state if needed.
if shouldUpdateState {
o.writeStoreStateUnlocked()
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.name(), name, err)
o.mu.Lock()
}
}
}

Expand Down Expand Up @@ -4169,9 +4182,13 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
}
}
}
// Grab some info in case of error below.
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.Unlock()

o.writeStoreState()
if err := o.writeStoreState(); err != nil && s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on write store state from purge: %v", acc, mset.name(), name, err)
}
}

func stopAndClearTimer(tp **time.Timer) {
Expand Down
9 changes: 8 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -4206,7 +4206,14 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
panic(err.Error())
}
o.store.Update(state)
if err = o.store.Update(state); err != nil {
o.mu.RLock()
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.RUnlock()
if s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
}
}
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
var ourID string
Expand Down
12 changes: 4 additions & 8 deletions server/jetstream_test.go
Expand Up @@ -19558,8 +19558,7 @@ func TestJetStreamPartialPurgeWithAckPending(t *testing.T) {

nmsgs := 100
for i := 0; i < nmsgs; i++ {
_, err := js.Publish("foo", []byte("OK"))
require_NoError(t, err)
sendStreamMsg(t, nc, "foo", "OK")
}
sub, err := js.PullSubscribe("foo", "dlc", nats.AckWait(time.Second))
require_NoError(t, err)
Expand All @@ -19583,8 +19582,7 @@ func TestJetStreamPartialPurgeWithAckPending(t *testing.T) {
require_True(t, ci.NumPending == 0)

for i := 0; i < nmsgs; i++ {
_, err := js.Publish("foo", []byte("OK"))
require_NoError(t, err)
sendStreamMsg(t, nc, "foo", "OK")
}

ci, err = js.ConsumerInfo("TEST", "dlc")
Expand Down Expand Up @@ -19666,8 +19664,7 @@ func TestJetStreamPurgeWithRedeliveredPending(t *testing.T) {

nmsgs := 100
for i := 0; i < nmsgs; i++ {
_, err := js.Publish("foo", []byte("OK"))
require_NoError(t, err)
sendStreamMsg(t, nc, "foo", "OK")
}
sub, err := js.PullSubscribe("foo", "dlc", nats.AckWait(time.Second))
require_NoError(t, err)
Expand Down Expand Up @@ -19715,8 +19712,7 @@ func TestJetStreamConsumerAckFloorWithExpired(t *testing.T) {

nmsgs := 100
for i := 0; i < nmsgs; i++ {
_, err := js.Publish("foo", []byte("OK"))
require_NoError(t, err)
sendStreamMsg(t, nc, "foo", "OK")
}
sub, err := js.PullSubscribe("foo", "dlc", nats.AckWait(time.Second))
require_NoError(t, err)
Expand Down

0 comments on commit 3c64d07

Please sign in to comment.