New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream subject transform improvements #3827
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -428,11 +428,18 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt | |
if len(cfg.Sources) > 0 { | ||
for i, ssi := range cfg.Sources { | ||
ssi.setIndexName(i) | ||
// check the filter, if any, is valid | ||
if ssi.FilterSubject != _EMPTY_ { | ||
if !IsValidSubject(ssi.FilterSubject) { | ||
jsa.mu.Unlock() | ||
return nil, fmt.Errorf("subject filter '%s' for the source %w", ssi.FilterSubject, ErrBadSubject) | ||
} | ||
} | ||
// check the transform, if any, is valid | ||
if ssi.SubjectTransformDest != _EMPTY_ { | ||
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil { | ||
jsa.mu.Unlock() | ||
return nil, fmt.Errorf("subject transform for the source not valid %w", err) | ||
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source %w", ssi.FilterSubject, ssi.SubjectTransformDest, err) | ||
} | ||
} | ||
} | ||
|
@@ -486,7 +493,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt | |
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination) | ||
if err != nil { | ||
jsa.mu.Unlock() | ||
return nil, fmt.Errorf("stream input subject transform not valid %w", err) | ||
return nil, fmt.Errorf("stream subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err) | ||
} | ||
mset.itr = tr | ||
} | ||
|
@@ -496,7 +503,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt | |
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination) | ||
if err != nil { | ||
jsa.mu.Unlock() | ||
return nil, fmt.Errorf("stream republish transform not valid %w", err) | ||
return nil, fmt.Errorf("stream republish transform from '%s' to '%s' %w", cfg.RePublish.Source, cfg.RePublish.Destination, err) | ||
} | ||
// Assign our transform for republishing. | ||
mset.tr = tr | ||
|
@@ -1305,7 +1312,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi | |
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle")) | ||
} | ||
if _, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination); err != nil { | ||
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish not valid")) | ||
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish with transform from '%s' to '%s' not valid", cfg.RePublish.Source, cfg.RePublish.Destination)) | ||
} | ||
} | ||
|
||
|
@@ -1520,13 +1527,15 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) | |
|
||
// Check for Sources. | ||
if len(cfg.Sources) > 0 || len(ocfg.Sources) > 0 { | ||
current := make(map[string]string) | ||
currentFilter := make(map[string]string) | ||
currentTransformDest := make(map[string]string) | ||
for _, s := range ocfg.Sources { | ||
current[s.iname] = s.FilterSubject | ||
currentFilter[s.iname] = s.FilterSubject | ||
currentTransformDest[s.iname] = s.SubjectTransformDest | ||
} | ||
for i, s := range cfg.Sources { | ||
s.setIndexName(i) | ||
if oFilter, ok := current[s.iname]; !ok { | ||
if oFilter, ok := currentFilter[s.iname]; !ok { | ||
if mset.sources == nil { | ||
mset.sources = make(map[string]*sourceInfo) | ||
} | ||
|
@@ -1536,7 +1545,8 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) | |
if s.SubjectTransformDest != _EMPTY_ { | ||
var err error | ||
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil { | ||
return err | ||
mset.mu.Unlock() | ||
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err) | ||
} | ||
} | ||
|
||
|
@@ -1545,6 +1555,13 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) | |
mset.setSourceConsumer(s.iname, si.sseq+1, time.Time{}) | ||
} else if oFilter != s.FilterSubject { | ||
if si, ok := mset.sources[s.iname]; ok { | ||
if s.SubjectTransformDest != _EMPTY_ { | ||
var err error | ||
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil { | ||
mset.mu.Unlock() | ||
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err) | ||
} | ||
} | ||
filterOverlap := true | ||
if oFilter != _EMPTY_ && s.FilterSubject != _EMPTY_ { | ||
newFilter := strings.Split(s.FilterSubject, tsep) | ||
|
@@ -1564,11 +1581,26 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) | |
mset.setSourceConsumer(s.iname, si.sseq+1, time.Now()) | ||
} | ||
} | ||
} else if currentTransformDest[s.iname] != s.SubjectTransformDest { | ||
// transform destination has changed | ||
if si, ok := mset.sources[s.iname]; ok { | ||
if s.SubjectTransformDest == _EMPTY_ { | ||
// remove the transform | ||
si.tr = nil | ||
} else { | ||
// update the transform with the new destination if it's valid | ||
var err error | ||
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil { | ||
mset.mu.Unlock() | ||
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err) | ||
} | ||
} | ||
} | ||
} | ||
delete(current, s.iname) | ||
delete(currentFilter, s.iname) | ||
} | ||
// What is left in current needs to be deleted. | ||
for iname := range current { | ||
// What is left in currentFilter needs to be deleted. | ||
for iname := range currentFilter { | ||
mset.cancelSourceConsumer(iname) | ||
delete(mset.sources, iname) | ||
} | ||
|
@@ -1592,17 +1624,39 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) | |
if cfg.RePublish.Source == _EMPTY_ { | ||
cfg.RePublish.Source = fwcs | ||
} | ||
if cfg.RePublish.Destination == _EMPTY_ { | ||
cfg.RePublish.Destination = fwcs | ||
} | ||
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination) | ||
if err != nil { | ||
jsa.mu.Unlock() | ||
return fmt.Errorf("stream configuration for republish not valid") | ||
mset.mu.Unlock() | ||
return fmt.Errorf("stream configuration for republish from '%s' to '%s' %w", cfg.RePublish.Source, cfg.RePublish.Destination, err) | ||
} | ||
// Assign our transform for republishing. | ||
mset.tr = tr | ||
} else { | ||
mset.tr = nil | ||
} | ||
|
||
// Check for changes to subject transform | ||
if ocfg.SubjectTransform == nil && cfg.SubjectTransform != nil { | ||
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination) | ||
if err != nil { | ||
mset.mu.Unlock() | ||
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err) | ||
} | ||
mset.itr = tr | ||
} else if ocfg.SubjectTransform != nil && cfg.SubjectTransform != nil && (ocfg.SubjectTransform.Source != cfg.SubjectTransform.Source || ocfg.SubjectTransform.Destination != cfg.SubjectTransform.Destination) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make multi-line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination) | ||
if err != nil { | ||
mset.mu.Unlock() | ||
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err) | ||
} | ||
mset.itr = tr | ||
} else if ocfg.SubjectTransform != nil && cfg.SubjectTransform == nil { | ||
mset.itr = nil | ||
} | ||
|
||
js := mset.js | ||
|
||
if targetTier := tierName(cfg); mset.tier != targetTier { | ||
|
@@ -2547,7 +2601,10 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T | |
if si := mset.sources[iname]; si != nil && si.sub != nil { | ||
si.err = nil | ||
if ccr.Error != nil || ccr.ConsumerInfo == nil { | ||
mset.srv.Warnf("JetStream error response for create source consumer: %+v", ccr.Error) | ||
// Note: this warning can happen a few times when starting up the server when sourcing streams are | ||
// defined, this is normal as the streams are re-created in no particular order and it is possible | ||
// that a stream sourcing another could come up before all of its sources have been recreated. | ||
mset.srv.Warnf("JetStream error response for stream %s create source consumer %s: %+v", mset.cfg.Name, si.name, ccr.Error) | ||
si.err = ccr.Error | ||
// Let's retry as soon as possible, but we are gated by sourceConsumerRetryThreshold | ||
retry = true | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can put on one line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done