From 6ca26dda9a26fa7d747f2155d56d0754c91db998 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 6 Jun 2022 13:43:08 +0200 Subject: [PATCH] Add support for enhanced stream purge in JetStream --- .gitignore | 5 +- js.go | 7 ++ jsm.go | 19 +++- kv.go | 2 +- nats.go | 1 + object.go | 6 +- test/js_test.go | 282 ++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 313 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index a9977fce5..ae4871f46 100644 --- a/.gitignore +++ b/.gitignore @@ -39,4 +39,7 @@ _testmain.go # bin # Goland -.idea \ No newline at end of file +.idea + +# VS Code +.vscode \ No newline at end of file diff --git a/js.go b/js.go index 773748aad..a99c89d66 100644 --- a/js.go +++ b/js.go @@ -228,6 +228,8 @@ type jsOpts struct { // enables protocol tracing ctrace ClientTrace shouldTrace bool + // purgeOpts contains optional stream purge options + purgeOpts *StreamPurgeRequest } const ( @@ -294,6 +296,11 @@ func Domain(domain string) JSOpt { } +func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error { + js.purgeOpts = s + return nil +} + // APIPrefix changes the default prefix used for the JetStream API. func APIPrefix(pre string) JSOpt { return jsOptFn(func(js *jsOpts) error { diff --git a/jsm.go b/jsm.go index 6bfff6732..9170f8304 100644 --- a/jsm.go +++ b/jsm.go @@ -925,8 +925,8 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { return nil } -// streamPurgeRequest is optional request information to the purge API. -type streamPurgeRequest struct { +// StreamPurgeRequest is optional request information to the purge API. +type StreamPurgeRequest struct { // Purge up to but not including sequence. Sequence uint64 `json:"seq,omitempty"` // Subject to match against messages for the purge command. @@ -946,10 +946,18 @@ func (js *js) PurgeStream(stream string, opts ...JSOpt) error { if err := checkStreamName(stream); err != nil { return err } - return js.purgeStream(stream, nil) + var req *StreamPurgeRequest + var ok bool + for _, opt := range opts { + // For PurgeStream, only request body opt is relevant + if req, ok = opt.(*StreamPurgeRequest); ok { + break + } + } + return js.purgeStream(stream, req) } -func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt) error { +func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt) error { o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return err @@ -975,6 +983,9 @@ func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt) return err } if resp.Error != nil { + if resp.Error.Code == 400 { + return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body") + } return errors.New(resp.Error.Description) } return nil diff --git a/kv.go b/kv.go index d4171b55f..630945263 100644 --- a/kv.go +++ b/kv.go @@ -717,7 +717,7 @@ func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error { } var ( - pr streamPurgeRequest + pr StreamPurgeRequest b strings.Builder ) // Do actual purges here. diff --git a/nats.go b/nats.go index 90f301f15..d451bf529 100644 --- a/nats.go +++ b/nats.go @@ -167,6 +167,7 @@ var ( ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed") ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use") ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") + ErrBadRequest = errors.New("nats: bad request") ) func init() { diff --git a/object.go b/object.go index bc68ca4bb..f2646d885 100644 --- a/object.go +++ b/object.go @@ -336,7 +336,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn return perr } - purgePartial := func() { obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) } + purgePartial := func() { obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) } // Create our own JS context to handle errors etc. js, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) })) @@ -439,7 +439,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn // Delete any original one. if einfo != nil && !einfo.Deleted { chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID) - obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) + obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) } return info, nil @@ -600,7 +600,7 @@ func (obs *obs) Delete(name string) error { // Purge chunks for the object. chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) - return obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) + return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) } // AddLink will add a link to another object into this object store. diff --git a/test/js_test.go b/test/js_test.go index 0ea848b0f..9c6133176 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1491,6 +1491,217 @@ func TestJetStreamManagement(t *testing.T) { }) } +func TestPurgeStream(t *testing.T) { + testData := []nats.Msg{ + { + Subject: "foo.A", + Data: []byte("first on A"), + }, + { + Subject: "foo.C", + Data: []byte("first on C"), + }, + { + Subject: "foo.B", + Data: []byte("first on B"), + }, + { + Subject: "foo.C", + Data: []byte("second on C"), + }, + } + + tests := []struct { + name string + stream string + req *nats.StreamPurgeRequest + withError error + expected []nats.Msg + }{ + { + name: "purge all messages", + stream: "foo", + expected: []nats.Msg{}, + }, + { + name: "with filter subject", + stream: "foo", + req: &nats.StreamPurgeRequest{ + Subject: "foo.C", + }, + expected: []nats.Msg{ + { + Subject: "foo.A", + Data: []byte("first on A"), + }, + { + Subject: "foo.B", + Data: []byte("first on B"), + }, + }, + }, + { + name: "with sequence", + stream: "foo", + req: &nats.StreamPurgeRequest{ + Sequence: 3, + }, + expected: []nats.Msg{ + { + Subject: "foo.B", + Data: []byte("first on B"), + }, + { + Subject: "foo.C", + Data: []byte("second on C"), + }, + }, + }, + { + name: "with keep", + stream: "foo", + req: &nats.StreamPurgeRequest{ + Keep: 1, + }, + expected: []nats.Msg{ + { + Subject: "foo.C", + Data: []byte("second on C"), + }, + }, + }, + { + name: "with filter and sequence", + stream: "foo", + req: &nats.StreamPurgeRequest{ + Subject: "foo.C", + Sequence: 3, + }, + expected: []nats.Msg{ + { + Subject: "foo.A", + Data: []byte("first on A"), + }, + { + Subject: "foo.B", + Data: []byte("first on B"), + }, + { + Subject: "foo.C", + Data: []byte("second on C"), + }, + }, + }, + { + name: "with filter and keep", + stream: "foo", + req: &nats.StreamPurgeRequest{ + Subject: "foo.C", + Keep: 1, + }, + expected: []nats.Msg{ + { + Subject: "foo.A", + Data: []byte("first on A"), + }, + { + Subject: "foo.B", + Data: []byte("first on B"), + }, + { + Subject: "foo.C", + Data: []byte("second on C"), + }, + }, + }, + { + name: "empty stream name", + stream: "", + req: &nats.StreamPurgeRequest{ + Sequence: 3, + Keep: 1, + }, + withError: nats.ErrStreamNameRequired, + }, + { + name: "invalid stream name", + stream: "bad.stream.name", + req: &nats.StreamPurgeRequest{ + Sequence: 3, + Keep: 1, + }, + withError: nats.ErrBadRequest, + }, + { + name: "invalid request - both sequence and keep provided", + stream: "foo", + req: &nats.StreamPurgeRequest{ + Sequence: 3, + Keep: 1, + }, + withError: nats.ErrBadRequest, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo.A", "foo.B", "foo.C"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for _, msg := range testData { + if _, err := js.PublishMsg(&msg); err != nil { + t.Fatalf("Unexpected error during publish: %v", err) + } + } + + err = js.PurgeStream("foo", test.req) + if test.withError != nil { + if err == nil { + t.Fatal("Expected error, got nil") + } + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: '%s'; got '%s'", test.withError, err) + } + return + } + + streamInfo, err := js.StreamInfo("foo", test.req) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if streamInfo.State.Msgs != uint64(len(test.expected)) { + t.Fatalf("Unexpected message count: expected %d; got: %d", len(test.expected), streamInfo.State.Msgs) + } + sub, err := js.SubscribeSync("foo.*", nats.BindStream("foo")) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + for i := 0; i < int(streamInfo.State.Msgs); i++ { + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if msg.Subject != test.expected[i].Subject { + t.Fatalf("Unexpected message; subject is different than expected: want %s; got: %s", test.expected[i].Subject, msg.Subject) + } + if string(msg.Data) != string(test.expected[i].Data) { + t.Fatalf("Unexpected message; data is different than expected: want %s; got: %s", test.expected[i].Data, msg.Data) + } + } + }) + } +} + func TestJetStreamManagement_GetMsg(t *testing.T) { t.Run("1-node", func(t *testing.T) { withJSServer(t, testJetStreamManagement_GetMsg) @@ -3102,6 +3313,77 @@ func TestJetStreamSubscribe_AutoAck(t *testing.T) { } } +func TestJetStreamSubscribe_ManualAck(t *testing.T) { + tests := []struct { + name string + opt nats.SubOpt + expectedAck bool + }{ + { + name: "with ack explicit", + opt: nats.ManualAck(), + expectedAck: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + // Create the stream using our client API. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js.Publish("foo", []byte("hello")) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + acks := make(chan struct{}, 2) + nc.Subscribe("$JS.ACK.TEST.>", func(msg *nats.Msg) { + acks <- struct{}{} + }) + nc.Flush() + + go func() { + defer func() { + if r := recover(); r != nil { + fmt.Println("Recovered in f", r) + } + }() + _, err = js.Subscribe("foo", func(m *nats.Msg) { + panic("aaaaa") + }, nats.AckExplicit(), test.opt) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + }() + + <-ctx.Done() + + if test.expectedAck { + if len(acks) != 1 { + t.Fatalf("Expected to receive a single ack, got: %v", len(acks)) + } + return + } + if len(acks) != 0 { + t.Fatalf("Expected no acks, got: %v", len(acks)) + } + }) + } +} + func TestJetStreamSubscribe_AckDupInProgress(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s)