Skip to content

Commit

Permalink
Apply review changes
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Feb 13, 2023
1 parent 9320bae commit 33eb6a9
Showing 1 changed file with 7 additions and 10 deletions.
17 changes: 7 additions & 10 deletions server/stream.go
Expand Up @@ -213,7 +213,7 @@ type stream struct {
lseq uint64
lmsgId string
consumers map[string]*consumer
numFilter int
numFilter int // number of filtered consumers
cfg StreamConfig
created time.Time
stype StorageType
Expand Down Expand Up @@ -4625,7 +4625,7 @@ func (mset *stream) numConsumers() int {
func (mset *stream) setConsumer(o *consumer) {
mset.consumers[o.name] = o
if o.subjf != nil {
mset.numFilter += len(o.subjf)
mset.numFilter++
}
if o.cfg.Direct {
mset.directs++
Expand Down Expand Up @@ -4727,12 +4727,11 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) {
mset.mu.Lock()
defer mset.mu.Unlock()

mset.numFilter += len(newFilters)

for range o.subjf {
if mset.numFilter > 0 {
mset.numFilter--
}
if mset.numFilter > 0 && len(o.subjf) > 0 {
mset.numFilter--
}
if len(newFilters) > 0 {
mset.numFilter++
}
}

Expand Down Expand Up @@ -4791,11 +4790,9 @@ func (mset *stream) Store() StreamStore {
func (mset *stream) partitionUnique(partitions []string) bool {
for _, partition := range partitions {
for _, o := range mset.consumers {

if o.subjf == nil {
return false
}

for _, filter := range o.subjf {
if subjectIsSubsetMatch(partition, filter.subject) ||
subjectIsSubsetMatch(filter.subject, partition) {
Expand Down

0 comments on commit 33eb6a9

Please sign in to comment.