From 65796fc5abc1a993e8378115e64427b0fab07ba8 Mon Sep 17 00:00:00 2001 From: jnmoyne Date: Thu, 8 Sep 2022 18:08:58 -0700 Subject: [PATCH] [CHANGED] StreamInfo() will now return all subjects when requested If a subject filter is specified in the StreamInfoRequest{} option, then all matching subjects will be returned and not be capped to the server limit of 100,000. It is internally using pagination that was added in the server PR: https://github.com/nats-io/nats-server/pull/3454 --- js_test.go | 44 ++++++++++++++++++++++++++++ jsm.go | 84 ++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 109 insertions(+), 19 deletions(-) diff --git a/js_test.go b/js_test.go index 60ccd5758..45993ed15 100644 --- a/js_test.go +++ b/js_test.go @@ -1157,3 +1157,47 @@ func TestJetStreamConsumerMemoryStorage(t *testing.T) { t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage) } } + +func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"test.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Publish on enough subjects to exercise the pagination + payload := make([]byte, 10) + for i := 0; i < 100001; i++ { + nc.Publish(fmt.Sprintf("test.%d", i), payload) + } + + // Check that passing a filter returns the subject details + result, err := js.StreamInfo("TEST", &StreamInfoRequest{SubjectsFilter: ">"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if len(result.State.Subjects) != 100001 { + t.Fatalf("expected 100001 subjects in the stream, but got %d instead", len(result.State.Subjects)) + } + + // Check that passing no filter does not return any subject details + result, err = js.StreamInfo("TEST") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if len(result.State.Subjects) != 0 { + t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects)) + } +} diff --git a/jsm.go b/jsm.go index 3c35998e7..078c004bc 100644 --- a/jsm.go +++ b/jsm.go @@ -178,7 +178,7 @@ type apiPaged struct { // apiPagedRequest includes parameters allowing specific pages to be requested // from APIs responding with apiPaged. type apiPagedRequest struct { - Offset int `json:"offset"` + Offset int `json:"offset,omitempty"` } // AccountInfo contains info about the JetStream usage from the current account. @@ -696,12 +696,17 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { type ( // StreamInfoRequest contains additional option to return StreamInfoRequest struct { + apiPagedRequest // DeletedDetails when true includes information about deleted messages DeletedDetails bool `json:"deleted_details,omitempty"` // SubjectsFilter when set, returns information on the matched subjects SubjectsFilter string `json:"subjects_filter,omitempty"` } - streamInfoResponse = streamCreateResponse + streamInfoResponse = struct { + apiResponse + apiPaged + *StreamInfo + } ) func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { @@ -715,30 +720,71 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { if cancel != nil { defer cancel() } + + var i int + var subjectMessagesMap map[string]uint64 var req []byte + var requestPayload bool + + var siOpts StreamInfoRequest if o.streamInfoOpts != nil { - if req, err = json.Marshal(o.streamInfoOpts); err != nil { + requestPayload = true + siOpts = *o.streamInfoOpts + } + + for { + if requestPayload { + siOpts.Offset = i + if req, err = json.Marshal(&siOpts); err != nil { + return nil, err + } + } + + siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) + + r, err := js.apiRequestWithContext(o.ctx, siSubj, req) + if err != nil { return nil, err } - } - siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) - r, err := js.apiRequestWithContext(o.ctx, siSubj, req) - if err != nil { - return nil, err - } - var resp streamInfoResponse - if err := json.Unmarshal(r.Data, &resp); err != nil { - return nil, err - } - if resp.Error != nil { - if errors.Is(resp.Error, ErrStreamNotFound) { - return nil, ErrStreamNotFound + var resp streamInfoResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return nil, err } - return nil, resp.Error - } - return resp.StreamInfo, nil + if resp.Error != nil { + if errors.Is(resp.Error, ErrStreamNotFound) { + return nil, ErrStreamNotFound + } + return nil, resp.Error + } + + var total int + // for backwards compatibility + if resp.Total != 0 { + total = resp.Total + } else { + total = len(resp.State.Subjects) + } + + if requestPayload && len(resp.StreamInfo.State.Subjects) > 0 { + if subjectMessagesMap == nil { + subjectMessagesMap = make(map[string]uint64, total) + } + + for k, j := range resp.State.Subjects { + subjectMessagesMap[k] = j + i++ + } + } + + if i >= total { + if requestPayload { + resp.StreamInfo.State.Subjects = subjectMessagesMap + } + return resp.StreamInfo, nil + } + } } // StreamInfo shows config and current state for this stream.