Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added in ability to get per subject details via StreamInfo. #2833

Merged
merged 2 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1148,5 +1148,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamInfoMaxSubjectsErr",
"code": 500,
"error_code": 10117,
"description": "subject details would exceed maximum allowed",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
]
58 changes: 50 additions & 8 deletions server/filestore.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -89,6 +89,7 @@ type fileStore struct {
aek cipher.AEAD
lmb *msgBlock
blks []*msgBlock
psmc map[string]uint64
hh hash.Hash64
qch chan struct{}
cfs []*consumerFileStore
Expand Down Expand Up @@ -280,6 +281,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
fs := &fileStore{
fcfg: fcfg,
cfg: FileStreamInfo{Created: created, StreamConfig: cfg},
psmc: make(map[string]uint64),
prf: prf,
qch: make(chan struct{}),
}
Expand Down Expand Up @@ -879,6 +881,13 @@ func (fs *fileStore) recoverMsgs() error {
}
fs.state.Msgs += mb.msgs
fs.state.Bytes += mb.bytes
// Walk the fss for this mb and fill in fs.psmc
for subj, ss := range mb.fss {
if len(subj) > 0 {
fs.psmc[subj] += ss.Msgs
}
}

} else {
return err
}
Expand Down Expand Up @@ -988,6 +997,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
mb.msgs--
purged++
// Update fss
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq)
}

Expand Down Expand Up @@ -1528,6 +1538,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
return err
}

// Adjust top level tracking of per subject msg counts.
if len(subj) > 0 {
fs.psmc[subj]++
}

// Adjust first if needed.
now := time.Unix(0, ts).UTC()
if fs.state.Msgs == 0 {
Expand Down Expand Up @@ -1724,6 +1739,19 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
return fs.removeMsg(seq, true, true)
}

// Convenience function to remove per subject tracking at the filestore level.
// Lock should be held.
func (fs *fileStore) removePerSubject(subj string) {
if len(subj) == 0 {
return
}
if n, ok := fs.psmc[subj]; ok && n == 1 {
delete(fs.psmc, subj)
} else if ok {
fs.psmc[subj]--
}
}

// Remove a message, optionally rewriting the mb file.
func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) {
fsLock := func() {
Expand Down Expand Up @@ -1811,6 +1839,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
mb.bytes -= msz

// If we are tracking multiple subjects here make sure we update that accounting.
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq)

var shouldWriteIndex, firstSeqNeedsUpdate bool
Expand Down Expand Up @@ -3274,19 +3303,18 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
mb.llseq = seq
}

// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {
hh = mb.hh // This will force the hash check in msgFromBuf.
mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit)
}

li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
return nil, errPartialCache
}
buf := mb.cache.buf[li:]

// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {
hh = mb.hh // This will force the hash check in msgFromBuf.
}

// Parse from the raw buffer.
subj, hdr, msg, mseq, ts, err := msgFromBuf(buf, hh)
if err != nil {
Expand All @@ -3297,6 +3325,11 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, mseq)
}

// Clear the check bit here after we know all is good.
if !hashChecked {
mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit)
}

return &fileStoredMsg{subj, hdr, msg, seq, ts, mb, int64(bi)}, nil
}

Expand Down Expand Up @@ -3451,6 +3484,7 @@ func (fs *fileStore) FastState(state *StreamState) {
}
}
state.Consumers = len(fs.cfs)
state.NumSubjects = len(fs.psmc)
fs.mu.RUnlock()
}

Expand All @@ -3459,7 +3493,9 @@ func (fs *fileStore) State() StreamState {
fs.mu.RLock()
state := fs.state
state.Consumers = len(fs.cfs)
state.NumSubjects = len(fs.psmc)
state.Deleted = nil // make sure.

for _, mb := range fs.blks {
mb.mu.Lock()
fseq := mb.first.seq
Expand Down Expand Up @@ -3819,6 +3855,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
mb.msgs--
mb.bytes -= rl
// FSS updates.
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq)
// Check for first message.
if seq == mb.first.seq {
Expand Down Expand Up @@ -3924,6 +3961,9 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {

fs.lmb.writeIndexInfo()

// Clear any per subject tracking.
fs.psmc = make(map[string]uint64)

cb := fs.scb
fs.mu.Unlock()

Expand Down Expand Up @@ -3988,6 +4028,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
smb.msgs--
purged++
// Update fss
fs.removePerSubject(sm.subj)
smb.removeSeqPerSubject(sm.subj, mseq)
}
}
Expand Down Expand Up @@ -4243,6 +4284,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if mb.fss == nil {
mb.fss = make(map[string]*SimpleState)
}

fseq, lseq := mb.first.seq, mb.last.seq
for seq := fseq; seq <= lseq; seq++ {
if sm, _ := mb.cacheLookup(seq); sm != nil && len(sm.subj) > 0 {
Expand Down
26 changes: 24 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,12 @@ type JSApiStreamDeleteResponse struct {

const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"

// Maximum number of subject details we will send in the stream info.
const JSMaxSubjectDetails = 100_000

type JSApiStreamInfoRequest struct {
DeletedDetails bool `json:"deleted_details,omitempty"`
DeletedDetails bool `json:"deleted_details,omitempty"`
SubjectsFilter string `json:"subjects_filter,omitempty"`
}

type JSApiStreamInfoResponse struct {
Expand Down Expand Up @@ -1697,14 +1701,15 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}

var details bool
var subjects string
if !isEmptyRequest(msg) {
var req JSApiStreamInfoRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
details = req.DeletedDetails
details, subjects = req.DeletedDetails, req.SubjectsFilter
}

mset, err := acc.lookupStream(streamName)
Expand All @@ -1730,6 +1735,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
resp.StreamInfo.Sources = mset.sourcesInfo()
}

// Check if they have asked for subject details.
if subjects != _EMPTY_ {
if mss := mset.store.SubjectsState(subjects); len(mss) > 0 {
if len(mss) > JSMaxSubjectDetails {
resp.StreamInfo = nil
resp.Error = NewJSStreamInfoMaxSubjectsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
sd := make(map[string]uint64, len(mss))
for subj, ss := range mss {
sd[subj] = ss.Msgs
}
resp.StreamInfo.State.Subjects = sd
}

}
// Check for out of band catchups.
if mset.hasCatchupPeers() {
mset.checkClusterInfo(resp.StreamInfo)
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ const (
// JSStreamHeaderExceedsMaximumErr header size exceeds maximum allowed of 64k
JSStreamHeaderExceedsMaximumErr ErrorIdentifier = 10097

// JSStreamInfoMaxSubjectsErr subject details would exceed maximum allowed
JSStreamInfoMaxSubjectsErr ErrorIdentifier = 10117

// JSStreamInvalidConfigF Stream configuration validation error string ({err})
JSStreamInvalidConfigF ErrorIdentifier = 10052

Expand Down Expand Up @@ -438,6 +441,7 @@ var (
JSStreamExternalDelPrefixOverlapsErrF: {Code: 400, ErrCode: 10022, Description: "stream external delivery prefix {prefix} overlaps with stream subject {subject}"},
JSStreamGeneralErrorF: {Code: 500, ErrCode: 10051, Description: "{err}"},
JSStreamHeaderExceedsMaximumErr: {Code: 400, ErrCode: 10097, Description: "header size exceeds maximum allowed of 64k"},
JSStreamInfoMaxSubjectsErr: {Code: 500, ErrCode: 10117, Description: "subject details would exceed maximum allowed"},
JSStreamInvalidConfigF: {Code: 500, ErrCode: 10052, Description: "{err}"},
JSStreamInvalidErr: {Code: 500, ErrCode: 10096, Description: "stream not valid"},
JSStreamInvalidExternalDeliverySubjErrF: {Code: 400, ErrCode: 10024, Description: "stream external delivery prefix {prefix} must not contain wildcards"},
Expand Down Expand Up @@ -1445,6 +1449,16 @@ func NewJSStreamHeaderExceedsMaximumError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamHeaderExceedsMaximumErr]
}

// NewJSStreamInfoMaxSubjectsError creates a new JSStreamInfoMaxSubjectsErr error: "subject details would exceed maximum allowed"
func NewJSStreamInfoMaxSubjectsError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSStreamInfoMaxSubjectsErr]
}

// NewJSStreamInvalidConfigError creates a new JSStreamInvalidConfigF error: "{err}"
func NewJSStreamInvalidConfigError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down