[FIXED] Race condition in some leader failover scenarios leading to messages being potentially sourced more than once.

- In some failure scenarios, the current leader of a stream that sources from other stream(s) gets shut down while publications are still happening on the stream(s) being sourced. `setLeader(true)` is then called on the new leader of the sourcing stream before all of the messages sourced by the previous leader have been fully processed. When the new leader does its reverse scan from the last message in its view of the stream, in order to determine the sequence number at which to start the consumer for the stream being sourced from, it picks a starting sequence that is too low: the last message(s) sourced by the previous leader get sourced again, so some messages end up being sourced more than once.
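A minimal, self-contained sketch of the reverse-scan idea described above (not the nats-server implementation; `storedMsg` and `startingSequenceForSource` are hypothetical names, the real logic lives in `startingSequenceForSources` in `server/stream.go`). It shows why messages sourced by the previous leader that have not yet landed in the new leader's view make the scan resume too early:

```go
package main

import "fmt"

// storedMsg is a stand-in for a message stored in the sourcing stream.
// A sourced message also records the sequence it had in the origin stream.
type storedMsg struct {
	streamSeq uint64 // sequence in the sourcing stream
	sourceSeq uint64 // sequence in the origin stream, 0 if not sourced
}

// startingSequenceForSource walks the stream backwards and returns the next
// origin sequence the source consumer should be started at.
func startingSequenceForSource(view []storedMsg) uint64 {
	for i := len(view) - 1; i >= 0; i-- {
		if view[i].sourceSeq != 0 {
			// Most recent sourced message found: resume just after it.
			return view[i].sourceSeq + 1
		}
	}
	// Nothing sourced yet: start at the beginning of the origin stream.
	return 1
}

func main() {
	// The previous leader had already sourced origin sequences 1..5, but the
	// new leader's view only holds 1..3 because 4 and 5 were still being
	// processed when leadership changed.
	view := []storedMsg{{1, 1}, {2, 2}, {3, 3}}
	// Prints 4, so origin sequences 4 and 5 will be sourced a second time
	// once the in-flight copies from the old leader are also applied.
	fmt.Println(startingSequenceForSource(view))
}
```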

The existing `TestNoRaceJetStreamSuperClusterSources` test would sidestep the issue by relying on the deduplication window in the sourcing stream. Without deduplication the test is a flapper.
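For context, this is roughly how Msg-Id based deduplication masks such duplicates (a hedged sketch, not the test code; it assumes a local server at `nats.DefaultURL` and a stream covering subject `foo` with a non-zero duplicates window):

```go
package main

import (
	"fmt"

	"github.com/nats-io/nats.go"
)

func main() {
	// Assumes a local server and a stream that covers subject "foo" with a
	// non-zero duplicates window (the JetStream default is two minutes).
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		panic(err)
	}

	m := nats.NewMsg("foo")
	m.Data = []byte("R-MSG-1")
	// With a Nats-Msg-Id header set, a second copy of the same logical
	// message inside the duplicates window is acknowledged but not stored.
	m.Header.Set(nats.MsgIdHdr, "foo-R-MSG-1")

	for i := 0; i < 2; i++ {
		ack, err := js.PublishMsg(m)
		if err != nil {
			panic(err)
		}
		fmt.Printf("seq=%d duplicate=%v\n", ack.Sequence, ack.Duplicate)
	}
}
```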

This avoids the race condition by adding a small delay before scanning for the last message(s) that have been sourced and before starting the source consumer(s). With that delay in place, the test (without using the deduplication window) no longer fails due to more messages than expected being received in the sourcing stream.
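A hedged sketch of the mitigation pattern, assuming the same delay and jitter the diff below uses for `mset.sourcesConsumerSetup`; the `stream` type here is a stand-in, not the server's:

```go
package main

import (
	"math/rand"
	"sync"
	"time"
)

type stream struct {
	mu                   sync.Mutex
	isLeader             bool
	sourcesConsumerSetup *time.Timer
}

func (s *stream) becomeLeader() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.isLeader = true
	// Small, jittered delay so messages still being applied from the
	// previous leader can land before we scan for starting sequences.
	delay := time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
	s.sourcesConsumerSetup = time.AfterFunc(delay, func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		if !s.isLeader {
			// Leadership changed again while we were waiting.
			return
		}
		// ... do the reverse scan and create the source consumers here ...
	})
}

func (s *stream) loseLeadership() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.isLeader = false
	// Stop the pending setup if it has not fired yet.
	if s.sourcesConsumerSetup != nil {
		s.sourcesConsumerSetup.Stop()
		s.sourcesConsumerSetup = nil
	}
}

func main() {
	s := &stream{}
	s.becomeLeader()
	time.Sleep(30 * time.Millisecond) // let the deferred setup fire
	s.loseLeadership()
}
```

Cancelling the timer when leadership is lost is what prevents a stale setup from running after another leadership change.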

- Fix the `TestJetStreamWorkQueueSourceRestart` test, which expects the sourcing stream to receive all of the expected messages right away, by adding a small sleep before checking the number of messages pending on the consumer for that stream.

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
jnmoyne committed Sep 28, 2023
1 parent 27049a9 commit 71f9688
Showing 3 changed files with 62 additions and 21 deletions.
2 changes: 2 additions & 0 deletions server/jetstream_test.go
@@ -16617,6 +16617,8 @@ func TestJetStreamWorkQueueSourceRestart(t *testing.T) {
sub, err := js.PullSubscribe("foo", "dur", nats.BindStream("TEST"))
require_NoError(t, err)

time.Sleep(100 * time.Millisecond)

ci, err := js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_True(t, ci.NumPending == uint64(sent))
3 changes: 1 addition & 2 deletions server/norace_test.go
@@ -1874,7 +1874,6 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
msg := fmt.Sprintf("R-MSG-%d", i+1)
for _, sname := range []string{"foo", "bar", "baz"} {
m := nats.NewMsg(sname)
m.Header.Set(nats.MsgIdHdr, sname+"-"+msg)
m.Data = []byte(msg)
if _, err := js.PublishMsg(m); err != nil {
t.Errorf("Unexpected publish error: %v", err)
@@ -1891,7 +1890,7 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
sc.clusterForName("C3").waitOnStreamLeader("$G", "MS2")
<-doneCh

checkFor(t, 15*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 15*time.Second, time.Second, func() error {
si, err := js2.StreamInfo("MS2")
if err != nil {
return err
78 changes: 59 additions & 19 deletions server/stream.go
@@ -246,8 +246,9 @@ type stream struct {
mirror *sourceInfo

// Sources
sources map[string]*sourceInfo
sourceRetries map[string]*time.Timer
sources map[string]*sourceInfo
sourceRetries map[string]*time.Timer
sourcesConsumerSetup *time.Timer

// Indicates we have direct consumers.
directs int
@@ -821,6 +822,11 @@ func (mset *stream) setLeader(isLeader bool) error {
return err
}
} else {
// Cancel the timer set up to create the source consumers if it hasn't fired yet.
if mset.sourcesConsumerSetup != nil {
mset.sourcesConsumerSetup.Stop()
mset.sourcesConsumerSetup = nil
}
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
@@ -2389,7 +2395,7 @@ func (mset *stream) scheduleSetupMirrorConsumerRetryAsap() {
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
time.AfterFunc(next, func() {
mset.mu.Lock()
mset.setupMirrorConsumer()
@@ -2736,7 +2742,7 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
mset.scheduleSetSourceConsumerRetry(si.iname, seq, next, startTime)
}

@@ -3295,16 +3301,9 @@ func (mset *stream) setStartingSequenceForSource(iName string, external *Externa
}
}

// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}
// Always reset here.
// lock should be held.
// Resets the SourceInfo for all the sources
func (mset *stream) resetSourceInfo() {
mset.sources = make(map[string]*sourceInfo)

for _, ssi := range mset.cfg.Sources {
@@ -3331,6 +3330,20 @@ func (mset *stream) startingSequenceForSources() {
}
mset.sources[ssi.iname] = si
}
}

// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}

// Always reset here.
mset.resetSourceInfo()

var state StreamState
mset.store.FastState(&state)
@@ -3414,6 +3427,11 @@ func (mset *stream) setupSourceConsumers() error {
}
}

// If we are no longer the leader, give up
if !mset.isLeader() {
return nil
}

mset.startingSequenceForSources()

// Setup our consumers at the proper starting position.
@@ -3439,13 +3457,35 @@ func (mset *stream) subscribeToStream() error {
}
// Check if we need to setup mirroring.
if mset.cfg.Mirror != nil {
if err := mset.setupMirrorConsumer(); err != nil {
return err
// setup the initial mirror sourceInfo
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name}
sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
trs := make([]*subjectTransform, len(mset.cfg.Mirror.SubjectTransforms))

for i, tr := range mset.cfg.Mirror.SubjectTransforms {
// Will not fail as it was already checked that the transform will work.
subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}

sfs[i] = tr.Source
trs[i] = subjectTransform
}
mset.mirror.sfs = sfs
mset.mirror.trs = trs
// Schedule the actual mirror consumer creation after a short delay.
mset.scheduleSetupMirrorConsumerRetryAsap()
} else if len(mset.cfg.Sources) > 0 {
if err := mset.setupSourceConsumers(); err != nil {
return err
}
// Setup the initial source infos for the sources
mset.resetSourceInfo()
// Schedule the actual creation of the source consumer(s) after a short delay.
mset.sourcesConsumerSetup = time.AfterFunc(time.Duration(rand.Intn(int(10*time.Millisecond)))+10*time.Millisecond, func() {
mset.mu.Lock()
mset.setupSourceConsumers()
mset.mu.Unlock()
})
}
// Check for direct get access.
// We spin up followers for clustered streams in monitorStream().
