-
Notifications
You must be signed in to change notification settings - Fork 665
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ADDED] Paging subjects in stream info #1517
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -158,6 +158,7 @@ type ( | |
} | ||
streamInfoResponse struct { | ||
apiResponse | ||
apiPaged | ||
*StreamInfo | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,6 +102,7 @@ type ( | |
StreamInfoOpt func(*streamInfoRequest) error | ||
|
||
streamInfoRequest struct { | ||
apiPaged | ||
DeletedDetails bool `json:"deleted_details,omitempty"` | ||
SubjectFilter string `json:"subjects_filter,omitempty"` | ||
} | ||
|
@@ -259,28 +260,52 @@ func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, | |
} | ||
var req []byte | ||
var err error | ||
if infoReq != nil { | ||
req, err = json.Marshal(infoReq) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
var subjectMap map[string]uint64 | ||
var offset int | ||
|
||
infoSubject := apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiStreamInfoT, s.name)) | ||
var resp streamInfoResponse | ||
|
||
if _, err = s.jetStream.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil { | ||
return nil, err | ||
} | ||
if resp.Error != nil { | ||
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound { | ||
return nil, ErrStreamNotFound | ||
for { | ||
if infoReq != nil { | ||
if infoReq.SubjectFilter != "" { | ||
if subjectMap == nil { | ||
subjectMap = make(map[string]uint64) | ||
} | ||
infoReq.Offset = offset | ||
} | ||
req, err = json.Marshal(infoReq) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
var resp streamInfoResponse | ||
if _, err = s.jetStream.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil { | ||
return nil, err | ||
} | ||
if resp.Error != nil { | ||
if resp.Error.ErrorCode == JSErrCodeStreamNotFound { | ||
return nil, ErrStreamNotFound | ||
} | ||
return nil, resp.Error | ||
} | ||
s.info = resp.StreamInfo | ||
var total int | ||
if resp.Total != 0 { | ||
total = resp.Total | ||
} | ||
if len(resp.StreamInfo.State.Subjects) > 0 { | ||
for subj, msgs := range resp.StreamInfo.State.Subjects { | ||
subjectMap[subj] = msgs | ||
} | ||
offset = len(subjectMap) | ||
} | ||
if total == 0 || total <= offset { | ||
resp.StreamInfo.State.Subjects = subjectMap | ||
s.info = resp.StreamInfo | ||
break | ||
} | ||
return nil, resp.Error | ||
} | ||
s.info = resp.StreamInfo | ||
|
||
return resp.StreamInfo, nil | ||
return s.info, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we consider iterator API for receiving those subjects to avoid allocating a big map? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would change the API - and require someone to page it manually - the big map is going to be large, but that in the end is what they asked for if they did a ">" - remember that request is always specifying a subject filter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tend to agree, I'm not sure how much do we really need an additional API for that, considering that you do specify the filter. I would probably leave it as it is. |
||
} | ||
|
||
// CachedInfo returns *StreamInfo cached on a stream struct | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should store all the subjects in the cached info, as this might really grow with time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I agree, that might be too much, I'll leave the cached info without the subjects
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
possibly if they asked for it, you can save it on the cache, but if the next info doesn't require that, simply replace
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed caching subjects map