allow purging up to the maxSequence
MauriceVanVeen committed May 29, 2023
1 parent 3db39be commit c40a6a3
Showing 7 changed files with 95 additions and 12 deletions.
13 changes: 10 additions & 3 deletions server/filestore.go
@@ -5107,7 +5107,7 @@ func compareFn(subject string) func(string, string) bool {

// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
-func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
+func (fs *fileStore) PurgeEx(subject string, sequence, keep, maxSequence uint64) (purged uint64, err error) {
	if sequence > 1 && keep > 0 {
		return 0, ErrPurgeArgMismatch
	}
@@ -5120,12 +5120,12 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
		return fs.Compact(sequence)
	} else if keep > 0 {
		fs.mu.RLock()
-		msgs, lseq := fs.state.Msgs, fs.state.LastSeq
+		msgs, mseq := fs.state.Msgs, maxSequence
		fs.mu.RUnlock()
		if keep >= msgs {
			return 0, nil
		}
-		return fs.Compact(lseq - keep + 1)
+		return fs.Compact(mseq - keep + 1)
	}
	return 0, nil
}
@@ -5169,6 +5169,13 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
		}
		if sequence > 1 && sequence <= l {
			l = sequence - 1
+		} else if l > maxSequence {
+			l = maxSequence
+			// no blocks left to traverse if we exceed the max sequence
+			if f > l {
+				mb.mu.Unlock()
+				break
+			}
		}

		for seq := f; seq <= l; seq++ {
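
Worth spelling out the arithmetic in the keep branch above: Compact(seq) removes everything below seq, so keeping `keep` messages at or below the supplied maxSequence means compacting to maxSequence - keep + 1. A minimal, self-contained sketch with hypothetical values (not code from the commit):

package main

import "fmt"

func main() {
	// Hypothetical: a purge proposed when the stream's last sequence was 200,
	// asking to keep only the single newest message.
	const keep, maxSequence = uint64(1), uint64(200)

	// Compact removes all messages below the given sequence.
	firstKept := maxSequence - keep + 1
	fmt.Println(firstKept) // 200 -> Compact(200) drops sequences 1..199
}
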
40 changes: 38 additions & 2 deletions server/filestore_test.go
@@ -3560,7 +3560,7 @@ func TestFileStorePurgeExKeepOneBug(t *testing.T) {
t.Fatalf("Expected to find 2 `A` msgs, got %d", fss.Msgs)
}

n, err := fs.PurgeEx("A", 0, 1)
n, err := fs.PurgeEx("A", 0, 1, fs.lastSeq())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -4015,7 +4015,7 @@ func TestFileStorePurgeExWithSubject(t *testing.T) {
	require_NoError(t, err)

	// This should purge all.
-	p, err := fs.PurgeEx("foo.1", 1, 0)
+	p, err := fs.PurgeEx("foo.1", 1, 0, fs.lastSeq())
	require_NoError(t, err)
	require_True(t, int(p) == total)
	require_True(t, int(p) == total)
@@ -4024,6 +4024,42 @@ func TestFileStorePurgeExWithSubject(t *testing.T) {
	})
}

+func TestFileStorePurgeExWithMaxSequence(t *testing.T) {
+	testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
+		fcfg.BlockSize = 1000
+
+		fs, err := newFileStore(
+			fcfg,
+			StreamConfig{Name: "TEST", Subjects: []string{"foo.>"}, Storage: FileStorage},
+		)
+		require_NoError(t, err)
+		defer fs.Stop()
+
+		payload := make([]byte, 20)
+		total := 200
+		for i := 0; i < total; i++ {
+			_, _, err = fs.StoreMsg("foo.1", nil, payload)
+			require_NoError(t, err)
+		}
+		_, _, err = fs.StoreMsg("foo.2", nil, []byte("xxxxxx"))
+		require_NoError(t, err)
+
+		// This should purge only one, since we've specified the max sequence to be the first
+		p, err := fs.PurgeEx("foo.1", 0, 0, 1)
+		require_NoError(t, err)
+		require_True(t, int(p) == 1)
+		require_True(t, fs.State().Msgs == 200)
+		require_True(t, fs.State().FirstSeq == 2)
+
+		// This should purge the rest, since we've specified the max sequence equal to the last sequence
+		p, err = fs.PurgeEx("foo.1", 0, 0, fs.lastSeq())
+		require_NoError(t, err)
+		require_True(t, int(p) == 199)
+		require_True(t, fs.State().Msgs == 1)
+		require_True(t, fs.State().FirstSeq == 201)
+	})
+}

// When the N.idx file is shorter than the previous write we could fail to recover the idx properly.
// For instance, with encryption and an expiring stream that has no messages, when a restart happens the decrypt will fail
// since there are extra bytes, and this could lead to a stream sequence reset to zero.
12 changes: 11 additions & 1 deletion server/jetstream_cluster.go
@@ -155,6 +155,7 @@ type consumerAssignment struct {
type streamPurge struct {
	Client  *ClientInfo              `json:"client,omitempty"`
	Stream  string                   `json:"stream"`
+	LastSeq uint64                   `json:"last_seq"`
	Subject string                   `json:"subject"`
	Reply   string                   `json:"reply"`
	Request *JSApiStreamPurgeRequest `json:"request,omitempty"`
@@ -2806,6 +2807,15 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isRecovering bool) error {
				}
				panic(err.Error())
			}
+			// Ignore if we are recovering and we have already processed.
+			if isRecovering {
+				if sp.Request != nil {
+					mset.store.PurgeEx(sp.Request.Subject, sp.Request.Sequence, sp.Request.Keep, sp.LastSeq)
+				} else if mset.state().FirstSeq <= sp.LastSeq {
+					mset.store.Compact(sp.LastSeq + 1)
+				}
+				continue
+			}

			s := js.server()
			purged, err := mset.purge(sp.Request)
@@ -6117,7 +6127,7 @@ func (s *Server) jsClusteredStreamPurgeRequest(
	}

	if n := sa.Group.node; n != nil {
-		sp := &streamPurge{Stream: stream, Subject: subject, Reply: reply, Client: ci, Request: preq}
+		sp := &streamPurge{Stream: stream, LastSeq: mset.state().LastSeq, Subject: subject, Reply: reply, Client: ci, Request: preq}
		n.Propose(encodeStreamPurge(sp))
		js.mu.Unlock()
		return
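
The thread tying these hunks together, as I read the change: the proposer snapshots the stream's LastSeq into the replicated streamPurge entry, so a purge replayed during recovery is bounded by the sequence that existed when it was proposed, not by whatever the stream has grown to by apply time. A hypothetical sketch of that property (names are stand-ins, not the server's code):

package main

import "fmt"

// purgeEntry stands in for streamPurge: the sequence bound travels inside
// the replicated entry instead of being read from the store at apply time.
type purgeEntry struct {
	Subject string
	LastSeq uint64 // snapshot of the stream's last sequence at propose time
}

func main() {
	// Proposed when the stream's last sequence was 100.
	e := purgeEntry{Subject: "foo.1", LastSeq: 100}

	// By the time the entry is applied (or replayed after a restart), the
	// stream may already hold sequences up to 150. Bounding the purge at
	// e.LastSeq keeps the replay deterministic: 101..150 always survive.
	upTo, applyTimeLastSeq := e.LastSeq, uint64(150)
	if applyTimeLastSeq < upTo {
		upTo = applyTimeLastSeq
	}
	fmt.Printf("purge %q up to sequence %d\n", e.Subject, upTo)
}
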
8 changes: 5 additions & 3 deletions server/memstore.go
@@ -613,7 +613,7 @@ func (ms *memStore) expireMsgs() {

// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
-func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
+func (ms *memStore) PurgeEx(subject string, sequence, keep, maxSequence uint64) (purged uint64, err error) {
	if sequence > 1 && keep > 0 {
		return 0, ErrPurgeArgMismatch
	}
@@ -626,12 +626,12 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
		return ms.Compact(sequence)
	} else if keep > 0 {
		ms.mu.RLock()
-		msgs, lseq := ms.state.Msgs, ms.state.LastSeq
+		msgs, mseq := ms.state.Msgs, maxSequence
		ms.mu.RUnlock()
		if keep >= msgs {
			return 0, nil
		}
-		return ms.Compact(lseq - keep + 1)
+		return ms.Compact(mseq - keep + 1)
	}
	return 0, nil

@@ -647,6 +647,8 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
	last := ss.Last
	if sequence > 1 {
		last = sequence - 1
+	} else if last > maxSequence {
+		last = maxSequence
	}
	ms.mu.Lock()
	for seq := ss.First; seq <= last; seq++ {
30 changes: 29 additions & 1 deletion server/memstore_test.go
@@ -416,10 +416,38 @@ func TestMemStorePurgeExWithSubject(t *testing.T) {
	}

	// This should purge all.
-	ms.PurgeEx("foo", 1, 0)
+	ms.PurgeEx("foo", 1, 0, ms.State().LastSeq)
	require_True(t, ms.State().Msgs == 0)
}

+func TestMemStorePurgeExWithMaxSequence(t *testing.T) {
+	ms, err := newMemStore(&StreamConfig{Storage: MemoryStorage})
+	require_NoError(t, err)
+
+	payload := make([]byte, 20)
+	total := 200
+	for i := 0; i < total; i++ {
+		_, _, err = ms.StoreMsg("foo.1", nil, payload)
+		require_NoError(t, err)
+	}
+	_, _, err = ms.StoreMsg("foo.2", nil, []byte("xxxxxx"))
+	require_NoError(t, err)
+
+	// This should purge only one, since we've specified the max sequence to be the first
+	p, err := ms.PurgeEx("foo.1", 0, 0, 1)
+	require_NoError(t, err)
+	require_True(t, int(p) == 1)
+	require_True(t, ms.State().Msgs == 200)
+	require_True(t, ms.State().FirstSeq == 2)
+
+	// This should purge the rest, since we've specified the max sequence equal to the last sequence
+	p, err = ms.PurgeEx("foo.1", 0, 0, ms.State().LastSeq)
+	require_NoError(t, err)
+	require_True(t, int(p) == 199)
+	require_True(t, ms.State().Msgs == 1)
+	require_True(t, ms.State().FirstSeq == 201)
+}

func TestMemStoreUpdateMaxMsgsPerSubject(t *testing.T) {
	cfg := &StreamConfig{
		Name: "TEST",
2 changes: 1 addition & 1 deletion server/store.go
@@ -89,7 +89,7 @@ type StreamStore interface {
	RemoveMsg(seq uint64) (bool, error)
	EraseMsg(seq uint64) (bool, error)
	Purge() (uint64, error)
-	PurgeEx(subject string, seq, keep uint64) (uint64, error)
+	PurgeEx(subject string, seq, keep, mseq uint64) (uint64, error)
	Compact(seq uint64) (uint64, error)
	Truncate(seq uint64) error
	GetSeqFromTime(t time.Time) uint64
2 changes: 1 addition & 1 deletion server/stream.go
@@ -1675,7 +1675,7 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
	mset.mu.RUnlock()

	if preq != nil {
-		purged, err = mset.store.PurgeEx(preq.Subject, preq.Sequence, preq.Keep)
+		purged, err = mset.store.PurgeEx(preq.Subject, preq.Sequence, preq.Keep, mset.lastSeq())
	} else {
		purged, err = mset.store.Purge()
	}

5 comments on commit c40a6a3

@derekcollison (Member)
We probably can not make a change to the public interface. But I think you are on the right track.

Do we have some tests that show this doing the wrong thing after a restart? I can take a shot at it maybe later this week.

@MauriceVanVeen (Contributor, Author)

Yeah, by taking the TestJetStreamClusterPurgeExReplayAfterRestart test from the first commit and running it against the previous code.
Or by taking the current HEAD and reverting jetstream_cluster.go back to these lines (without my addition of the PurgeEx call):

// Ignore if we are recovering and we have already processed.
if isRecovering {
	if mset.state().FirstSeq <= sp.LastSeq {
		mset.store.Compact(sp.LastSeq + 1)
	}
	continue
}

That should make the test fail with the following message:

    jetstream_cluster_1_test.go:4968: Expected 1 msg after restart, got 0

The test has two subjects, TEST.0 and TEST.1. We publish to both first, then install a snapshot, and then purge only TEST.0.
The stream should have just 1 message left, namely the one on TEST.1. But because of the .Compact(sp.LastSeq + 1) it will be empty.
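
For reference, a minimal reproduction of that scenario through the JetStream API (hypothetical client code, assuming a running server with a "TEST" stream on TEST.>; not the actual test):

package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	nc.Publish("TEST.0", []byte("a"))
	nc.Publish("TEST.1", []byte("b"))
	nc.Flush()

	// Subject-filtered purge: only TEST.0 messages should be removed.
	resp, err := nc.Request("$JS.API.STREAM.PURGE.TEST", []byte(`{"filter":"TEST.0"}`), time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(resp.Data))

	// After a server restart the purge entry is replayed from the log.
	// Expected: the TEST.1 message survives (Msgs == 1); replaying the
	// purge as Compact(sp.LastSeq + 1) instead leaves the stream empty.
}
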

@derekcollison (Member)

ok sounds good, will take a closer look. Thanks for the work here.

@derekcollison (Member)

I think we resolved this yes?

@MauriceVanVeen (Contributor, Author)