From 85350ec623ea783d479090abc969e6c125336ad9 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 29 Jun 2022 09:56:16 -0500 Subject: [PATCH] [FEAT] added `SubjectFilter` to `StreamInfoRequest`. The field takes a subject/wildcard that instruct the request to provide information on matching subjects in `StreamState`. [FEAT] added `NumSubjects` to `StreamState` enabling report for the number of subjects in the stream [FEAT] added `Subjects` to `StreamState` a map of subject to number of messages in the subject which gets populated if the `StreamInfoRequest` specifies a `SubjectFilter` FIX #1009 --- jsm.go | 25 +++++++++++++++---------- test/js_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/jsm.go b/jsm.go index c9bcb2efa..6a3bcbc17 100644 --- a/jsm.go +++ b/jsm.go @@ -638,9 +638,12 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { } type ( - // StreamInfoRequest contains additional option to return details about messages deleted from a stream + // StreamInfoRequest contains additional option to return StreamInfoRequest struct { + // DeletedDetails when true includes information about deleted messages DeletedDetails bool `json:"deleted_details,omitempty"` + // SubjectsFilter When set, returns information on the matched subjects - can be a wildcard or wild-carded subject + SubjectsFilter string `json:"subjects_filter,omitempty"` } streamInfoResponse = streamCreateResponse ) @@ -701,15 +704,17 @@ type StreamSourceInfo struct { // StreamState is information about the given stream. type StreamState struct { - Msgs uint64 `json:"messages"` - Bytes uint64 `json:"bytes"` - FirstSeq uint64 `json:"first_seq"` - FirstTime time.Time `json:"first_ts"` - LastSeq uint64 `json:"last_seq"` - LastTime time.Time `json:"last_ts"` - Consumers int `json:"consumer_count"` - Deleted []uint64 `json:"deleted"` - NumDeleted int `json:"num_deleted"` + Msgs uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` + FirstSeq uint64 `json:"first_seq"` + FirstTime time.Time `json:"first_ts"` + LastSeq uint64 `json:"last_seq"` + LastTime time.Time `json:"last_ts"` + Consumers int `json:"consumer_count"` + Deleted []uint64 `json:"deleted"` + NumDeleted int `json:"num_deleted"` + NumSubjects uint64 `json:"num_subjects"` + Subjects map[string]uint64 `json:"subjects"` } // ClusterInfo shows information about the underlying set of servers diff --git a/test/js_test.go b/test/js_test.go index fd002dd78..a440ad679 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1702,6 +1702,42 @@ func TestPurgeStream(t *testing.T) { } } +func TestStreamInfoSubjectInfo(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.Publish("foo.A", []byte("")); err != nil { + t.Fatalf("Unexpected error during publish: %v", err) + } + + si, err := js.StreamInfo("foo", &nats.StreamInfoRequest{ + SubjectsFilter: ">", + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.NumSubjects != 1 { + t.Fatal("Expected NumSubjects to be 1") + } + if len(si.State.Subjects) != 1 { + t.Fatal("Expected Subjects len to be 1") + } + if si.State.Subjects["foo.A"] != 1 { + t.Fatal("Expected Subjects to have an entry for foo.A with a count of 1") + } +} + func TestStreamInfo(t *testing.T) { testData := []string{"one", "two", "three", "four"}