Skip to content

Commit

Permalink
[ADDED] Paging subjects in stream info (#1517)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Jan 12, 2024
1 parent 98430ac commit 22c10d4
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 18 deletions.
1 change: 1 addition & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type (
}
streamInfoResponse struct {
apiResponse
apiPaged
*StreamInfo
}

Expand Down
6 changes: 5 additions & 1 deletion jetstream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,11 @@ func WithDeletedDetails(deletedDetails bool) StreamInfoOpt {
}
}

// WithSubjectFilter can be used to display the information about messages stored on given subjects
// WithSubjectFilter can be used to display the information about messages
// stored on given subjects.
// NOTE: if the subject filter matches over 100k
// subjects, this will result in multiple requests to the server to retrieve all
// the information, and all of the returned subjects will be kept in memory.
func WithSubjectFilter(subject string) StreamInfoOpt {
return func(req *streamInfoRequest) error {
req.SubjectFilter = subject
Expand Down
63 changes: 46 additions & 17 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type (
StreamInfoOpt func(*streamInfoRequest) error

streamInfoRequest struct {
apiPaged
DeletedDetails bool `json:"deleted_details,omitempty"`
SubjectFilter string `json:"subjects_filter,omitempty"`
}
Expand Down Expand Up @@ -259,28 +260,56 @@ func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo,
}
var req []byte
var err error
if infoReq != nil {
req, err = json.Marshal(infoReq)
if err != nil {
return nil, err
}
}
var subjectMap map[string]uint64
var offset int

infoSubject := apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiStreamInfoT, s.name))
var resp streamInfoResponse

if _, err = s.jetStream.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrStreamNotFound
var info *StreamInfo
for {
if infoReq != nil {
if infoReq.SubjectFilter != "" {
if subjectMap == nil {
subjectMap = make(map[string]uint64)
}
infoReq.Offset = offset
}
req, err = json.Marshal(infoReq)
if err != nil {
return nil, err
}
}
var resp streamInfoResponse
if _, err = s.jetStream.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}
info = resp.StreamInfo
var total int
if resp.Total != 0 {
total = resp.Total
}
if len(resp.StreamInfo.State.Subjects) > 0 {
for subj, msgs := range resp.StreamInfo.State.Subjects {
subjectMap[subj] = msgs
}
offset = len(subjectMap)
}
if total == 0 || total <= offset {
info.State.Subjects = nil
// we don't want to store subjects in cache
cached := *info
s.info = &cached
info.State.Subjects = subjectMap
break
}
return nil, resp.Error
}
s.info = resp.StreamInfo

return resp.StreamInfo, nil
return info, nil
}

// CachedInfo returns *StreamInfo cached on a stream struct
Expand Down
42 changes: 42 additions & 0 deletions jetstream/test/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,48 @@ func TestStreamInfo(t *testing.T) {
}
}

func TestSubjectsFilterPaging(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)

nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()
js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 0; i < 110000; i++ {
if _, err := js.PublishAsync(fmt.Sprintf("FOO.%d", i), nil); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatal("PublishAsyncComplete timeout")
}

info, err := s.Info(context.Background(), jetstream.WithSubjectFilter("FOO.*"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(info.State.Subjects) != 110000 {
t.Fatalf("Unexpected number of subjects; want: 110000; got: %d", len(info.State.Subjects))
}
cInfo := s.CachedInfo()
if len(cInfo.State.Subjects) != 0 {
t.Fatalf("Unexpected number of subjects; want: 0; got: %d", len(cInfo.State.Subjects))
}
}

func TestStreamCachedInfo(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
Expand Down

0 comments on commit 22c10d4

Please sign in to comment.