Skip to content

Commit

Permalink
Resolve conflicts from dev
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Feb 6, 2023
1 parent adae217 commit 123ec50
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 21 deletions.
1 change: 1 addition & 0 deletions server/consumer.go
Expand Up @@ -1676,6 +1676,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
newSubjf = append(newSubjf, fs)

}
o.subjf = newSubjf
// Make sure we have correct signaling setup.
// Consumer lock can not be held.
mset := o.mset
Expand Down
47 changes: 26 additions & 21 deletions server/stream.go
Expand Up @@ -1743,16 +1743,20 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err

mset.clsMu.RLock()
for _, o := range mset.cList {
o.mu.RLock()
// we update consumer sequences if:
// no subject was specified, we can purge all consumers sequences
if preq == nil ||
doPurge := preq == nil ||
preq.Subject == _EMPTY_ ||
// or consumer filter subject is equal to purged subject
preq.Subject == o.cfg.FilterSubject ||
// or consumer subject is subset of purged subject,
// but not the other way around.
subjectIsSubsetMatch(o.cfg.FilterSubject, preq.Subject) {
subjectIsSubsetMatch(o.cfg.FilterSubject, preq.Subject)
o.mu.RUnlock()
if doPurge {
o.purge(fseq, lseq)

}
}
mset.clsMu.RUnlock()
Expand Down Expand Up @@ -4690,39 +4694,40 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) {
mset.clsMu.Lock()
o.mu.Lock()

if o.sigSub != nil {
if o.sigSubs != nil {
if mset.csl != nil {
for _, sub := range o.sigSubs {
mset.csl.Remove(sub)
}
for _, sub := range o.sigSubs {
mset.csl.Remove(sub)
}
}
o.sigSub = nil
o.sigSubs = nil
}

if o.isLeader() {
if mset.csl == nil {
mset.csl = NewSublistWithCache()
}
// If no filters are preset, add fwcs to sublist for that consumer.
if mset.csl == nil {
mset.csl = NewSublistWithCache()
}
// If no filters are preset, add fwcs to sublist for that consumer.
if newFilters == nil {
sub := &subscription{subject: []byte(fwcs), icb: o.processStreamSignal}
o.mset.csl.Insert(sub)
// If there are filters, add their subjects to sublist.
o.mset.csl.Insert(sub)
o.sigSubs = append(o.sigSubs, sub)
// If there are filters, add their subjects to sublist.
} else {
for _, filter := range newFilters {
sub := &subscription{subject: []byte(filter), icb: o.processStreamSignal}
o.mset.csl.Insert(sub)
}
for _, filter := range newFilters {
sub := &subscription{subject: []byte(filter), icb: o.processStreamSignal}
o.mset.csl.Insert(sub)
o.sigSubs = append(o.sigSubs, sub)
}
}
}
o.sigSubs = sub
o.mu.Unlock()
mset.clsMu.Unlock()

mset.mu.Loc()
defer mset.mu.Unlock()
mset.mu.Lock()
defer mset.mu.Unlock()

mset.numFilter += len(newFilters)
mset.numFilter += len(newFilters)

for range o.subjf {
if mset.numFilter > 0 {
Expand Down

0 comments on commit 123ec50

Please sign in to comment.