Skip to content

Commit

Permalink
Fix PurgeEx replay with sequence & keep succeeds (#4213)
Browse files Browse the repository at this point in the history
PR #4212 fixed the issue I
reported in #4196.

However, I believe there might be a bug when both `sequence` and `keep`
are set during recovery.
In the `PurgeEx` the following check is done (for both `filestore.go`
and `memstore.go`):
```go
	if sequence > 1 && keep > 0 {
		return 0, ErrPurgeArgMismatch
	}
```

The `TestJetStreamClusterPurgeExReplayAfterRestart` also triggers this
case, meaning that during the test this error is returned but it
succeeds because the purge was already performed. Is this intended
behaviour?

To elaborate a bit more, I believe the following happens:
- when running the purge normally it will properly run the `keep` (since
it's not combined with `sequence` yet)
- when replaying the purge though, the `sequence` is added to the
`keep`, which errors out in the above if

Which means that during normal operation all will be well, but purges
with `keep` will be ignored upon replaying.

I'm proposing to remove the `sequence > 1 && keep > 0` check and
subsequent error. Which, for reference, was introduced in
#3121.
Hoping this ensures that during recovery, purges that haven't executed
yet will still be executed.

An alternative approach, which wouldn't remove the error: not allow
combining `sequence` and `keep` normally and only allowing it during
recovery. Which would preserve the current behaviour, and correctly
apply `sequence+keep` during recovery still. However, not sure if it's
possible to know if we're in "recovery mode" from within `PurgeEx`.

Resolves #4196
  • Loading branch information
derekcollison committed Jun 4, 2023
2 parents e1f8064 + 132567d commit 64e3bf8
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 14 deletions.
4 changes: 0 additions & 4 deletions server/filestore.go
Expand Up @@ -5108,10 +5108,6 @@ func compareFn(subject string) func(string, string) bool {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if sequence > 1 && keep > 0 {
return 0, ErrPurgeArgMismatch
}

if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return fs.Purge()
Expand Down
26 changes: 22 additions & 4 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -4367,16 +4367,34 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) {
si = runTest(func(js nats.JetStreamManager) {
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: 1})
require_NoError(t, err)
// Send 4 more messages.
sendStreamMsg(t, nc, "TEST.1", "OK")
sendStreamMsg(t, nc, "TEST.2", "OK")
sendStreamMsg(t, nc, "TEST.3", "OK")
sendStreamMsg(t, nc, "TEST.1", "OK")
})
if si.State.Msgs != 5 {
t.Fatalf("Expected 5 msgs after restart, got %d", si.State.Msgs)
}
if si.State.FirstSeq != 5 || si.State.LastSeq != 9 {
t.Fatalf("Expected FirstSeq=5, LastSeq=9 after restart, got FirstSeq=%d, LastSeq=%d",
si.State.FirstSeq, si.State.LastSeq)
}

// Now test a keep on a subject
si = runTest(func(js nats.JetStreamManager) {
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "TEST.1", Keep: 1})
require_NoError(t, err)
// Send 3 more messages.
sendStreamMsg(t, nc, "TEST.1", "OK")
sendStreamMsg(t, nc, "TEST.2", "OK")
sendStreamMsg(t, nc, "TEST.3", "OK")
})
if si.State.Msgs != 4 {
t.Fatalf("Expected 4 msgs after restart, got %d", si.State.Msgs)
if si.State.Msgs != 7 {
t.Fatalf("Expected 7 msgs after restart, got %d", si.State.Msgs)
}
if si.State.FirstSeq != 5 || si.State.LastSeq != 8 {
t.Fatalf("Expected FirstSeq=5, LastSeq=8 after restart, got FirstSeq=%d, LastSeq=%d",
if si.State.FirstSeq != 5 || si.State.LastSeq != 12 {
t.Fatalf("Expected FirstSeq=5, LastSeq=12 after restart, got FirstSeq=%d, LastSeq=%d",
si.State.FirstSeq, si.State.LastSeq)
}
}
4 changes: 0 additions & 4 deletions server/memstore.go
Expand Up @@ -614,10 +614,6 @@ func (ms *memStore) expireMsgs() {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if sequence > 1 && keep > 0 {
return 0, ErrPurgeArgMismatch
}

if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return ms.Purge()
Expand Down
2 changes: 0 additions & 2 deletions server/store.go
Expand Up @@ -61,8 +61,6 @@ var (
ErrInvalidSequence = errors.New("invalid sequence")
// ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong.
ErrSequenceMismatch = errors.New("expected sequence does not match store")
// ErrPurgeArgMismatch is returned when PurgeEx is called with sequence > 1 and keep > 0.
ErrPurgeArgMismatch = errors.New("sequence > 1 && keep > 0 not allowed")
)

// StoreMsg is the stored message format for messages that are retained by the Store layer.
Expand Down

0 comments on commit 64e3bf8

Please sign in to comment.