Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Stream config idempotency #4292

Merged
merged 1 commit into from Jul 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 17 additions & 13 deletions server/stream.go
Expand Up @@ -409,6 +409,12 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
// Check to see if configs are same.
ocfg := mset.config()

// set the index name on cfg since it would not contain a value for iname while the return from mset.config() does to ensure the DeepEqual works
for _, s := range cfg.Sources {
s.setIndexName()
}

if reflect.DeepEqual(ocfg, cfg) {
if sa != nil {
mset.setStreamAssignment(sa)
Expand Down Expand Up @@ -449,20 +455,18 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}

// Setup our internal indexed names here for sources and check if the transform (if any) is valid.
if len(cfg.Sources) > 0 {
for _, ssi := range cfg.Sources {
ssi.setIndexName()
// check the filter, if any, is valid
if ssi.FilterSubject != _EMPTY_ && !IsValidSubject(ssi.FilterSubject) {

for _, ssi := range cfg.Sources {
// check the filter, if any, is valid
if ssi.FilterSubject != _EMPTY_ && !IsValidSubject(ssi.FilterSubject) {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
}
}
Expand Down