Skip to content

Commit

Permalink
[IMPROVED] Consumer cleanup monitoring and FIX for datarace (#4536)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 14, 2023
2 parents b79b180 + 22f40ea commit 56c5e4a
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions server/consumer.go
Expand Up @@ -1544,19 +1544,32 @@ func (o *consumer) deleteNotActive() {
// If we do forward a proposal to delete ourselves to the metacontroller leader.
if !isDirect && s.JetStreamIsClustered() {
js.mu.RLock()
var (
cca consumerAssignment
meta RaftNode
removeEntry []byte
)
ca, cc := js.consumerAssignment(acc, stream, name), js.cluster
js.mu.RUnlock()

if ca != nil && cc != nil {
cca := *ca
meta = cc.meta
cca = *ca
cca.Reply = _EMPTY_
meta, removeEntry := cc.meta, encodeDeleteConsumerAssignment(&cca)
removeEntry = encodeDeleteConsumerAssignment(&cca)
meta.ForwardProposal(removeEntry)
}
js.mu.RUnlock()

if ca != nil && cc != nil {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
ticker := time.NewTicker(10 * time.Second)
const (
startInterval = 5 * time.Second
maxInterval = 5 * time.Minute
)
jitter := time.Duration(rand.Int63n(int64(startInterval)))
interval := startInterval + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
Expand All @@ -1566,9 +1579,14 @@ func (o *consumer) deleteNotActive() {
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
} else {
return
if interval < maxInterval {
interval *= 2
ticker.Reset(interval)
}
continue
}
// We saw that consumer has been removed, all done.
return
}
}()
}
Expand Down

0 comments on commit 56c5e4a

Please sign in to comment.