Skip to content

Commit

Permalink
Adds sfs to sourceInfo
Browse files Browse the repository at this point in the history
Adds sfs to SourceInfo such that transforms with just a subject filter (and no transformation, meaning that the transform pointer in streamInfo is nil) can still be reflected in SourceInfo, which is important since the filtering is still happening, just no transformation as well.

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
  • Loading branch information
jnmoyne committed Aug 16, 2023
1 parent 3f28de8 commit 93f6515
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions server/stream.go
Expand Up @@ -304,6 +304,7 @@ type sourceInfo struct {
wg sync.WaitGroup
sf string // subject filter
tr *subjectTransform
sfs []string // subject filters
trs []*subjectTransform // subject transforms
}

Expand Down Expand Up @@ -1739,8 +1740,10 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
si.sfs = make([]string, len(s.SubjectTransforms))
for i := range s.SubjectTransforms {
// err can be ignored as already validated in config check
si.sfs[i] = s.SubjectTransforms[i].Source
var err error
si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination)
if err != nil {
Expand Down Expand Up @@ -2013,10 +2016,12 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
}

var trConfigs []SubjectTransformConfig
for _, tr := range si.trs {
if tr != nil {
trConfigs = append(trConfigs, SubjectTransformConfig{Source: tr.src, Destination: tr.dest})
for i := range si.sfs {
destination := _EMPTY_
if si.trs[i] != nil {
destination = si.trs[i].dest
}
trConfigs = append(trConfigs, SubjectTransformConfig{si.sfs[i], destination})
}
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf, SubjectTransforms: trConfigs}

Expand Down Expand Up @@ -2450,17 +2455,20 @@ func (mset *stream) setupMirrorConsumer() error {
}
}

var filters []string
var sfs []string
var trs []*subjectTransform
for _, tr := range mset.cfg.Mirror.SubjectTransforms {
// will not fail as already checked before that the transform will work
subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
mirror.trs = append(mirror.trs, subjectTransform)
filters = append(filters, tr.Source)
sfs = append(sfs, tr.Source)
trs = append(trs, subjectTransform)
}
req.Config.FilterSubjects = filters
mirror.sfs = sfs
mirror.trs = trs
req.Config.FilterSubjects = sfs

respCh := make(chan *JSApiConsumerCreateResponse, 1)
reply := infoReplySubject()
Expand Down Expand Up @@ -3265,15 +3273,17 @@ func (mset *stream) startingSequenceForSources() {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
} else {
var sfs []string
var trs []*subjectTransform
for _, str := range ssi.SubjectTransforms {
tr, err := NewSubjectTransform(str.Source, str.Destination)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
sfs = append(sfs, str.Source)
trs = append(trs, tr)
}
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, trs: trs}
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sfs: sfs, trs: trs}
}
mset.sources[ssi.iname] = si
}
Expand Down

0 comments on commit 93f6515

Please sign in to comment.