Skip to content

Commit

Permalink
[IMPROVED] Reset logic for streams (#4177)
Browse files Browse the repository at this point in the history
When we detect conditions that require a stream reset, make sure we properly
clean up old NRG (raft) nodes and any related state.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 17, 2023
2 parents b856bba + a8d7d38 commit 94457e2
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions server/jetstream_cluster.go
Expand Up @@ -456,6 +456,9 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
}
// Make sure to clear out the raft node if still present in the meta layer.
if rg := sa.Group; rg != nil && rg.node != nil {
if rg.node.State() != Closed {
rg.node.Stop()
}
rg.node = nil
}
js.mu.Unlock()
Expand Down Expand Up @@ -493,7 +496,7 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
// For R1 it will make sure the stream is present on this server.
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
js.mu.Lock()
cc := js.cluster
s, cc := js.srv, js.cluster
if cc == nil {
// Non-clustered mode
js.mu.Unlock()
Expand Down Expand Up @@ -523,7 +526,12 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
if !mset.isCatchingUp() {
return true
}
} else if node != nil && node != mset.raftNode() {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
}

return false
}

Expand Down Expand Up @@ -7590,6 +7598,10 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
}
}

// Do not let this go on forever.
const maxRetries = 3
var numRetries int

RETRY:
// On retry, we need to release the semaphore we got. Call will be no-op
// if releaseSem boolean has not been set to true on successfully getting
Expand All @@ -7606,13 +7618,20 @@ RETRY:
sub = nil
}

// Block here if we have too many requests in flight.
<-s.syncOutSem
releaseSem = true
if !s.isRunning() {
return ErrServerNotRunning
}

numRetries++
if numRetries >= maxRetries {
// Force a hard reset here.
return errFirstSequenceMismatch
}

// Block here if we have too many requests in flight.
<-s.syncOutSem
releaseSem = true

// We may have been blocked for a bit, so the reset needs to ensure that we
// consume the already-fired timer.
if !notActive.Stop() {
Expand Down

0 comments on commit 94457e2

Please sign in to comment.