Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] StreamInfo() will now return all subjects when requested #1072

Merged
merged 1 commit into from Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 44 additions & 0 deletions js_test.go
Expand Up @@ -1157,3 +1157,47 @@ func TestJetStreamConsumerMemoryStorage(t *testing.T) {
t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage)
}
}

func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"test.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Publish on enough subjects to exercise the pagination
payload := make([]byte, 10)
for i := 0; i < 100001; i++ {
nc.Publish(fmt.Sprintf("test.%d", i), payload)
}

// Check that passing a filter returns the subject details
result, err := js.StreamInfo("TEST", &StreamInfoRequest{SubjectsFilter: ">"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(result.State.Subjects) != 100001 {
t.Fatalf("expected 100001 subjects in the stream, but got %d instead", len(result.State.Subjects))
}

// Check that passing no filter does not return any subject details
result, err = js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(result.State.Subjects) != 0 {
t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects))
}
}
84 changes: 65 additions & 19 deletions jsm.go
Expand Up @@ -178,7 +178,7 @@ type apiPaged struct {
// apiPagedRequest includes parameters allowing specific pages to be requested
// from APIs responding with apiPaged.
type apiPagedRequest struct {
Offset int `json:"offset"`
Offset int `json:"offset,omitempty"`
}

// AccountInfo contains info about the JetStream usage from the current account.
Expand Down Expand Up @@ -696,12 +696,17 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
type (
// StreamInfoRequest contains additional option to return
StreamInfoRequest struct {
apiPagedRequest
jnmoyne marked this conversation as resolved.
Show resolved Hide resolved
// DeletedDetails when true includes information about deleted messages
DeletedDetails bool `json:"deleted_details,omitempty"`
// SubjectsFilter when set, returns information on the matched subjects
SubjectsFilter string `json:"subjects_filter,omitempty"`
}
streamInfoResponse = streamCreateResponse
streamInfoResponse = struct {
apiResponse
apiPaged
*StreamInfo
}
)

func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
Expand All @@ -715,30 +720,71 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
if cancel != nil {
defer cancel()
}

var i int
var subjectMessagesMap map[string]uint64
var req []byte
var requestPayload bool

var siOpts StreamInfoRequest
if o.streamInfoOpts != nil {
if req, err = json.Marshal(o.streamInfoOpts); err != nil {
requestPayload = true
siOpts = *o.streamInfoOpts
}

for {
if requestPayload {
siOpts.Offset = i
if req, err = json.Marshal(&siOpts); err != nil {
return nil, err
}
}

siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}
}
siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}
var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
return nil, resp.Error
}

return resp.StreamInfo, nil
if resp.Error != nil {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}

var total int
// for backwards compatibility
if resp.Total != 0 {
total = resp.Total
} else {
total = len(resp.State.Subjects)
}

if requestPayload && len(resp.StreamInfo.State.Subjects) > 0 {
if subjectMessagesMap == nil {
subjectMessagesMap = make(map[string]uint64, total)
}

for k, j := range resp.State.Subjects {
subjectMessagesMap[k] = j
i++
}
}

if i >= total {
if requestPayload {
resp.StreamInfo.State.Subjects = subjectMessagesMap
}
return resp.StreamInfo, nil
}
}
}

// StreamInfo shows config and current state for this stream.
Expand Down