Skip to content

Commit

Permalink
Skip processing consumer assignments after JS has shutdown (#4625)
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs authored and neilalexander committed Feb 26, 2024
1 parent 7b68256 commit 30ca7a7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
4 changes: 4 additions & 0 deletions server/consumer.go
Expand Up @@ -1422,6 +1422,10 @@ func (o *consumer) deleteNotActive() {
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
if js.shuttingDown {
js.mu.RUnlock()
return
}
nca := js.consumerAssignment(acc, stream, name)
js.mu.RUnlock()
// Make sure this is not a new consumer with the same name.
Expand Down
8 changes: 7 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -2704,6 +2704,11 @@ func (mset *stream) resetClusteredState(err error) bool {

if sa != nil {
js.mu.Lock()
if js.shuttingDown {
js.mu.Unlock()
return
}

s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
// Now wipe groups from assignments.
sa.Group.node = nil
Expand Down Expand Up @@ -3825,6 +3830,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
s, cc := js.srv, js.cluster
accName, stream, consumerName := ca.Client.serviceAccount(), ca.Stream, ca.Name
noMeta := cc == nil || cc.meta == nil
shuttingDown := js.shuttingDown
var ourID string
if !noMeta {
ourID = cc.meta.ID()
Expand All @@ -3835,7 +3841,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
}
js.mu.RUnlock()

if s == nil || noMeta {
if s == nil || noMeta || shuttingDown {
return
}

Expand Down

0 comments on commit 30ca7a7

Please sign in to comment.