Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for Subject reporting on StreamInfo requests #1010

Merged
merged 1 commit into from Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 15 additions & 10 deletions jsm.go
Expand Up @@ -638,9 +638,12 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
}

type (
// StreamInfoRequest contains additional option to return details about messages deleted from a stream
piotrpio marked this conversation as resolved.
Show resolved Hide resolved
// StreamInfoRequest contains additional option to return
StreamInfoRequest struct {
// DeletedDetails when true includes information about deleted messages
DeletedDetails bool `json:"deleted_details,omitempty"`
// SubjectsFilter when set, returns information on the matched subjects
SubjectsFilter string `json:"subjects_filter,omitempty"`
}
streamInfoResponse = streamCreateResponse
)
Expand Down Expand Up @@ -701,15 +704,17 @@ type StreamSourceInfo struct {

// StreamState is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
Deleted []uint64 `json:"deleted"`
NumDeleted int `json:"num_deleted"`
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
Deleted []uint64 `json:"deleted"`
NumDeleted int `json:"num_deleted"`
NumSubjects uint64 `json:"num_subjects"`
Subjects map[string]uint64 `json:"subjects"`
}

// ClusterInfo shows information about the underlying set of servers
Expand Down
41 changes: 40 additions & 1 deletion test/js_test.go
Expand Up @@ -1702,7 +1702,46 @@ func TestPurgeStream(t *testing.T) {
}
}

func TestStreamInfo(t *testing.T) {
func TestStreamInfoSubjectInfo(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There already is a test called TestStreamInfo which tests basic stream info request (without parameters) as well as request with DeletedDetails == true.
So it would be nice to either extend this existing test with additional cases for SubjectInfo or at least rename TestStreamInfo to e.g. TestStreamInfoDeletedDetails to be consistent in naming.

Copy link
Member Author

@aricart aricart Jun 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming the other test to TestStreamInfoDeletedDetails

s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err := js.Publish("foo.A", []byte("")); err != nil {
t.Fatalf("Unexpected error during publish: %v", err)
}
if _, err := js.Publish("foo.B", []byte("")); err != nil {
t.Fatalf("Unexpected error during publish: %v", err)
}

kozlovic marked this conversation as resolved.
Show resolved Hide resolved
si, err := js.StreamInfo("foo", &nats.StreamInfoRequest{
SubjectsFilter: "foo.A",
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.NumSubjects != 2 {
t.Fatal("Expected NumSubjects to be 1")
}
if len(si.State.Subjects) != 1 {
t.Fatal("Expected Subjects len to be 1")
}
if si.State.Subjects["foo.A"] != 1 {
t.Fatal("Expected Subjects to have an entry for foo.A with a count of 1")
}
}

func TestStreamInfoDeletedDetails(t *testing.T) {
testData := []string{"one", "two", "three", "four"}

tests := []struct {
Expand Down