Skip to content

Commit

Permalink
Apply review changes
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Feb 12, 2023
1 parent 9320bae commit 75a2476
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions server/stream.go
Expand Up @@ -4625,7 +4625,7 @@ func (mset *stream) numConsumers() int {
func (mset *stream) setConsumer(o *consumer) {
mset.consumers[o.name] = o
if o.subjf != nil {
mset.numFilter += len(o.subjf)
mset.numFilter++
}
if o.cfg.Direct {
mset.directs++
Expand Down Expand Up @@ -4727,12 +4727,11 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) {
mset.mu.Lock()
defer mset.mu.Unlock()

mset.numFilter += len(newFilters)

for range o.subjf {
if mset.numFilter > 0 {
mset.numFilter--
}
if mset.numFilter > 0 && len(o.subjf) > 0 {
mset.numFilter--
}
if len(newFilters) > 0 {
mset.numFilter++
}
}

Expand Down Expand Up @@ -4791,11 +4790,9 @@ func (mset *stream) Store() StreamStore {
func (mset *stream) partitionUnique(partitions []string) bool {
for _, partition := range partitions {
for _, o := range mset.consumers {

if o.subjf == nil {
return false
}

for _, filter := range o.subjf {
if subjectIsSubsetMatch(partition, filter.subject) ||
subjectIsSubsetMatch(filter.subject, partition) {
Expand Down

0 comments on commit 75a2476

Please sign in to comment.