[FIXED] Make sure to process extended purge operations correctly when being replayed. (#4212)

This is an extension of the excellent work by @MauriceVanVeen in his
original PR #4197, fully resolving the issue for all use cases.

Signed-off-by: Derek Collison <derek@nats.io>

Resolves #4196
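
For orientation: the "extended" purge operations at issue are purges that carry a JSApiStreamPurgeRequest, i.e. filtered by subject, bounded by sequence, or keeping the newest N messages, rather than a plain full purge. A minimal client-side sketch of the variants, mirroring the nats.go calls the new test below uses (the stream name and the discarded errors are illustrative only):

package main

import "github.com/nats-io/nats.go"

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()
	js, _ := nc.JetStream()

	// Plain purge: removes all messages in the stream.
	_ = js.PurgeStream("TEST")

	// Extended purges, each carried as a JSApiStreamPurgeRequest on the wire.
	_ = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "TEST.0"}) // only this subject
	_ = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: 1})           // keep the newest message
	_ = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 4})       // up to, not including, seq 4
}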
derekcollison committed Jun 4, 2023
2 parents eb09ddd + dee5324 commit e1f8064
Showing 2 changed files with 105 additions and 5 deletions.
10 changes: 5 additions & 5 deletions server/jetstream_cluster.go
@@ -2808,12 +2808,12 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering
 			panic(err.Error())
 		}
 		// Ignore if we are recovering and we have already processed.
-		if isRecovering {
-			if mset.state().FirstSeq <= sp.LastSeq {
-				// Make sure all messages from the purge are gone.
-				mset.store.Compact(sp.LastSeq + 1)
+		if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) {
+			if sp.Request == nil {
+				sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq}
+			} else {
+				sp.Request.Sequence = sp.LastSeq
 			}
 			continue
 		}
 
 		s := js.server()
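For context on the change above: PR #4197 replayed a purge during recovery as a bare store compaction up to sp.LastSeq + 1, which is only equivalent to a full (or sequence-bounded) purge. Subject- and Keep-based purges intentionally leave messages behind, so they cannot be reduced to a compaction; the fix instead stamps the recorded LastSeq into the request when no explicit sequence is present, letting the normal purge path re-apply the operation. A rough sketch of the distinction, assuming store methods shaped like the server's Compact and PurgeEx (the helper itself is hypothetical, not server code):

// Hypothetical helper illustrating why replaying every purge as a
// compaction is lossy.
func replayPurge(store StreamStore, sp *streamPurge) error {
	req := sp.Request
	switch {
	case req == nil:
		// Plain full purge: everything up to the stamped LastSeq is gone.
		_, err := store.Compact(sp.LastSeq + 1)
		return err
	case req.Subject == _EMPTY_ && req.Keep == 0:
		// Sequence-bounded purge: still expressible as a compaction.
		_, err := store.Compact(req.Sequence)
		return err
	default:
		// Subject- or Keep-based purge: re-run the filtered purge so
		// messages the original filter did not remove survive the replay.
		_, err := store.PurgeEx(req.Subject, req.Sequence, req.Keep)
		return err
	}
}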
100 changes: 100 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -4280,3 +4280,103 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
	// Each cluster hop that has the export/import mapping will add another T message copy.
	checkSubsPending(t, tsub, num*4)
}

// https://github.com/nats-io/nats-server/pull/4197
func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "P3F", 3)
	defer c.shutdown()

	// Client based API
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"TEST.>"},
		Replicas: 3,
	})
	require_NoError(t, err)

	sendStreamMsg(t, nc, "TEST.0", "OK")
	sendStreamMsg(t, nc, "TEST.1", "OK")
	sendStreamMsg(t, nc, "TEST.2", "OK")

	runTest := func(f func(js nats.JetStreamManager)) *nats.StreamInfo {
		nc, js := jsClientConnect(t, c.randomServer())
		defer nc.Close()

		// Install a snapshot, then execute the interior func, ensuring the purge will be recovered later.
		fsl := c.streamLeader(globalAccountName, "TEST")
		fsl.JetStreamSnapshotStream(globalAccountName, "TEST")

		f(js)
		time.Sleep(250 * time.Millisecond)

		fsl.Shutdown()
		fsl.WaitForShutdown()
		fsl = c.restartServer(fsl)
		c.waitOnServerCurrent(fsl)

		nc, js = jsClientConnect(t, c.randomServer())
		defer nc.Close()

		c.waitOnStreamLeader(globalAccountName, "TEST")
		sl := c.streamLeader(globalAccountName, "TEST")

		// Keep stepping down so the stream leader matches the initial leader.
		// We need to check if it restored from the snapshot properly.
		for sl != fsl {
			_, err := nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
			require_NoError(t, err)
			c.waitOnStreamLeader(globalAccountName, "TEST")
			sl = c.streamLeader(globalAccountName, "TEST")
		}

		si, err := js.StreamInfo("TEST")
		require_NoError(t, err)
		return si
	}
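	// Each runTest call below exercises one purge flavor through the
	// snapshot + restart replay path: subject-filtered, full, and Keep-based.
	// The surviving message counts and sequences must match the state the
	// purge produced before the leader restarted.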
	si := runTest(func(js nats.JetStreamManager) {
		err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "TEST.0"})
		require_NoError(t, err)
	})
	if si.State.Msgs != 2 {
		t.Fatalf("Expected 2 msgs after restart, got %d", si.State.Msgs)
	}
	if si.State.FirstSeq != 2 || si.State.LastSeq != 3 {
		t.Fatalf("Expected FirstSeq=2, LastSeq=3 after restart, got FirstSeq=%d, LastSeq=%d",
			si.State.FirstSeq, si.State.LastSeq)
	}

	si = runTest(func(js nats.JetStreamManager) {
		err = js.PurgeStream("TEST")
		require_NoError(t, err)
		// Send 2 more messages.
		sendStreamMsg(t, nc, "TEST.1", "OK")
		sendStreamMsg(t, nc, "TEST.2", "OK")
	})
	if si.State.Msgs != 2 {
		t.Fatalf("Expected 2 msgs after restart, got %d", si.State.Msgs)
	}
	if si.State.FirstSeq != 4 || si.State.LastSeq != 5 {
		t.Fatalf("Expected FirstSeq=4, LastSeq=5 after restart, got FirstSeq=%d, LastSeq=%d",
			si.State.FirstSeq, si.State.LastSeq)
	}

	// Now test a purge with Keep.
	si = runTest(func(js nats.JetStreamManager) {
		err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: 1})
		require_NoError(t, err)
		// Send 3 more messages.
		sendStreamMsg(t, nc, "TEST.1", "OK")
		sendStreamMsg(t, nc, "TEST.2", "OK")
		sendStreamMsg(t, nc, "TEST.3", "OK")
	})
	if si.State.Msgs != 4 {
		t.Fatalf("Expected 4 msgs after restart, got %d", si.State.Msgs)
	}
	if si.State.FirstSeq != 5 || si.State.LastSeq != 8 {
		t.Fatalf("Expected FirstSeq=5, LastSeq=8 after restart, got FirstSeq=%d, LastSeq=%d",
			si.State.FirstSeq, si.State.LastSeq)
	}
}
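
To run just this test from a nats-server checkout (a standard go test invocation; the -race flag is optional):

go test -race -run TestJetStreamClusterPurgeExReplayAfterRestart ./server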
