Skip to content

Commit

Permalink
[IMPROVED] Consumer updates (#4049)
Browse files Browse the repository at this point in the history
Make sure to process consumer entries on recovery in case state was not
committed.
Also sync other consumers when taking over as leader; there is no need to
process snapshots when we are in fact the leader.
Do not let the consumer redelivered count go down on re-applying state
on restarts.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 14, 2023
2 parents 3f8fc5c + cc77d66 commit be4999a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 34 deletions.
5 changes: 4 additions & 1 deletion server/filestore.go
Expand Up @@ -6748,7 +6748,10 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
o.state.Redelivered[sseq] = dc - 1
// Only update if greater than what we already have.
if o.state.Redelivered[sseq] < dc {
o.state.Redelivered[sseq] = dc - 1
}
}
} else {
// For AckNone just update delivered and ackfloor at the same time.
Expand Down
65 changes: 37 additions & 28 deletions server/jetstream_cluster.go
Expand Up @@ -4271,17 +4271,15 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
}
// Check our state if we are under an interest based stream.
o.checkStateForInterestStream()
} else if !recovering {
if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
ne, nb := n.Applied(ce.Index)
ce.ReturnToPool()
// If we have at least min entries to compact, go ahead and snapshot/compact.
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
doSnapshot(false)
}
} else {
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
} else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
ne, nb := n.Applied(ce.Index)
ce.ReturnToPool()
// If we have at least min entries to compact, go ahead and snapshot/compact.
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
doSnapshot(false)
}
} else {
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
}
}
aq.recycle(&ces)
Expand All @@ -4290,6 +4288,16 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
js.setConsumerAssignmentRecovering(ca)
}

// Synchronize everyone to our state.
if isLeader && n != nil {
// Only send out if we have state.
if _, _, applied := n.Progress(); applied > 0 {
if snap, err := o.store.EncodedState(); err == nil {
n.SendSnapshot(snap)
}
}
}

// Process the change.
if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader {
doSnapshot(true)
Expand Down Expand Up @@ -4389,26 +4397,27 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLeader bool) error {
for _, e := range ce.Entries {
if e.Type == EntrySnapshot {
// No-op needed?
state, err := decodeConsumerState(e.Data)
if err != nil {
if mset, node := o.streamAndNode(); mset != nil && node != nil {
s := js.srv
s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]",
mset.account(), mset.name(), o, node.Group())
if !isLeader {
// No-op needed?
state, err := decodeConsumerState(e.Data)
if err != nil {
if mset, node := o.streamAndNode(); mset != nil && node != nil {
s := js.srv
s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]",
mset.account(), mset.name(), o, node.Group())
}
panic(err.Error())
}
panic(err.Error())
}

if err = o.store.Update(state); err != nil {
o.mu.RLock()
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.RUnlock()
if s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
if err = o.store.Update(state); err != nil {
o.mu.RLock()
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.RUnlock()
if s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
}
} else {
o.checkStateForInterestStream()
}
} else {
o.checkStateForInterestStream()
}

} else if e.Type == EntryRemovePeer {
Expand Down
3 changes: 0 additions & 3 deletions server/jetstream_cluster_2_test.go
Expand Up @@ -5784,9 +5784,6 @@ func TestJetStreamClusterConsumerDeliverNewMaxRedeliveriesAndServerRestart(t *te
t.Fatalf("Expected timeout, got msg=%+v err=%v", msg, err)
}

// Give a chance to things to be persisted
time.Sleep(300 * time.Millisecond)

// Check server restart
nc.Close()
c.stopAll()
Expand Down
4 changes: 2 additions & 2 deletions server/stream.go
Expand Up @@ -4485,7 +4485,7 @@ func (mset *stream) delete() error {
// Internal function to stop or delete the stream.
func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.mu.RLock()
js, jsa := mset.js, mset.jsa
js, jsa, name := mset.js, mset.jsa, mset.cfg.Name
mset.mu.RUnlock()

if jsa == nil {
Expand All @@ -4494,7 +4494,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {

// Remove from our account map first.
jsa.mu.Lock()
delete(jsa.streams, mset.cfg.Name)
delete(jsa.streams, name)
accName := jsa.account.Name
jsa.mu.Unlock()

Expand Down

0 comments on commit be4999a

Please sign in to comment.