diff --git a/js_test.go b/js_test.go index b241a678c..13424ebd1 100644 --- a/js_test.go +++ b/js_test.go @@ -1096,3 +1096,47 @@ func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) { t.Fatalf("Wrong header: %v", r.Header) } } + +func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"test.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Publish on enough subjects to exercise the pagination + payload := make([]byte, 10) + for i := 0; i < 100001; i++ { + nc.Publish(fmt.Sprintf("test.%d", i), payload) + } + + // Check that passing a filter returns the subject details + result, err := js.StreamInfo("TEST", &StreamInfoRequest{SubjectsFilter: ">"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if len(result.State.Subjects) != 100001 { + t.Fatalf("expected 100001 subjects in the stream, but got %d instead", len(result.State.Subjects)) + } + + // Check that passing no filter does not return any subject details + result, err = js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if len(result.State.Subjects) != 0 { + t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects)) + } +} diff --git a/jsm.go b/jsm.go index f1a0245e4..078c004bc 100644 --- a/jsm.go +++ b/jsm.go @@ -41,8 +41,12 @@ type JetStreamManager interface { PurgeStream(name string, opts ...JSOpt) error // StreamsInfo can be used to retrieve a list of StreamInfo objects. + // DEPRECATED: Use Streams() instead. StreamsInfo(opts ...JSOpt) <-chan *StreamInfo + // Streams can be used to retrieve a list of StreamInfo objects. + Streams(opts ...JSOpt) <-chan *StreamInfo + // StreamNames is used to retrieve a list of Stream names. StreamNames(opts ...JSOpt) <-chan string @@ -78,8 +82,12 @@ type JetStreamManager interface { ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) // ConsumersInfo is used to retrieve a list of ConsumerInfo objects. + // DEPRECATED: Use Consumers() instead. ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo + // Consumers is used to retrieve a list of ConsumerInfo objects. + Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo + // ConsumerNames is used to retrieve a list of Consumer names. ConsumerNames(stream string, opts ...JSOpt) <-chan string @@ -170,7 +178,7 @@ type apiPaged struct { // apiPagedRequest includes parameters allowing specific pages to be requested // from APIs responding with apiPaged. type apiPagedRequest struct { - Offset int `json:"offset"` + Offset int `json:"offset,omitempty"` } // AccountInfo contains info about the JetStream usage from the current account. @@ -502,8 +510,8 @@ func (c *consumerLister) Err() error { return c.err } -// ConsumersInfo is used to retrieve a list of ConsumerInfo objects. -func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo { +// Consumers is used to retrieve a list of ConsumerInfo objects. +func (jsc *js) Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo { o, cancel, err := getJSContextOpts(jsc.opts, opts...) if err != nil { return nil @@ -530,6 +538,12 @@ func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo return ch } +// ConsumersInfo is used to retrieve a list of ConsumerInfo objects. +// DEPRECATED: Use Consumers() instead. +func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo { + return jsc.Consumers(stream, opts...) +} + type consumerNamesLister struct { stream string js *js @@ -682,12 +696,17 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { type ( // StreamInfoRequest contains additional option to return StreamInfoRequest struct { + apiPagedRequest // DeletedDetails when true includes information about deleted messages DeletedDetails bool `json:"deleted_details,omitempty"` // SubjectsFilter when set, returns information on the matched subjects SubjectsFilter string `json:"subjects_filter,omitempty"` } - streamInfoResponse = streamCreateResponse + streamInfoResponse = struct { + apiResponse + apiPaged + *StreamInfo + } ) func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { @@ -701,30 +720,71 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { if cancel != nil { defer cancel() } + + var i int + var subjectMessagesMap map[string]uint64 var req []byte + var requestPayload bool + + var siOpts StreamInfoRequest if o.streamInfoOpts != nil { - if req, err = json.Marshal(o.streamInfoOpts); err != nil { + requestPayload = true + siOpts = *o.streamInfoOpts + } + + for { + if requestPayload { + siOpts.Offset = i + if req, err = json.Marshal(&siOpts); err != nil { + return nil, err + } + } + + siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) + + r, err := js.apiRequestWithContext(o.ctx, siSubj, req) + if err != nil { return nil, err } - } - siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) - r, err := js.apiRequestWithContext(o.ctx, siSubj, req) - if err != nil { - return nil, err - } - var resp streamInfoResponse - if err := json.Unmarshal(r.Data, &resp); err != nil { - return nil, err - } - if resp.Error != nil { - if errors.Is(resp.Error, ErrStreamNotFound) { - return nil, ErrStreamNotFound + var resp streamInfoResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return nil, err } - return nil, resp.Error - } - return resp.StreamInfo, nil + if resp.Error != nil { + if errors.Is(resp.Error, ErrStreamNotFound) { + return nil, ErrStreamNotFound + } + return nil, resp.Error + } + + var total int + // for backwards compatibility + if resp.Total != 0 { + total = resp.Total + } else { + total = len(resp.State.Subjects) + } + + if requestPayload && len(resp.StreamInfo.State.Subjects) > 0 { + if subjectMessagesMap == nil { + subjectMessagesMap = make(map[string]uint64, total) + } + + for k, j := range resp.State.Subjects { + subjectMessagesMap[k] = j + i++ + } + } + + if i >= total { + if requestPayload { + resp.StreamInfo.State.Subjects = subjectMessagesMap + } + return resp.StreamInfo, nil + } + } } // StreamInfo shows config and current state for this stream. @@ -1258,8 +1318,8 @@ func (s *streamLister) Err() error { return s.err } -// StreamsInfo can be used to retrieve a list of StreamInfo objects. -func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo { +// Streams can be used to retrieve a list of StreamInfo objects. +func (jsc *js) Streams(opts ...JSOpt) <-chan *StreamInfo { o, cancel, err := getJSContextOpts(jsc.opts, opts...) if err != nil { return nil @@ -1286,6 +1346,12 @@ func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo { return ch } +// StreamsInfo can be used to retrieve a list of StreamInfo objects. +// DEPRECATED: Use Streams() instead. +func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo { + return jsc.Streams(opts...) +} + type streamNamesLister struct { js *js