Skip to content

Commit

Permalink
Merge pull request #988 from nats-io/purge-stream-optional-params
Browse files Browse the repository at this point in the history
ADDED: support for enhanced stream purge in JetStream
  • Loading branch information
piotrpio committed Jun 6, 2022
2 parents ce0cf21 + 39c636d commit 5d37129
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 9 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Expand Up @@ -39,4 +39,7 @@ _testmain.go
# bin

# Goland
.idea
.idea

# VS Code
.vscode
7 changes: 7 additions & 0 deletions js.go
Expand Up @@ -228,6 +228,8 @@ type jsOpts struct {
// enables protocol tracing
ctrace ClientTrace
shouldTrace bool
// purgeOpts contains optional stream purge options
purgeOpts *StreamPurgeRequest
}

const (
Expand Down Expand Up @@ -294,6 +296,11 @@ func Domain(domain string) JSOpt {

}

func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error {
js.purgeOpts = s
return nil
}

// APIPrefix changes the default prefix used for the JetStream API.
func APIPrefix(pre string) JSOpt {
return jsOptFn(func(js *jsOpts) error {
Expand Down
19 changes: 15 additions & 4 deletions jsm.go
Expand Up @@ -925,8 +925,8 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
return nil
}

// streamPurgeRequest is optional request information to the purge API.
type streamPurgeRequest struct {
// StreamPurgeRequest is optional request information to the purge API.
type StreamPurgeRequest struct {
// Purge up to but not including sequence.
Sequence uint64 `json:"seq,omitempty"`
// Subject to match against messages for the purge command.
Expand All @@ -946,10 +946,18 @@ func (js *js) PurgeStream(stream string, opts ...JSOpt) error {
if err := checkStreamName(stream); err != nil {
return err
}
return js.purgeStream(stream, nil)
var req *StreamPurgeRequest
var ok bool
for _, opt := range opts {
// For PurgeStream, only request body opt is relevant
if req, ok = opt.(*StreamPurgeRequest); ok {
break
}
}
return js.purgeStream(stream, req)
}

func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt) error {
func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
Expand All @@ -975,6 +983,9 @@ func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt)
return err
}
if resp.Error != nil {
if resp.Error.Code == 400 {
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
}
return errors.New(resp.Error.Description)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion kv.go
Expand Up @@ -717,7 +717,7 @@ func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
}

var (
pr streamPurgeRequest
pr StreamPurgeRequest
b strings.Builder
)
// Do actual purges here.
Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -167,6 +167,7 @@ var (
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrBadRequest = errors.New("nats: bad request")
)

func init() {
Expand Down
6 changes: 3 additions & 3 deletions object.go
Expand Up @@ -336,7 +336,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
return perr
}

purgePartial := func() { obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) }
purgePartial := func() { obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) }

// Create our own JS context to handle errors etc.
js, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) }))
Expand Down Expand Up @@ -439,7 +439,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Delete any original one.
if einfo != nil && !einfo.Deleted {
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID)
obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj})
obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

return info, nil
Expand Down Expand Up @@ -600,7 +600,7 @@ func (obs *obs) Delete(name string) error {

// Purge chunks for the object.
chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
return obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj})
return obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
}

// AddLink will add a link to another object into this object store.
Expand Down
211 changes: 211 additions & 0 deletions test/js_test.go
Expand Up @@ -1491,6 +1491,217 @@ func TestJetStreamManagement(t *testing.T) {
})
}

func TestPurgeStream(t *testing.T) {
testData := []nats.Msg{
{
Subject: "foo.A",
Data: []byte("first on A"),
},
{
Subject: "foo.C",
Data: []byte("first on C"),
},
{
Subject: "foo.B",
Data: []byte("first on B"),
},
{
Subject: "foo.C",
Data: []byte("second on C"),
},
}

tests := []struct {
name string
stream string
req *nats.StreamPurgeRequest
withError error
expected []nats.Msg
}{
{
name: "purge all messages",
stream: "foo",
expected: []nats.Msg{},
},
{
name: "with filter subject",
stream: "foo",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
},
expected: []nats.Msg{
{
Subject: "foo.A",
Data: []byte("first on A"),
},
{
Subject: "foo.B",
Data: []byte("first on B"),
},
},
},
{
name: "with sequence",
stream: "foo",
req: &nats.StreamPurgeRequest{
Sequence: 3,
},
expected: []nats.Msg{
{
Subject: "foo.B",
Data: []byte("first on B"),
},
{
Subject: "foo.C",
Data: []byte("second on C"),
},
},
},
{
name: "with keep",
stream: "foo",
req: &nats.StreamPurgeRequest{
Keep: 1,
},
expected: []nats.Msg{
{
Subject: "foo.C",
Data: []byte("second on C"),
},
},
},
{
name: "with filter and sequence",
stream: "foo",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
Sequence: 3,
},
expected: []nats.Msg{
{
Subject: "foo.A",
Data: []byte("first on A"),
},
{
Subject: "foo.B",
Data: []byte("first on B"),
},
{
Subject: "foo.C",
Data: []byte("second on C"),
},
},
},
{
name: "with filter and keep",
stream: "foo",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
Keep: 1,
},
expected: []nats.Msg{
{
Subject: "foo.A",
Data: []byte("first on A"),
},
{
Subject: "foo.B",
Data: []byte("first on B"),
},
{
Subject: "foo.C",
Data: []byte("second on C"),
},
},
},
{
name: "empty stream name",
stream: "",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
Keep: 1,
},
withError: nats.ErrStreamNameRequired,
},
{
name: "invalid stream name",
stream: "bad.stream.name",
req: &nats.StreamPurgeRequest{
Subject: "foo.C",
Keep: 1,
},
withError: nats.ErrInvalidStreamName,
},
{
name: "invalid request - both sequence and keep provided",
stream: "foo",
req: &nats.StreamPurgeRequest{
Sequence: 3,
Keep: 1,
},
withError: nats.ErrBadRequest,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.A", "foo.B", "foo.C"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for _, msg := range testData {
if _, err := js.PublishMsg(&msg); err != nil {
t.Fatalf("Unexpected error during publish: %v", err)
}
}

err = js.PurgeStream(test.stream, test.req)
if test.withError != nil {
if err == nil {
t.Fatal("Expected error, got nil")
}
if !errors.Is(err, test.withError) {
t.Fatalf("Expected error: '%s'; got '%s'", test.withError, err)
}
return
}

streamInfo, err := js.StreamInfo("foo", test.req)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if streamInfo.State.Msgs != uint64(len(test.expected)) {
t.Fatalf("Unexpected message count: expected %d; got: %d", len(test.expected), streamInfo.State.Msgs)
}
sub, err := js.SubscribeSync("foo.*", nats.BindStream("foo"))
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
for i := 0; i < int(streamInfo.State.Msgs); i++ {
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if msg.Subject != test.expected[i].Subject {
t.Fatalf("Unexpected message; subject is different than expected: want %s; got: %s", test.expected[i].Subject, msg.Subject)
}
if string(msg.Data) != string(test.expected[i].Data) {
t.Fatalf("Unexpected message; data is different than expected: want %s; got: %s", test.expected[i].Data, msg.Data)
}
}
})
}
}

func TestJetStreamManagement_GetMsg(t *testing.T) {
t.Run("1-node", func(t *testing.T) {
withJSServer(t, testJetStreamManagement_GetMsg)
Expand Down

0 comments on commit 5d37129

Please sign in to comment.