Skip to content

Commit

Permalink
Merged the paging functionality into the existing StreamInfo(): if yo…
Browse files Browse the repository at this point in the history
…u pass a value in SubjectFilters in the streamInfoOpts it returns the subject details for the subjects that match (for all the matching subjects and doing the pagination if needed)
  • Loading branch information
jnmoyne committed Sep 15, 2022
1 parent 25b6392 commit 5e2bc56
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 23 deletions.
44 changes: 44 additions & 0 deletions js_test.go
Expand Up @@ -1096,3 +1096,47 @@ func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) {
t.Fatalf("Wrong header: %v", r.Header)
}
}

func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"test.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Publish on enough subjects to exercise the pagination
payload := make([]byte, 10)
for i := 0; i < 100001; i++ {
nc.Publish(fmt.Sprintf("test.%d", i), payload)
}

// Check that passing a filter returns the subject details
result, err := js.StreamInfo("TEST", &StreamInfoRequest{SubjectsFilter: ">"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(result.State.Subjects) != 100001 {
t.Fatalf("expected 100001 subjects in the stream, but got %d instead", len(result.State.Subjects))
}

// Check that passing no filter does not return any subject details
result, err = js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(result.State.Subjects) != 0 {
t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects))
}
}
112 changes: 89 additions & 23 deletions jsm.go
Expand Up @@ -41,8 +41,12 @@ type JetStreamManager interface {
PurgeStream(name string, opts ...JSOpt) error

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
// DEPRECATED: Use Streams() instead.
StreamsInfo(opts ...JSOpt) <-chan *StreamInfo

// Streams can be used to retrieve a list of StreamInfo objects.
Streams(opts ...JSOpt) <-chan *StreamInfo

// StreamNames is used to retrieve a list of Stream names.
StreamNames(opts ...JSOpt) <-chan string

Expand Down Expand Up @@ -78,8 +82,12 @@ type JetStreamManager interface {
ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
// DEPRECATED: Use Consumers() instead.
ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo

// Consumers is used to retrieve a list of ConsumerInfo objects.
Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo

// ConsumerNames is used to retrieve a list of Consumer names.
ConsumerNames(stream string, opts ...JSOpt) <-chan string

Expand Down Expand Up @@ -170,7 +178,7 @@ type apiPaged struct {
// apiPagedRequest includes parameters allowing specific pages to be requested
// from APIs responding with apiPaged.
type apiPagedRequest struct {
Offset int `json:"offset"`
Offset int `json:"offset,omitempty"`
}

// AccountInfo contains info about the JetStream usage from the current account.
Expand Down Expand Up @@ -502,8 +510,8 @@ func (c *consumerLister) Err() error {
return c.err
}

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
// Consumers is used to retrieve a list of ConsumerInfo objects.
func (jsc *js) Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
Expand All @@ -530,6 +538,12 @@ func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
return ch
}

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
// DEPRECATED: Use Consumers() instead.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
return jsc.Consumers(stream, opts...)
}

type consumerNamesLister struct {
stream string
js *js
Expand Down Expand Up @@ -682,12 +696,17 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
type (
// StreamInfoRequest contains additional option to return
StreamInfoRequest struct {
apiPagedRequest
// DeletedDetails when true includes information about deleted messages
DeletedDetails bool `json:"deleted_details,omitempty"`
// SubjectsFilter when set, returns information on the matched subjects
SubjectsFilter string `json:"subjects_filter,omitempty"`
}
streamInfoResponse = streamCreateResponse
streamInfoResponse = struct {
apiResponse
apiPaged
*StreamInfo
}
)

func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
Expand All @@ -701,30 +720,71 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
if cancel != nil {
defer cancel()
}

var i int
var subjectMessagesMap map[string]uint64
var req []byte
var requestPayload bool

var siOpts StreamInfoRequest
if o.streamInfoOpts != nil {
if req, err = json.Marshal(o.streamInfoOpts); err != nil {
requestPayload = true
siOpts = *o.streamInfoOpts
}

for {
if requestPayload {
siOpts.Offset = i
if req, err = json.Marshal(&siOpts); err != nil {
return nil, err
}
}

siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}
}
siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}
var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
return nil, resp.Error
}

return resp.StreamInfo, nil
if resp.Error != nil {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}

var total int
// for backwards compatibility
if resp.Total != 0 {
total = resp.Total
} else {
total = len(resp.State.Subjects)
}

if requestPayload && len(resp.StreamInfo.State.Subjects) > 0 {
if subjectMessagesMap == nil {
subjectMessagesMap = make(map[string]uint64, total)
}

for k, j := range resp.State.Subjects {
subjectMessagesMap[k] = j
i++
}
}

if i >= total {
if requestPayload {
resp.StreamInfo.State.Subjects = subjectMessagesMap
}
return resp.StreamInfo, nil
}
}
}

// StreamInfo shows config and current state for this stream.
Expand Down Expand Up @@ -1258,8 +1318,8 @@ func (s *streamLister) Err() error {
return s.err
}

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
// Streams can be used to retrieve a list of StreamInfo objects.
func (jsc *js) Streams(opts ...JSOpt) <-chan *StreamInfo {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return nil
Expand All @@ -1286,6 +1346,12 @@ func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
return ch
}

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
// DEPRECATED: Use Streams() instead.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
return jsc.Streams(opts...)
}

type streamNamesLister struct {
js *js

Expand Down

0 comments on commit 5e2bc56

Please sign in to comment.