Skip to content

Commit

Permalink
Merge pull request #3827 from nats-io/jnm/streamsubjectransformimprove1
Browse files Browse the repository at this point in the history
Stream subject transform improvements
  • Loading branch information
derekcollison committed Jan 29, 2023
2 parents 11ffd69 + 3518c94 commit a505c88
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 16 deletions.
4 changes: 2 additions & 2 deletions server/accounts.go
Expand Up @@ -1830,7 +1830,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
// Create a transform. Do so in reverse such that $ symbols only exist in to
if tr, err = NewSubjectTransform(to, transformTokenize(from)); err != nil {
a.mu.Unlock()
return nil, fmt.Errorf("failed to create mapping transform for service import subject %q to %q: %v",
return nil, fmt.Errorf("failed to create mapping transform for service import subject from %q to %q: %v",
from, to, err)
} else {
// un-tokenize and reverse transform so we get the transform needed
Expand Down Expand Up @@ -2376,7 +2376,7 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri
} else {
// Create a transform
if tr, err = NewSubjectTransform(from, transformTokenize(to)); err != nil {
return fmt.Errorf("failed to create mapping transform for stream import subject %q to %q: %v",
return fmt.Errorf("failed to create mapping transform for stream import subject from %q to %q: %v",
from, to, err)
}
to, _ = transformUntokenize(to)
Expand Down
84 changes: 70 additions & 14 deletions server/stream.go
Expand Up @@ -428,11 +428,16 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
if len(cfg.Sources) > 0 {
for i, ssi := range cfg.Sources {
ssi.setIndexName(i)
// check the filter, if any, is valid
if ssi.FilterSubject != _EMPTY_ && !IsValidSubject(ssi.FilterSubject) {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform for the source not valid %w", err)
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
}
}
Expand Down Expand Up @@ -486,7 +491,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream input subject transform not valid %w", err)
return nil, fmt.Errorf("stream subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
}
mset.itr = tr
}
Expand All @@ -496,7 +501,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream republish transform not valid %w", err)
return nil, fmt.Errorf("stream republish transform from '%s' to '%s' %w", cfg.RePublish.Source, cfg.RePublish.Destination, err)
}
// Assign our transform for republishing.
mset.tr = tr
Expand Down Expand Up @@ -1305,7 +1310,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle"))
}
if _, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination); err != nil {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish not valid"))
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish with transform from '%s' to '%s' not valid", cfg.RePublish.Source, cfg.RePublish.Destination))
}
}

Expand Down Expand Up @@ -1520,13 +1525,15 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)

// Check for Sources.
if len(cfg.Sources) > 0 || len(ocfg.Sources) > 0 {
current := make(map[string]string)
currentFilter := make(map[string]string)
currentTransformDest := make(map[string]string)
for _, s := range ocfg.Sources {
current[s.iname] = s.FilterSubject
currentFilter[s.iname] = s.FilterSubject
currentTransformDest[s.iname] = s.SubjectTransformDest
}
for i, s := range cfg.Sources {
s.setIndexName(i)
if oFilter, ok := current[s.iname]; !ok {
if oFilter, ok := currentFilter[s.iname]; !ok {
if mset.sources == nil {
mset.sources = make(map[string]*sourceInfo)
}
Expand All @@ -1536,7 +1543,8 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
if s.SubjectTransformDest != _EMPTY_ {
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
return err
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err)
}
}

Expand All @@ -1545,6 +1553,13 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
mset.setSourceConsumer(s.iname, si.sseq+1, time.Time{})
} else if oFilter != s.FilterSubject {
if si, ok := mset.sources[s.iname]; ok {
if s.SubjectTransformDest != _EMPTY_ {
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err)
}
}
filterOverlap := true
if oFilter != _EMPTY_ && s.FilterSubject != _EMPTY_ {
newFilter := strings.Split(s.FilterSubject, tsep)
Expand All @@ -1564,11 +1579,26 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
mset.setSourceConsumer(s.iname, si.sseq+1, time.Now())
}
}
} else if currentTransformDest[s.iname] != s.SubjectTransformDest {
// transform destination has changed
if si, ok := mset.sources[s.iname]; ok {
if s.SubjectTransformDest == _EMPTY_ {
// remove the transform
si.tr = nil
} else {
// update the transform with the new destination if it's valid
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err)
}
}
}
}
delete(current, s.iname)
delete(currentFilter, s.iname)
}
// What is left in current needs to be deleted.
for iname := range current {
// What is left in currentFilter needs to be deleted.
for iname := range currentFilter {
mset.cancelSourceConsumer(iname)
delete(mset.sources, iname)
}
Expand All @@ -1592,17 +1622,40 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
if cfg.RePublish.Source == _EMPTY_ {
cfg.RePublish.Source = fwcs
}
if cfg.RePublish.Destination == _EMPTY_ {
cfg.RePublish.Destination = fwcs
}
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return fmt.Errorf("stream configuration for republish not valid")
mset.mu.Unlock()
return fmt.Errorf("stream configuration for republish from '%s' to '%s' %w", cfg.RePublish.Source, cfg.RePublish.Destination, err)
}
// Assign our transform for republishing.
mset.tr = tr
} else {
mset.tr = nil
}

// Check for changes to subject transform
if ocfg.SubjectTransform == nil && cfg.SubjectTransform != nil {
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
}
mset.itr = tr
} else if ocfg.SubjectTransform != nil && cfg.SubjectTransform != nil &&
(ocfg.SubjectTransform.Source != cfg.SubjectTransform.Source || ocfg.SubjectTransform.Destination != cfg.SubjectTransform.Destination) {
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
}
mset.itr = tr
} else if ocfg.SubjectTransform != nil && cfg.SubjectTransform == nil {
mset.itr = nil
}

js := mset.js

if targetTier := tierName(cfg); mset.tier != targetTier {
Expand Down Expand Up @@ -2547,7 +2600,10 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
if si := mset.sources[iname]; si != nil && si.sub != nil {
si.err = nil
if ccr.Error != nil || ccr.ConsumerInfo == nil {
mset.srv.Warnf("JetStream error response for create source consumer: %+v", ccr.Error)
// Note: this warning can happen a few times when starting up the server when sourcing streams are
// defined, this is normal as the streams are re-created in no particular order and it is possible
// that a stream sourcing another could come up before all of its sources have been recreated.
mset.srv.Warnf("JetStream error response for stream %s create source consumer %s: %+v", mset.cfg.Name, si.name, ccr.Error)
si.err = ccr.Error
// Let's retry as soon as possible, but we are gated by sourceConsumerRetryThreshold
retry = true
Expand Down

0 comments on commit a505c88

Please sign in to comment.