diff --git a/js.go b/js.go index bb827f9ea..88244e7e9 100644 --- a/js.go +++ b/js.go @@ -239,6 +239,8 @@ type jsOpts struct { purgeOpts *StreamPurgeRequest // streamInfoOpts contains optional stream info options streamInfoOpts *StreamInfoRequest + // streamListSubject is used for subject filtering when listing streams / stream names + streamListSubject string // For direct get message requests directGet bool // For direct get next message @@ -359,6 +361,17 @@ func DirectGetNext(subject string) JSOpt { }) } +// StreamListFilter is an option that can be used to configure `StreamsInfo()` and `StreamNames()` requests. +// It allows filtering the retured streams by subject associated with each stream. +// Wildcards can be used. For example, `StreamListFilter(FOO.>.A) will return +// all streams which have at least one subject matching the provided pattern (e.g. FOO.TEST.A). +func StreamListFilter(subject string) JSOpt { + return jsOptFn(func(opts *jsOpts) error { + opts.streamListSubject = subject + return nil + }) +} + func (js *js) apiSubj(subj string) string { if js.opts.pre == _EMPTY_ { return subj diff --git a/jsm.go b/jsm.go index ecea0c14c..a5ebc9b09 100644 --- a/jsm.go +++ b/jsm.go @@ -1211,6 +1211,7 @@ func (s *streamLister) Next() bool { req, err := json.Marshal(streamNamesRequest{ apiPagedRequest: apiPagedRequest{Offset: s.offset}, + Subject: s.js.opts.streamListSubject, }) if err != nil { s.err = err @@ -1311,6 +1312,7 @@ func (l *streamNamesLister) Next() bool { req, err := json.Marshal(streamNamesRequest{ apiPagedRequest: apiPagedRequest{Offset: l.offset}, + Subject: l.js.opts.streamListSubject, }) if err != nil { l.err = err diff --git a/test/js_test.go b/test/js_test.go index 3327b1590..a902b411a 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1692,6 +1692,75 @@ func TestStreamLister(t *testing.T) { } } +func TestStreamLister_FilterSubject(t *testing.T) { + streams := map[string][]string{ + "s1": {"foo"}, + "s2": {"bar"}, + "s3": {"foo.*", "bar.*"}, + "s4": {"foo-1.A"}, + "s5": {"foo.A.bar.B"}, + "s6": {"foo.C.bar.D.E"}, + } + tests := []struct { + filter string + expected []string + }{ + { + filter: "foo", + expected: []string{"s1"}, + }, + { + filter: "bar", + expected: []string{"s2"}, + }, + { + filter: "*", + expected: []string{"s1", "s2"}, + }, + { + filter: ">", + expected: []string{"s1", "s2", "s3", "s4", "s5", "s6"}, + }, + { + filter: "*.A", + expected: []string{"s3", "s4"}, + }, + } + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + for name, subjects := range streams { + if _, err := js.AddStream(&nats.StreamConfig{Name: name, Subjects: subjects}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + for _, test := range tests { + t.Run(test.filter, func(t *testing.T) { + names := make([]string, 0) + + // list stream names + for name := range js.StreamNames(nats.StreamListFilter(test.filter)) { + names = append(names, name) + } + if !reflect.DeepEqual(names, test.expected) { + t.Fatalf("Invalid result; want: %v; got: %v", test.expected, names) + } + + // list streams + names = make([]string, 0) + for info := range js.StreamsInfo(nats.StreamListFilter(test.filter)) { + names = append(names, info.Config.Name) + } + if !reflect.DeepEqual(names, test.expected) { + t.Fatalf("Invalid result; want: %v; got: %v", test.expected, names) + } + }) + } +} + func TestConsumersLister(t *testing.T) { tests := []struct { name string