Skip to content

Commit

Permalink
[IMPROVED] High CPU and Memory usage on replicated mirrors with very …
Browse files Browse the repository at this point in the history
…high starting sequence. (#4249)

When creating replicated mirrors where the source stream had a very
large starting sequence number, the server would use excessive CPU and
Memory.

This is due to the mirroring functionality trying to skip messages when
it detects a gap. In a replicated stream this puts excessive stress on
the raft system.

This step is not needed at all if the mirror stream has no messages, we
can simply jump ahead.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 16, 2023
2 parents 367d857 + 087a28a commit 4a1b281
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
60 changes: 60 additions & 0 deletions server/norace_test.go
Expand Up @@ -8138,3 +8138,63 @@ func TestNoRaceCheckAckFloorWithVeryLargeFirstSeqAndNewConsumers(t *testing.T) {
t.Fatalf("Check ack floor taking too long!")
}
}

func TestNoRaceReplicatedMirrorWithLargeStartingSequenceOverLeafnode(t *testing.T) {
// Cluster B
tmpl := strings.Replace(jsClusterTempl, "store_dir:", "domain: B, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "B", _EMPTY_, 3, 22020, true)
defer c.shutdown()

// Cluster A
// Domain is "A'
lc := c.createLeafNodesWithStartPortAndDomain("A", 3, 22110, "A")
defer lc.shutdown()

lc.waitOnClusterReady()

// Create a stream on B (HUB/CLOUD) and set its starting sequence very high.
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 1_000_000_000})
require_NoError(t, err)

// Send in a small amount of messages.
for i := 0; i < 1000; i++ {
sendStreamMsg(t, nc, "foo", "Hello")
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, si.State.FirstSeq == 1_000_000_000)

// Now try to create a replicated mirror on the leaf cluster.
lnc, ljs := jsClientConnect(t, lc.randomServer())
defer lnc.Close()

_, err = ljs.AddStream(&nats.StreamConfig{
Name: "TEST",
Mirror: &nats.StreamSource{
Name: "TEST",
Domain: "B",
},
})
require_NoError(t, err)

// Make sure we sync quickly.
checkFor(t, time.Second, 200*time.Millisecond, func() error {
si, err = ljs.StreamInfo("TEST")
require_NoError(t, err)
if si.State.Msgs == 1000 && si.State.FirstSeq == 1_000_000_000 {
return nil
}
return fmt.Errorf("Mirror state not correct: %+v", si.State)
})
}
19 changes: 16 additions & 3 deletions server/stream.go
Expand Up @@ -1671,7 +1671,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
mset.mu.RUnlock()
return 0, errors.New("sealed stream")
}
store := mset.store
store, mlseq := mset.store, mset.lseq
mset.mu.RUnlock()

if preq != nil {
Expand All @@ -1683,11 +1683,17 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
return purged, err
}

// Purge consumers.
// Grab our stream state.
var state StreamState
store.FastState(&state)
fseq, lseq := state.FirstSeq, state.LastSeq

// Check if our last has moved past what our original last sequence was, if so reset.
if lseq > mlseq {
mset.setLastSeq(lseq)
}

// Purge consumers.
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
ss := store.FilteredState(state.FirstSeq, preq.Subject)
Expand Down Expand Up @@ -2399,7 +2405,14 @@ func (mset *stream) setupMirrorConsumer() error {

// Check if we need to skip messages.
if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream {
mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
// Check to see if delivered is past our last and we have no msgs. This will help the
// case when mirroring a stream that has a very high starting sequence number.
if state.Msgs == 0 && ccr.ConsumerInfo.Delivered.Stream > state.LastSeq {
mset.store.PurgeEx(_EMPTY_, ccr.ConsumerInfo.Delivered.Stream+1, 0)
mset.lseq = ccr.ConsumerInfo.Delivered.Stream
} else {
mset.skipMsgs(state.LastSeq+1, ccr.ConsumerInfo.Delivered.Stream)
}
}

// Capture consumer name.
Expand Down

0 comments on commit 4a1b281

Please sign in to comment.