[IMPROVED] High CPU and Memory usage on replicated mirrors with very high starting sequence. #4249

Merged
merged 1 commit on Jun 16, 2023
60 changes: 60 additions & 0 deletions server/norace_test.go
@@ -8138,3 +8138,63 @@ func TestNoRaceCheckAckFloorWithVeryLargeFirstSeqAndNewConsumers(t *testing.T) {
t.Fatalf("Check ack floor taking too long!")
}
}

func TestNoRaceReplicatedMirrorWithLargeStartingSequenceOverLeafnode(t *testing.T) {
// Cluster B
tmpl := strings.Replace(jsClusterTempl, "store_dir:", "domain: B, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "B", _EMPTY_, 3, 22020, true)
defer c.shutdown()

// Cluster A
// Domain is "A"
lc := c.createLeafNodesWithStartPortAndDomain("A", 3, 22110, "A")
defer lc.shutdown()

lc.waitOnClusterReady()

// Create a stream on B (HUB/CLOUD) and set its starting sequence very high.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 1_000_000_000})
require_NoError(t, err)

// Send in a small amount of messages.
for i := 0; i < 1000; i++ {
sendStreamMsg(t, nc, "foo", "Hello")
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, si.State.FirstSeq == 1_000_000_000)

// Now try to create a replicated mirror on the leaf cluster.
lnc, ljs := jsClientConnect(t, lc.randomServer())
defer lnc.Close()

_, err = ljs.AddStream(&nats.StreamConfig{
Name: "TEST",
Mirror: &nats.StreamSource{
Name: "TEST",
Domain: "B",
},
})
require_NoError(t, err)

// Make sure we sync quickly.
checkFor(t, time.Second, 200*time.Millisecond, func() error {
si, err = ljs.StreamInfo("TEST")
require_NoError(t, err)
if si.State.Msgs == 1000 && si.State.FirstSeq == 1_000_000_000 {
return nil
}
return fmt.Errorf("Mirror state not correct: %+v", si.State)
})
}
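
For readers who want to reproduce the scenario outside the test harness, here is a hedged sketch using only the public nats.go client API the test relies on (AddStream, PurgeStream with a StreamPurgeRequest, and a mirror sourcing from a domain). The server URLs are placeholders and error handling is minimal; this is an illustration, not part of the PR.

```go
package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect to a server in the hub cluster (JetStream domain "B").
	nc, err := nats.Connect("nats://hub.example:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Create the origin stream and push its first sequence very high.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	}); err != nil {
		log.Fatal(err)
	}
	if err := js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 1_000_000_000}); err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 1000; i++ {
		if _, err := js.Publish("foo", []byte("Hello")); err != nil {
			log.Fatal(err)
		}
	}

	// On the leaf cluster (domain "A"), create a mirror of TEST that sources
	// from domain "B". Before this fix, syncing could spin on CPU and memory
	// while skipping roughly a billion sequences one by one.
	lnc, err := nats.Connect("nats://leaf.example:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer lnc.Close()
	ljs, err := lnc.JetStream()
	if err != nil {
		log.Fatal(err)
	}
	if _, err := ljs.AddStream(&nats.StreamConfig{
		Name:   "TEST",
		Mirror: &nats.StreamSource{Name: "TEST", Domain: "B"},
	}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("mirror created")
}
```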
19 changes: 16 additions & 3 deletions server/stream.go
@@ -1671,7 +1671,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
mset.mu.RUnlock()
return 0, errors.New("sealed stream")
}
store := mset.store
store, mlseq := mset.store, mset.lseq
mset.mu.RUnlock()

if preq != nil {
@@ -1683,11 +1683,17 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
return purged, err
}

// Purge consumers.
// Grab our stream state.
var state StreamState
store.FastState(&state)
fseq, lseq := state.FirstSeq, state.LastSeq

// Check if our last has moved past what our original last sequence was, if so reset.
if lseq > mlseq {
mset.setLastSeq(lseq)
}

// Purge consumers.
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
ss := store.FilteredState(state.FirstSeq, preq.Subject)
@@ -2399,7 +2405,14 @@ func (mset *stream) setupMirrorConsumer() error {

// Check if we need to skip messages.
if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream {
mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
// Check to see if delivered is past our last and we have no msgs. This will help the
// case when mirroring a stream that has a very high starting sequence number.
if state.Msgs == 0 && ccr.ConsumerInfo.Delivered.Stream > state.LastSeq {
mset.store.PurgeEx(_EMPTY_, ccr.ConsumerInfo.Delivered.Stream+1, 0)
mset.lseq = ccr.ConsumerInfo.Delivered.Stream
} else {
mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
}
}

// Capture consumer name.
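
The setupMirrorConsumer change above is the heart of the fix: when the mirror's store is empty and the origin's delivered sequence is far ahead, the server purges up to that sequence in one call and sets its last sequence directly, instead of writing a skip record for every missing sequence. Below is a minimal, self-contained sketch of that decision; miniStore, its methods, and catchUp are invented for illustration and are not the server's internal API.

```go
package main

import "fmt"

// miniStore stands in for the small slice of message-store state the sketch needs.
type miniStore struct {
	msgs    uint64 // number of stored messages
	lastSeq uint64 // highest sequence known locally
}

// purgeUpTo discards everything below seq in one operation (constant cost here),
// analogous to PurgeEx(_EMPTY_, delivered+1, 0) followed by setting mset.lseq.
func (s *miniStore) purgeUpTo(seq uint64) {
	s.msgs = 0
	s.lastSeq = seq - 1
}

// skipOne records a single skipped sequence; calling it in a loop is linear in
// the size of the gap, which is where the high CPU and memory usage came from.
func (s *miniStore) skipOne(seq uint64) { s.lastSeq = seq }

// catchUp advances the local mirror state to the origin's delivered sequence.
func catchUp(s *miniStore, delivered uint64) {
	if s.lastSeq == delivered {
		return
	}
	if s.msgs == 0 && delivered > s.lastSeq {
		// Fast path: the mirror is empty, so jump straight to the origin's position.
		s.purgeUpTo(delivered + 1)
		return
	}
	// Slow path: fill the gap one sequence at a time.
	for seq := s.lastSeq + 1; seq <= delivered; seq++ {
		s.skipOne(seq)
	}
}

func main() {
	s := &miniStore{}
	catchUp(s, 1_000_000_000)
	fmt.Println("local last sequence:", s.lastSeq) // 1000000000
}
```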