Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADDED: support for enhanced stream purge in JetStream #988

Merged
merged 1 commit into from Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Expand Up @@ -39,4 +39,7 @@ _testmain.go
# bin

# Goland
.idea
.idea

# VS Code
.vscode
7 changes: 7 additions & 0 deletions js.go
Expand Up @@ -228,6 +228,8 @@ type jsOpts struct {
// enables protocol tracing
ctrace ClientTrace
shouldTrace bool
// purgeOpts contains optional stream purge options
purgeOpts *StreamPurgeRequest
}

const (
Expand Down Expand Up @@ -294,6 +296,11 @@ func Domain(domain string) JSOpt {

}

func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error {
js.purgeOpts = s
return nil
}

// APIPrefix changes the default prefix used for the JetStream API.
func APIPrefix(pre string) JSOpt {
return jsOptFn(func(js *jsOpts) error {
Expand Down
19 changes: 15 additions & 4 deletions jsm.go
Expand Up @@ -925,8 +925,8 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
return nil
}

// streamPurgeRequest is optional request information to the purge API.
type streamPurgeRequest struct {
// StreamPurgeRequest is optional request information to the purge API.
type StreamPurgeRequest struct {
// Purge up to but not including sequence.
Sequence uint64 `json:"seq,omitempty"`
// Subject to match against messages for the purge command.
Expand All @@ -946,10 +946,18 @@ func (js *js) PurgeStream(stream string, opts ...JSOpt) error {
if err := checkStreamName(stream); err != nil {
return err
}
return js.purgeStream(stream, nil)
var req *StreamPurgeRequest
var ok bool
for _, opt := range opts {
// For PurgeStream, only request body opt is relevant
if req, ok = opt.(*StreamPurgeRequest); ok {
break
}
}
return js.purgeStream(stream, req)
}

func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt) error {
func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
Expand All @@ -975,6 +983,9 @@ func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt)
return err
}
if resp.Error != nil {
if resp.Error.Code == 400 {
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
}
return errors.New(resp.Error.Description)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion kv.go
Expand Up @@ -717,7 +717,7 @@ func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
}

var (
pr streamPurgeRequest
pr StreamPurgeRequest
b strings.Builder
)
// Do actual purges here.
Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -167,6 +167,7 @@ var (
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrBadRequest = errors.New("nats: bad request")
)

func init() {
Expand Down
6 changes: 3 additions & 3 deletions object.go
Expand Up @@ -336,7 +336,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return perr
}

purgePartial := func() { obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) }
purgePartial := func() { obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) }

// Create our own JS context to handle errors etc.
js, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) }))
Expand Down Expand Up @@ -439,7 +439,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Delete any original one.
if einfo != nil && !einfo.Deleted {
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID)
obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj})
obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

return info, nil
Expand Down Expand Up @@ -600,7 +600,7 @@ func (obs *obs) Delete(name string) error {

// Purge chunks for the object.
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
return obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj})
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

// AddLink will add a link to another object into this object store.
Expand Down
211 changes: 211 additions & 0 deletions test/js_test.go
Expand Up @@ -1491,6 +1491,217 @@ func TestJetStreamManagement(t *testing.T) {
})
}

func TestPurgeStream(t *testing.T) {
testData := []nats.Msg{
{
Subject: "foo.A",
Data: []byte("first on A"),
},
{
Subject: "foo.C",
Data: []byte("first on C"),
},
{
Subject: "foo.B",
Data: []byte("first on B"),
},
{
Subject: "foo.C",
Data: []byte("second on C"),
},
}

tests := []struct {
name string
stream string
req *nats.StreamPurgeRequest
withError error
expected []nats.Msg
}{
{
name: "purge all messages",
stream: "foo",
expected: []nats.Msg{},
},
{
name: "with filter subject",
stream: "foo",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
},
expected: []nats.Msg{
{
Subject: "foo.A",
Data: []byte("first on A"),
},
{
Subject: "foo.B",
Data: []byte("first on B"),
},
},
},
{
name: "with sequence",
stream: "foo",
req: &nats.StreamPurgeRequest{
Sequence: 3,
},
expected: []nats.Msg{
{
Subject: "foo.B",
Data: []byte("first on B"),
},
{
Subject: "foo.C",
Data: []byte("second on C"),
},
},
},
{
name: "with keep",
stream: "foo",
req: &nats.StreamPurgeRequest{
Keep: 1,
},
expected: []nats.Msg{
{
Subject: "foo.C",
Data: []byte("second on C"),
},
},
},
{
name: "with filter and sequence",
stream: "foo",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
Sequence: 3,
},
expected: []nats.Msg{
{
Subject: "foo.A",
Data: []byte("first on A"),
},
{
Subject: "foo.B",
Data: []byte("first on B"),
},
{
Subject: "foo.C",
Data: []byte("second on C"),
},
},
},
{
name: "with filter and keep",
stream: "foo",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
Keep: 1,
},
expected: []nats.Msg{
{
Subject: "foo.A",
Data: []byte("first on A"),
},
{
Subject: "foo.B",
Data: []byte("first on B"),
},
{
Subject: "foo.C",
Data: []byte("second on C"),
},
},
},
{
name: "empty stream name",
stream: "",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
Keep: 1,
},
withError: nats.ErrStreamNameRequired,
},
{
name: "invalid stream name",
stream: "bad.stream.name",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
Keep: 1,
},
withError: nats.ErrInvalidStreamName,
},
{
name: "invalid request - both sequence and keep provided",
stream: "foo",
req: &nats.StreamPurgeRequest{
Sequence: 3,
Keep: 1,
},
withError: nats.ErrBadRequest,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.A", "foo.B", "foo.C"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for _, msg := range testData {
if _, err := js.PublishMsg(&msg); err != nil {
t.Fatalf("Unexpected error during publish: %v", err)
}
}

err = js.PurgeStream(test.stream, test.req)
if test.withError != nil {
if err == nil {
t.Fatal("Expected error, got nil")
}
if !errors.Is(err, test.withError) {
t.Fatalf("Expected error: '%s'; got '%s'", test.withError, err)
}
return
}

streamInfo, err := js.StreamInfo("foo", test.req)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if streamInfo.State.Msgs != uint64(len(test.expected)) {
t.Fatalf("Unexpected message count: expected %d; got: %d", len(test.expected), streamInfo.State.Msgs)
}
sub, err := js.SubscribeSync("foo.*", nats.BindStream("foo"))
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
for i := 0; i < int(streamInfo.State.Msgs); i++ {
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if msg.Subject != test.expected[i].Subject {
t.Fatalf("Unexpected message; subject is different than expected: want %s; got: %s", test.expected[i].Subject, msg.Subject)
}
if string(msg.Data) != string(test.expected[i].Data) {
t.Fatalf("Unexpected message; data is different than expected: want %s; got: %s", test.expected[i].Data, msg.Data)
}
}
})
}
}

func TestJetStreamManagement_GetMsg(t *testing.T) {
t.Run("1-node", func(t *testing.T) {
withJSServer(t, testJetStreamManagement_GetMsg)
Expand Down