Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Consumer updates #4049

Merged
merged 3 commits into from Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion server/filestore.go
Expand Up @@ -6687,7 +6687,10 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
o.state.Redelivered[sseq] = dc - 1
// Only update if greater then what we already have.
if o.state.Redelivered[sseq] < dc {
o.state.Redelivered[sseq] = dc - 1
}
}
} else {
// For AckNone just update delivered and ackfloor at the same time.
Expand Down
65 changes: 37 additions & 28 deletions server/jetstream_cluster.go
Expand Up @@ -4271,17 +4271,15 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
}
// Check our state if we are under an interest based stream.
o.checkStateForInterestStream()
} else if !recovering {
if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
ne, nb := n.Applied(ce.Index)
ce.ReturnToPool()
// If we have at least min entries to compact, go ahead and snapshot/compact.
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
doSnapshot(false)
}
} else {
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
} else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil {
ne, nb := n.Applied(ce.Index)
ce.ReturnToPool()
// If we have at least min entries to compact, go ahead and snapshot/compact.
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
doSnapshot(false)
}
} else {
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
}
}
aq.recycle(&ces)
Expand All @@ -4290,6 +4288,16 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
js.setConsumerAssignmentRecovering(ca)
}

// Synchronize everyone to our state.
if isLeader && n != nil {
// Only send out if we have state.
if _, _, applied := n.Progress(); applied > 0 {
if snap, err := o.store.EncodedState(); err == nil {
n.SendSnapshot(snap)
}
}
}

// Process the change.
if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader {
doSnapshot(true)
Expand Down Expand Up @@ -4389,26 +4397,27 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLeader bool) error {
for _, e := range ce.Entries {
if e.Type == EntrySnapshot {
// No-op needed?
state, err := decodeConsumerState(e.Data)
if err != nil {
if mset, node := o.streamAndNode(); mset != nil && node != nil {
s := js.srv
s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]",
mset.account(), mset.name(), o, node.Group())
if !isLeader {
// No-op needed?
state, err := decodeConsumerState(e.Data)
if err != nil {
if mset, node := o.streamAndNode(); mset != nil && node != nil {
s := js.srv
s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]",
mset.account(), mset.name(), o, node.Group())
}
panic(err.Error())
}
panic(err.Error())
}

if err = o.store.Update(state); err != nil {
o.mu.RLock()
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.RUnlock()
if s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
if err = o.store.Update(state); err != nil {
o.mu.RLock()
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
o.mu.RUnlock()
if s != nil && mset != nil {
s.Warnf("Consumer '%s > %s > %s' error on store update from snapshot entry: %v", acc, mset.name(), name, err)
}
} else {
o.checkStateForInterestStream()
}
} else {
o.checkStateForInterestStream()
}

} else if e.Type == EntryRemovePeer {
Expand Down
3 changes: 0 additions & 3 deletions server/jetstream_cluster_2_test.go
Expand Up @@ -5784,9 +5784,6 @@ func TestJetStreamClusterConsumerDeliverNewMaxRedeliveriesAndServerRestart(t *te
t.Fatalf("Expected timeout, got msg=%+v err=%v", msg, err)
}

// Give a chance to things to be persisted
time.Sleep(300 * time.Millisecond)

// Check server restart
nc.Close()
c.stopAll()
Expand Down
4 changes: 2 additions & 2 deletions server/stream.go
Expand Up @@ -4485,7 +4485,7 @@ func (mset *stream) delete() error {
// Internal function to stop or delete the stream.
func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.mu.RLock()
js, jsa := mset.js, mset.jsa
js, jsa, name := mset.js, mset.jsa, mset.cfg.Name
mset.mu.RUnlock()

if jsa == nil {
Expand All @@ -4494,7 +4494,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {

// Remove from our account map first.
jsa.mu.Lock()
delete(jsa.streams, mset.cfg.Name)
delete(jsa.streams, name)
accName := jsa.account.Name
jsa.mu.Unlock()

Expand Down