diff --git a/jsm.go b/jsm.go index c9bcb2efa..4634fc89d 100644 --- a/jsm.go +++ b/jsm.go @@ -638,9 +638,12 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { } type ( - // StreamInfoRequest contains additional option to return details about messages deleted from a stream + // StreamInfoRequest contains additional option to return StreamInfoRequest struct { + // DeletedDetails when true includes information about deleted messages DeletedDetails bool `json:"deleted_details,omitempty"` + // SubjectsFilter when set, returns information on the matched subjects + SubjectsFilter string `json:"subjects_filter,omitempty"` } streamInfoResponse = streamCreateResponse ) @@ -701,15 +704,17 @@ type StreamSourceInfo struct { // StreamState is information about the given stream. type StreamState struct { - Msgs uint64 `json:"messages"` - Bytes uint64 `json:"bytes"` - FirstSeq uint64 `json:"first_seq"` - FirstTime time.Time `json:"first_ts"` - LastSeq uint64 `json:"last_seq"` - LastTime time.Time `json:"last_ts"` - Consumers int `json:"consumer_count"` - Deleted []uint64 `json:"deleted"` - NumDeleted int `json:"num_deleted"` + Msgs uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` + FirstSeq uint64 `json:"first_seq"` + FirstTime time.Time `json:"first_ts"` + LastSeq uint64 `json:"last_seq"` + LastTime time.Time `json:"last_ts"` + Consumers int `json:"consumer_count"` + Deleted []uint64 `json:"deleted"` + NumDeleted int `json:"num_deleted"` + NumSubjects uint64 `json:"num_subjects"` + Subjects map[string]uint64 `json:"subjects"` } // ClusterInfo shows information about the underlying set of servers diff --git a/test/js_test.go b/test/js_test.go index fd002dd78..205a21139 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1702,7 +1702,46 @@ func TestPurgeStream(t *testing.T) { } } -func TestStreamInfo(t *testing.T) { +func TestStreamInfoSubjectInfo(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.Publish("foo.A", []byte("")); err != nil { + t.Fatalf("Unexpected error during publish: %v", err) + } + if _, err := js.Publish("foo.B", []byte("")); err != nil { + t.Fatalf("Unexpected error during publish: %v", err) + } + + si, err := js.StreamInfo("foo", &nats.StreamInfoRequest{ + SubjectsFilter: "foo.A", + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.NumSubjects != 2 { + t.Fatal("Expected NumSubjects to be 1") + } + if len(si.State.Subjects) != 1 { + t.Fatal("Expected Subjects len to be 1") + } + if si.State.Subjects["foo.A"] != 1 { + t.Fatal("Expected Subjects to have an entry for foo.A with a count of 1") + } +} + +func TestStreamInfoDeletedDetails(t *testing.T) { testData := []string{"one", "two", "three", "four"} tests := []struct {