Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Paging subjects in stream info #1517

Merged
merged 3 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type (
}
streamInfoResponse struct {
apiResponse
apiPaged
*StreamInfo
}

Expand Down
6 changes: 5 additions & 1 deletion jetstream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,11 @@ func WithDeletedDetails(deletedDetails bool) StreamInfoOpt {
}
}

// WithSubjectFilter can be used to display the information about messages stored on given subjects
// WithSubjectFilter can be used to display the information about messages
// stored on given subjects.
// NOTE: if the subject filter matches over 100k
// subjects, this will result in multiple requests to the server to retrieve all
// the information, and all of the returned subjects will be kept in memory.
func WithSubjectFilter(subject string) StreamInfoOpt {
return func(req *streamInfoRequest) error {
req.SubjectFilter = subject
Expand Down
63 changes: 46 additions & 17 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type (
StreamInfoOpt func(*streamInfoRequest) error

streamInfoRequest struct {
apiPaged
DeletedDetails bool `json:"deleted_details,omitempty"`
SubjectFilter string `json:"subjects_filter,omitempty"`
}
Expand Down Expand Up @@ -259,28 +260,56 @@ func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo,
}
var req []byte
var err error
if infoReq != nil {
req, err = json.Marshal(infoReq)
if err != nil {
return nil, err
}
}
var subjectMap map[string]uint64
var offset int

infoSubject := apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiStreamInfoT, s.name))
var resp streamInfoResponse

if _, err = s.jetStream.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrStreamNotFound
var info *StreamInfo
for {
if infoReq != nil {
if infoReq.SubjectFilter != "" {
if subjectMap == nil {
subjectMap = make(map[string]uint64)
}
infoReq.Offset = offset
}
req, err = json.Marshal(infoReq)
if err != nil {
return nil, err
}
}
var resp streamInfoResponse
if _, err = s.jetStream.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}
info = resp.StreamInfo
var total int
if resp.Total != 0 {
total = resp.Total
}
if len(resp.StreamInfo.State.Subjects) > 0 {
for subj, msgs := range resp.StreamInfo.State.Subjects {
subjectMap[subj] = msgs
}
offset = len(subjectMap)
}
if total == 0 || total <= offset {
info.State.Subjects = nil
// we don't want to store subjects in cache
cached := *info
s.info = &cached
info.State.Subjects = subjectMap
break
}
return nil, resp.Error
}
s.info = resp.StreamInfo

return resp.StreamInfo, nil
return info, nil
}

// CachedInfo returns *StreamInfo cached on a stream struct
Expand Down
42 changes: 42 additions & 0 deletions jetstream/test/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,48 @@ func TestStreamInfo(t *testing.T) {
}
}

func TestSubjectsFilterPaging(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)

nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()
js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 0; i < 110000; i++ {
if _, err := js.PublishAsync(fmt.Sprintf("FOO.%d", i), nil); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatal("PublishAsyncComplete timeout")
}

info, err := s.Info(context.Background(), jetstream.WithSubjectFilter("FOO.*"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(info.State.Subjects) != 110000 {
t.Fatalf("Unexpected number of subjects; want: 110000; got: %d", len(info.State.Subjects))
}
cInfo := s.CachedInfo()
if len(cInfo.State.Subjects) != 0 {
t.Fatalf("Unexpected number of subjects; want: 0; got: %d", len(cInfo.State.Subjects))
}
}

func TestStreamCachedInfo(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
Expand Down