Skip to content

Commit

Permalink
Merge pull request #1010 from nats-io/fix-1009
Browse files Browse the repository at this point in the history
Added support for Subject reporting on StreamInfo requests
  • Loading branch information
aricart committed Jun 30, 2022
2 parents dcbb65a + 557d648 commit d29a40a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 11 deletions.
25 changes: 15 additions & 10 deletions jsm.go
Expand Up @@ -638,9 +638,12 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
}

type (
// StreamInfoRequest contains additional option to return details about messages deleted from a stream
// StreamInfoRequest contains additional option to return
StreamInfoRequest struct {
// DeletedDetails when true includes information about deleted messages
DeletedDetails bool `json:"deleted_details,omitempty"`
// SubjectsFilter when set, returns information on the matched subjects
SubjectsFilter string `json:"subjects_filter,omitempty"`
}
streamInfoResponse = streamCreateResponse
)
Expand Down Expand Up @@ -701,15 +704,17 @@ type StreamSourceInfo struct {

// StreamState is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
Deleted []uint64 `json:"deleted"`
NumDeleted int `json:"num_deleted"`
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
Deleted []uint64 `json:"deleted"`
NumDeleted int `json:"num_deleted"`
NumSubjects uint64 `json:"num_subjects"`
Subjects map[string]uint64 `json:"subjects"`
}

// ClusterInfo shows information about the underlying set of servers
Expand Down
41 changes: 40 additions & 1 deletion test/js_test.go
Expand Up @@ -1702,7 +1702,46 @@ func TestPurgeStream(t *testing.T) {
}
}

func TestStreamInfo(t *testing.T) {
func TestStreamInfoSubjectInfo(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err := js.Publish("foo.A", []byte("")); err != nil {
t.Fatalf("Unexpected error during publish: %v", err)
}
if _, err := js.Publish("foo.B", []byte("")); err != nil {
t.Fatalf("Unexpected error during publish: %v", err)
}

si, err := js.StreamInfo("foo", &nats.StreamInfoRequest{
SubjectsFilter: "foo.A",
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.NumSubjects != 2 {
t.Fatal("Expected NumSubjects to be 1")
}
if len(si.State.Subjects) != 1 {
t.Fatal("Expected Subjects len to be 1")
}
if si.State.Subjects["foo.A"] != 1 {
t.Fatal("Expected Subjects to have an entry for foo.A with a count of 1")
}
}

func TestStreamInfoDeletedDetails(t *testing.T) {
testData := []string{"one", "two", "three", "four"}

tests := []struct {
Expand Down

0 comments on commit d29a40a

Please sign in to comment.