Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PurgeEx replay with sequence & keep succeeds #4213

Merged
merged 1 commit into from Jun 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions server/filestore.go
Expand Up @@ -5108,10 +5108,6 @@ func compareFn(subject string) func(string, string) bool {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if sequence > 1 && keep > 0 {
return 0, ErrPurgeArgMismatch
}

if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return fs.Purge()
Expand Down
26 changes: 22 additions & 4 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -4367,16 +4367,34 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) {
si = runTest(func(js nats.JetStreamManager) {
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: 1})
require_NoError(t, err)
// Send 4 more messages.
sendStreamMsg(t, nc, "TEST.1", "OK")
sendStreamMsg(t, nc, "TEST.2", "OK")
sendStreamMsg(t, nc, "TEST.3", "OK")
sendStreamMsg(t, nc, "TEST.1", "OK")
})
if si.State.Msgs != 5 {
t.Fatalf("Expected 5 msgs after restart, got %d", si.State.Msgs)
}
if si.State.FirstSeq != 5 || si.State.LastSeq != 9 {
t.Fatalf("Expected FirstSeq=5, LastSeq=9 after restart, got FirstSeq=%d, LastSeq=%d",
si.State.FirstSeq, si.State.LastSeq)
}

// Now test a keep on a subject
si = runTest(func(js nats.JetStreamManager) {
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "TEST.1", Keep: 1})
require_NoError(t, err)
// Send 3 more messages.
sendStreamMsg(t, nc, "TEST.1", "OK")
sendStreamMsg(t, nc, "TEST.2", "OK")
sendStreamMsg(t, nc, "TEST.3", "OK")
})
if si.State.Msgs != 4 {
t.Fatalf("Expected 4 msgs after restart, got %d", si.State.Msgs)
if si.State.Msgs != 7 {
t.Fatalf("Expected 7 msgs after restart, got %d", si.State.Msgs)
}
if si.State.FirstSeq != 5 || si.State.LastSeq != 8 {
t.Fatalf("Expected FirstSeq=5, LastSeq=8 after restart, got FirstSeq=%d, LastSeq=%d",
if si.State.FirstSeq != 5 || si.State.LastSeq != 12 {
t.Fatalf("Expected FirstSeq=5, LastSeq=12 after restart, got FirstSeq=%d, LastSeq=%d",
si.State.FirstSeq, si.State.LastSeq)
}
}
4 changes: 0 additions & 4 deletions server/memstore.go
Expand Up @@ -614,10 +614,6 @@ func (ms *memStore) expireMsgs() {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if sequence > 1 && keep > 0 {
return 0, ErrPurgeArgMismatch
}

if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return ms.Purge()
Expand Down
2 changes: 0 additions & 2 deletions server/store.go
Expand Up @@ -61,8 +61,6 @@ var (
ErrInvalidSequence = errors.New("invalid sequence")
// ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong.
ErrSequenceMismatch = errors.New("expected sequence does not match store")
// ErrPurgeArgMismatch is returned when PurgeEx is called with sequence > 1 and keep > 0.
ErrPurgeArgMismatch = errors.New("sequence > 1 && keep > 0 not allowed")
)

// StoreMsg is the stored message format for messages that are retained by the Store layer.
Expand Down