Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] StreamInfo reflecting subject transforms with just a filter and no transformation for Sources #4403

Merged
merged 1 commit into from Aug 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 32 additions & 16 deletions server/stream.go
Expand Up @@ -304,6 +304,7 @@ type sourceInfo struct {
wg sync.WaitGroup
sf string // subject filter
tr *subjectTransform
sfs []string // subject filters
trs []*subjectTransform // subject transforms
}

Expand Down Expand Up @@ -1739,8 +1740,10 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
si.sfs = make([]string, len(s.SubjectTransforms))
for i := range s.SubjectTransforms {
// err can be ignored as already validated in config check
si.sfs[i] = s.SubjectTransforms[i].Source
var err error
si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination)
if err != nil {
Expand Down Expand Up @@ -2012,18 +2015,23 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
return nil
}

var trConfigs []SubjectTransformConfig
for _, tr := range si.trs {
if tr != nil {
trConfigs = append(trConfigs, SubjectTransformConfig{Source: tr.src, Destination: tr.dest})
}
}
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf, SubjectTransforms: trConfigs}
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf}

if si.tr != nil {
ssi.SubjectTransformDest = si.tr.dest
}

trConfigs := make([]SubjectTransformConfig, len(si.sfs))
for i := range si.sfs {
destination := _EMPTY_
if si.trs[i] != nil {
destination = si.trs[i].dest
}
trConfigs[i] = SubjectTransformConfig{si.sfs[i], destination}
}

ssi.SubjectTransforms = trConfigs

// If we have not heard from the source, set Active to -1.
if si.last.IsZero() {
ssi.Active = -1
Expand Down Expand Up @@ -2442,6 +2450,7 @@ func (mset *stream) setupMirrorConsumer() error {
// Filters
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
mirror.sf = mset.cfg.Mirror.FilterSubject
// Set transform if any
var err error
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
Expand All @@ -2450,17 +2459,22 @@ func (mset *stream) setupMirrorConsumer() error {
}
}

var filters []string
for _, tr := range mset.cfg.Mirror.SubjectTransforms {
sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
trs := make([]*subjectTransform, len(mset.cfg.Mirror.SubjectTransforms))

for i, tr := range mset.cfg.Mirror.SubjectTransforms {
// will not fail as already checked before that the transform will work
subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
mirror.trs = append(mirror.trs, subjectTransform)
filters = append(filters, tr.Source)

sfs[i] = tr.Source
trs[i] = subjectTransform
}
req.Config.FilterSubjects = filters
mirror.sfs = sfs
mirror.trs = trs
req.Config.FilterSubjects = sfs

respCh := make(chan *JSApiConsumerCreateResponse, 1)
reply := infoReplySubject()
Expand Down Expand Up @@ -3265,15 +3279,17 @@ func (mset *stream) startingSequenceForSources() {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
} else {
var trs []*subjectTransform
for _, str := range ssi.SubjectTransforms {
sfs := make([]string, len(ssi.SubjectTransforms))
trs := make([]*subjectTransform, len(ssi.SubjectTransforms))
for i, str := range ssi.SubjectTransforms {
tr, err := NewSubjectTransform(str.Source, str.Destination)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
trs = append(trs, tr)
sfs[i] = str.Source
trs[i] = tr
}
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, trs: trs}
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sfs: sfs, trs: trs}
}
mset.sources[ssi.iname] = si
}
Expand Down