Skip to content

Commit

Permalink
from ...string to ...JSOpt
Browse files Browse the repository at this point in the history
  • Loading branch information
jnmoyne committed Sep 13, 2022
1 parent b753f69 commit 200b836
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
2 changes: 1 addition & 1 deletion js_test.go
Expand Up @@ -1120,7 +1120,7 @@ func TestJetStreamStreamInfoSubjectDetails(t *testing.T) {
nc.Publish(fmt.Sprintf("test.%d", i), payload)
}

result, err := js.StreamContainedSubjects("TEST", "test.*")
result, err := js.StreamContainedSubjects("TEST", &StreamInfoRequest{SubjectsFilter: "test.*"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
28 changes: 12 additions & 16 deletions jsm.go
Expand Up @@ -38,7 +38,7 @@ type JetStreamManager interface {
StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)

// StreamContainedSubjects queries the stream for the subjects it holds with optional filter
StreamContainedSubjects(stream string, filter ...string) (map[string]uint64, error)
StreamContainedSubjects(stream string, opts ...JSOpt) (map[string]uint64, error)

// PurgeStream purges a stream messages.
PurgeStream(name string, opts ...JSOpt) error
Expand Down Expand Up @@ -735,35 +735,31 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return resp.StreamInfo, nil
}

func (js *js) StreamContainedSubjects(stream string, filter ...string) (map[string]uint64, error) {
func (js *js) StreamContainedSubjects(stream string, opts ...JSOpt) (map[string]uint64, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}

if len(filter) > 1 {
return nil, fmt.Errorf("only 1 filter supported")
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}

f := ">"
if len(filter) == 1 && filter[0] != "" {
f = filter[0]
if cancel != nil {
defer cancel()
}

var i int
var subjectMessagesMap map[string]uint64 = nil

o, cancel, err := getJSContextOpts(js.opts)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
if o.streamInfoOpts != nil && o.streamInfoOpts.SubjectsFilter == _EMPTY_ {
o.streamInfoOpts.SubjectsFilter = ">"
}

for {
var req []byte

if req, err = json.Marshal(StreamInfoRequest{SubjectsFilter: f, apiPagedRequest: apiPagedRequest{Offset: i}}); err != nil {
o.streamInfoOpts.Offset = i

if req, err = json.Marshal(o.streamInfoOpts); err != nil {
return nil, err
}

Expand Down

0 comments on commit 200b836

Please sign in to comment.