Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Race condition in some leader failover scenarios leading to messages being potentially sourced more than once. #4592

Merged
merged 1 commit into from Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -16617,6 +16617,8 @@ func TestJetStreamWorkQueueSourceRestart(t *testing.T) {
sub, err := js.PullSubscribe("foo", "dur", nats.BindStream("TEST"))
require_NoError(t, err)

time.Sleep(100 * time.Millisecond)

ci, err := js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_True(t, ci.NumPending == uint64(sent))
Expand Down
3 changes: 1 addition & 2 deletions server/norace_test.go
Expand Up @@ -1874,7 +1874,6 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
msg := fmt.Sprintf("R-MSG-%d", i+1)
for _, sname := range []string{"foo", "bar", "baz"} {
m := nats.NewMsg(sname)
m.Header.Set(nats.MsgIdHdr, sname+"-"+msg)
m.Data = []byte(msg)
if _, err := js.PublishMsg(m); err != nil {
t.Errorf("Unexpected publish error: %v", err)
Expand All @@ -1891,7 +1890,7 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
sc.clusterForName("C3").waitOnStreamLeader("$G", "MS2")
<-doneCh

checkFor(t, 15*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 15*time.Second, time.Second, func() error {
si, err := js2.StreamInfo("MS2")
if err != nil {
return err
Expand Down
78 changes: 59 additions & 19 deletions server/stream.go
Expand Up @@ -246,8 +246,9 @@ type stream struct {
mirror *sourceInfo

// Sources
sources map[string]*sourceInfo
sourceRetries map[string]*time.Timer
sources map[string]*sourceInfo
sourceRetries map[string]*time.Timer
sourcesConsumerSetup *time.Timer

// Indicates we have direct consumers.
directs int
Expand Down Expand Up @@ -821,6 +822,11 @@ func (mset *stream) setLeader(isLeader bool) error {
return err
}
} else {
// cancel timer to create the source consumers if not fired yet
if mset.sourcesConsumerSetup != nil {
mset.sourcesConsumerSetup.Stop()
mset.sourcesConsumerSetup = nil
}
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
Expand Down Expand Up @@ -2389,7 +2395,7 @@ func (mset *stream) scheduleSetupMirrorConsumerRetryAsap() {
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
time.AfterFunc(next, func() {
mset.mu.Lock()
mset.setupMirrorConsumer()
Expand Down Expand Up @@ -2736,7 +2742,7 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
mset.scheduleSetSourceConsumerRetry(si.iname, seq, next, startTime)
}

Expand Down Expand Up @@ -3295,16 +3301,9 @@ func (mset *stream) setStartingSequenceForSource(iName string, external *Externa
}
}

// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}
// Always reset here.
// lock should be held.
// Resets the SourceInfo for all the sources
func (mset *stream) resetSourceInfo() {
mset.sources = make(map[string]*sourceInfo)

for _, ssi := range mset.cfg.Sources {
Expand All @@ -3331,6 +3330,20 @@ func (mset *stream) startingSequenceForSources() {
}
mset.sources[ssi.iname] = si
}
}

// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}

// Always reset here.
mset.resetSourceInfo()

var state StreamState
mset.store.FastState(&state)
Expand Down Expand Up @@ -3414,6 +3427,11 @@ func (mset *stream) setupSourceConsumers() error {
}
}

// If we are no longer the leader, give up
if !mset.isLeader() {
return nil
}

mset.startingSequenceForSources()

// Setup our consumers at the proper starting position.
Expand All @@ -3439,13 +3457,35 @@ func (mset *stream) subscribeToStream() error {
}
// Check if we need to setup mirroring.
if mset.cfg.Mirror != nil {
if err := mset.setupMirrorConsumer(); err != nil {
return err
// setup the initial mirror sourceInfo
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name}
sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
trs := make([]*subjectTransform, len(mset.cfg.Mirror.SubjectTransforms))

for i, tr := range mset.cfg.Mirror.SubjectTransforms {
// will not fail as already checked before that the transform will work
subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}

sfs[i] = tr.Source
trs[i] = subjectTransform
}
mset.mirror.sfs = sfs
mset.mirror.trs = trs
// delay the actual mirror consumer creation for after a delay
mset.scheduleSetupMirrorConsumerRetryAsap()
} else if len(mset.cfg.Sources) > 0 {
if err := mset.setupSourceConsumers(); err != nil {
return err
}
// Setup the initial source infos for the sources
mset.resetSourceInfo()
// Delay the actual source consumer(s) creation(s) for after a delay

mset.sourcesConsumerSetup = time.AfterFunc(time.Duration(rand.Intn(int(10*time.Millisecond)))+10*time.Millisecond, func() {
mset.mu.Lock()
mset.setupSourceConsumers()
mset.mu.Unlock()
})
}
// Check for direct get access.
// We spin up followers for clustered streams in monitorStream().
Expand Down