Skip to content

Commit

Permalink
[IMPROVED] Restart consumer behavior during healthz() checks. (#4172)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 16, 2023
2 parents 734895a + a06e1c9 commit ac68a19
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions server/jetstream_cluster.go
Expand Up @@ -475,6 +475,10 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
js.mu.Lock()
ca := sa.consumers[cca.Name]
if ca != nil && ca.Group != nil {
// Make sure the node is stopped if still running.
if node := ca.Group.node; node != nil && node.State() != Closed {
node.Stop()
}
// Make sure node is wiped.
ca.Group.node = nil
}
Expand Down Expand Up @@ -543,6 +547,10 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
restartConsumer := func() {
js.mu.Lock()
if ca.Group != nil {
// Make sure the node is stopped if still running.
if node := ca.Group.node; node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
}
deleted := ca.deleted
Expand Down

0 comments on commit ac68a19

Please sign in to comment.