Skip to content

Commit

Permalink
[IMPROVED] StreamInfo reflecting subject transforms with just a filte…
Browse files Browse the repository at this point in the history
…r and no transformation for Sources (#4403)

- [X] Branch rebased on top of current main (`git pull --rebase origin
main`)
- [X] Changes squashed to a single commit (described
[here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html))
 - [x] Build is green in Travis CI
- [X] You have certified that the contribution is your original work and
that you license the work to the project under the [Apache 2
license](https://github.com/nats-io/nats-server/blob/main/LICENSE)

Adds sfs to sourceInfo such that transforms with just a subject filter
(and no transformation, meaning that the transform pointer in streamInfo
is nil) can still be reflected in SourceInfo, which is important since
the filtering is still happening, just no transformation as well.
  • Loading branch information
neilalexander committed Aug 21, 2023
2 parents 3f28de8 + 62f62d4 commit 4886f1f
Showing 1 changed file with 32 additions and 16 deletions.
48 changes: 32 additions & 16 deletions server/stream.go
Expand Up @@ -304,6 +304,7 @@ type sourceInfo struct {
wg sync.WaitGroup
sf string // subject filter
tr *subjectTransform
sfs []string // subject filters
trs []*subjectTransform // subject transforms
}

Expand Down Expand Up @@ -1739,8 +1740,10 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
si.sfs = make([]string, len(s.SubjectTransforms))
for i := range s.SubjectTransforms {
// err can be ignored as already validated in config check
si.sfs[i] = s.SubjectTransforms[i].Source
var err error
si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination)
if err != nil {
Expand Down Expand Up @@ -2012,18 +2015,23 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {
return nil
}

var trConfigs []SubjectTransformConfig
for _, tr := range si.trs {
if tr != nil {
trConfigs = append(trConfigs, SubjectTransformConfig{Source: tr.src, Destination: tr.dest})
}
}
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf, SubjectTransforms: trConfigs}
var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf}

if si.tr != nil {
ssi.SubjectTransformDest = si.tr.dest
}

trConfigs := make([]SubjectTransformConfig, len(si.sfs))
for i := range si.sfs {
destination := _EMPTY_
if si.trs[i] != nil {
destination = si.trs[i].dest
}
trConfigs[i] = SubjectTransformConfig{si.sfs[i], destination}
}

ssi.SubjectTransforms = trConfigs

// If we have not heard from the source, set Active to -1.
if si.last.IsZero() {
ssi.Active = -1
Expand Down Expand Up @@ -2442,6 +2450,7 @@ func (mset *stream) setupMirrorConsumer() error {
// Filters
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
mirror.sf = mset.cfg.Mirror.FilterSubject
// Set transform if any
var err error
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
Expand All @@ -2450,17 +2459,22 @@ func (mset *stream) setupMirrorConsumer() error {
}
}

var filters []string
for _, tr := range mset.cfg.Mirror.SubjectTransforms {
sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
trs := make([]*subjectTransform, len(mset.cfg.Mirror.SubjectTransforms))

for i, tr := range mset.cfg.Mirror.SubjectTransforms {
// will not fail as already checked before that the transform will work
subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
mirror.trs = append(mirror.trs, subjectTransform)
filters = append(filters, tr.Source)

sfs[i] = tr.Source
trs[i] = subjectTransform
}
req.Config.FilterSubjects = filters
mirror.sfs = sfs
mirror.trs = trs
req.Config.FilterSubjects = sfs

respCh := make(chan *JSApiConsumerCreateResponse, 1)
reply := infoReplySubject()
Expand Down Expand Up @@ -3265,15 +3279,17 @@ func (mset *stream) startingSequenceForSources() {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
} else {
var trs []*subjectTransform
for _, str := range ssi.SubjectTransforms {
sfs := make([]string, len(ssi.SubjectTransforms))
trs := make([]*subjectTransform, len(ssi.SubjectTransforms))
for i, str := range ssi.SubjectTransforms {
tr, err := NewSubjectTransform(str.Source, str.Destination)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
trs = append(trs, tr)
sfs[i] = str.Source
trs[i] = tr
}
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, trs: trs}
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sfs: sfs, trs: trs}
}
mset.sources[ssi.iname] = si
}
Expand Down

0 comments on commit 4886f1f

Please sign in to comment.