Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] R1 stream move would sometimes lose all msgs. #4413

Merged
Merged 1 commit on Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/consumer.go
Expand Up @@ -3288,7 +3288,7 @@ func (o *consumer) checkAckFloor() {
}
}
} else if numPending > 0 {
// here it shorter to walk pending.
// here it is shorter to walk pending.
// toTerm is seq, dseq, rcd for each entry.
toTerm := make([]uint64, 0, numPending*3)
o.mu.RLock()
Expand Down
15 changes: 1 addition & 14 deletions server/jetstream_cluster.go
Expand Up @@ -2161,18 +2161,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

startMigrationMonitoring := func() {
if mmt == nil {
mmt = time.NewTicker(10 * time.Millisecond)
mmtc = mmt.C
}
}

adjustMigrationMonitoring := func() {
const delay = 500 * time.Millisecond
if mmt == nil {
mmt = time.NewTicker(delay)
mmt = time.NewTicker(500 * time.Millisecond)
mmtc = mmt.C
} else {
mmt.Reset(delay)
}
}

Expand Down Expand Up @@ -2407,9 +2397,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
continue
}

// Adjust to our normal time delay.
adjustMigrationMonitoring()

// Make sure we have correct cluster information on the other peers.
ci := js.clusterInfo(rg)
mset.checkClusterInfo(ci)
Expand Down
63 changes: 63 additions & 0 deletions server/jetstream_super_cluster_test.go
Expand Up @@ -3953,3 +3953,66 @@ func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
return nil
})
}

// TestJetStreamSuperClusterMovingR1Stream creates a stream from a client
// connected to C1, fills it with messages, then changes its placement tags
// to "cloud:gcp". It then verifies that the stream ends up in C2 with a C2
// leader, no replicas, and every published message still present.
func TestJetStreamSuperClusterMovingR1Stream(t *testing.T) {
	const (
		streamName = "TEST"
		gbit       = 1 * 1024 * 1024 * 1024 // 1gbit
	)

	// Route C2 gateway traffic through a proxy that adds round-trip latency.
	proxies := gwProxyMap{
		"C2": &gwProxy{
			rtt:  10 * time.Millisecond,
			up:   gbit,
			down: gbit,
		},
	}
	sc := createJetStreamTaggedSuperClusterWithGWProxy(t, proxies)
	defer sc.shutdown()

	nc, js := jsClientConnect(t, sc.clusterForName("C1").randomServer())
	defer nc.Close()

	// The config sets neither a replica count nor a placement.
	_, err := js.AddStream(&nats.StreamConfig{Name: streamName})
	require_NoError(t, err)

	// Publish asynchronously, then wait for all outstanding publishes to complete.
	numMsgs := 10_000
	payload := []byte("HELLO WORLD")
	for sent := 0; sent < numMsgs; sent++ {
		_, err := js.PublishAsync(streamName, payload)
		require_NoError(t, err)
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// Request the move by updating the placement tags.
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:      streamName,
		Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
	})
	require_NoError(t, err)

	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		sc.waitOnStreamLeader(globalAccountName, streamName)
		si, err := js.StreamInfo(streamName)
		if err != nil {
			return err
		}
		cluster := si.Cluster
		switch {
		case cluster.Name != "C2":
			return fmt.Errorf("Wrong cluster: %q", cluster.Name)
		case cluster.Leader == _EMPTY_:
			return fmt.Errorf("No leader yet")
		case !strings.HasPrefix(cluster.Leader, "C2"):
			return fmt.Errorf("Wrong leader: %q", cluster.Leader)
		case len(cluster.Replicas) != 0:
			// The peer set must have shrunk back to a single member.
			return fmt.Errorf("Expected 0 replicas, got %d", len(cluster.Replicas))
		case si.State.Msgs != uint64(numMsgs):
			// Every published message must still be present after the move.
			return fmt.Errorf("Only see %d msgs", si.State.Msgs)
		}
		return nil
	})
}