From 557d648019ffb32f4a9f2e33fe9baf721f80093d Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 29 Jun 2022 14:55:41 -0500 Subject: [PATCH] [FEAT] added `SubjectFilter` to `StreamInfoRequest`. The field takes a subject/wildcard that instruct the request to provide information on matching subjects in `StreamState`. [FEAT] added `NumSubjects` to `StreamState` enabling report for the number of subjects in the stream [FEAT] added `Subjects` to `StreamState` a map of subject to number of messages in the subject which gets populated if the `StreamInfoRequest` specifies a `SubjectFilter` FIX #1009 --- jsm.go | 25 +++++++++++++++---------- test/js_test.go | 41 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/jsm.go b/jsm.go index c9bcb2efa..4634fc89d 100644 --- a/jsm.go +++ b/jsm.go @@ -638,9 +638,12 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { } type ( - // StreamInfoRequest contains additional option to return details about messages deleted from a stream + // StreamInfoRequest contains additional option to return StreamInfoRequest struct { + // DeletedDetails when true includes information about deleted messages DeletedDetails bool `json:"deleted_details,omitempty"` + // SubjectsFilter when set, returns information on the matched subjects + SubjectsFilter string `json:"subjects_filter,omitempty"` } streamInfoResponse = streamCreateResponse ) @@ -701,15 +704,17 @@ type StreamSourceInfo struct { // StreamState is information about the given stream. type StreamState struct { - Msgs uint64 `json:"messages"` - Bytes uint64 `json:"bytes"` - FirstSeq uint64 `json:"first_seq"` - FirstTime time.Time `json:"first_ts"` - LastSeq uint64 `json:"last_seq"` - LastTime time.Time `json:"last_ts"` - Consumers int `json:"consumer_count"` - Deleted []uint64 `json:"deleted"` - NumDeleted int `json:"num_deleted"` + Msgs uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` + FirstSeq uint64 `json:"first_seq"` + FirstTime time.Time `json:"first_ts"` + LastSeq uint64 `json:"last_seq"` + LastTime time.Time `json:"last_ts"` + Consumers int `json:"consumer_count"` + Deleted []uint64 `json:"deleted"` + NumDeleted int `json:"num_deleted"` + NumSubjects uint64 `json:"num_subjects"` + Subjects map[string]uint64 `json:"subjects"` } // ClusterInfo shows information about the underlying set of servers diff --git a/test/js_test.go b/test/js_test.go index fd002dd78..205a21139 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1702,7 +1702,46 @@ func TestPurgeStream(t *testing.T) { } } -func TestStreamInfo(t *testing.T) { +func TestStreamInfoSubjectInfo(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.Publish("foo.A", []byte("")); err != nil { + t.Fatalf("Unexpected error during publish: %v", err) + } + if _, err := js.Publish("foo.B", []byte("")); err != nil { + t.Fatalf("Unexpected error during publish: %v", err) + } + + si, err := js.StreamInfo("foo", &nats.StreamInfoRequest{ + SubjectsFilter: "foo.A", + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.NumSubjects != 2 { + t.Fatal("Expected NumSubjects to be 1") + } + if len(si.State.Subjects) != 1 { + t.Fatal("Expected Subjects len to be 1") + } + if si.State.Subjects["foo.A"] != 1 { + t.Fatal("Expected Subjects to have an entry for foo.A with a count of 1") + } +} + +func TestStreamInfoDeletedDetails(t *testing.T) { testData := []string{"one", "two", "three", "four"} tests := []struct {