Skip to content

Commit

Permalink
[FIXED] Stream config idempotency (#4292)
Browse files Browse the repository at this point in the history
- [X] Branch rebased on top of current main (`git pull --rebase origin
main`)
- [X] Changes squashed to a single commit (described
[here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html))
 - [x] Build is green in Travis CI
- [X] You have certified that the contribution is your original work and
that you license the work to the project under the [Apache 2
license](https://github.com/nats-io/nats-server/blob/main/LICENSE)

Fixes a behavior where idempotency of re-defining the same stream more
than once (with the same attributes) was broken due to the DeepEqual
failing due to the StreamSource struct received from the client app not
having a value for the `iname` structure field (as it's internal) but
the StreamSource struct return from `mset.config()` would have it set.

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
  • Loading branch information
jnmoyne committed Jul 7, 2023
1 parent 20ce582 commit 69e137c
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions server/stream.go
Expand Up @@ -409,6 +409,12 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
// Check to see if configs are same.
ocfg := mset.config()

// set the index name on cfg since it would not contain a value for iname while the return from mset.config() does to ensure the DeepEqual works
for _, s := range cfg.Sources {
s.setIndexName()
}

if reflect.DeepEqual(ocfg, cfg) {
if sa != nil {
mset.setStreamAssignment(sa)
Expand Down Expand Up @@ -449,20 +455,18 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}

// Setup our internal indexed names here for sources and check if the transform (if any) is valid.
if len(cfg.Sources) > 0 {
for _, ssi := range cfg.Sources {
ssi.setIndexName()
// check the filter, if any, is valid
if ssi.FilterSubject != _EMPTY_ && !IsValidSubject(ssi.FilterSubject) {

for _, ssi := range cfg.Sources {
// check the filter, if any, is valid
if ssi.FilterSubject != _EMPTY_ && !IsValidSubject(ssi.FilterSubject) {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
}
}
Expand Down

0 comments on commit 69e137c

Please sign in to comment.