Skip to content

Commit

Permalink
Fix stream sourcing & mirroring overlap errors
Browse files Browse the repository at this point in the history
When adding or updating sources/mirrors, server was checking if the stream with
a given name exists to check for subject overlaps, among other things.
However, if sourced/mirrored stream was `External`, checks should
not be executed, as not only stream would never be found,
but also, if `External` stream had the same name as the sourcing stream,
the check would be wrongly performed against itself.

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Apr 14, 2023
1 parent 89fc7e3 commit a66c67b
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 66 deletions.
24 changes: 0 additions & 24 deletions server/jetstream_cluster_1_test.go
Expand Up @@ -5819,18 +5819,6 @@ func TestJetStreamClusterFailMirrorsAndSources(t *testing.T) {
})
}

testPrefix("mirror-bad-deliverprefix", JSStreamExternalDelPrefixOverlapsErrF, StreamConfig{
Name: "MY_MIRROR_TEST",
Storage: FileStorage,
Mirror: &StreamSource{
Name: "TEST",
External: &ExternalStream{
ApiPrefix: "RI.JS.API",
// this will result in test.test.> which test.> would match
DeliverPrefix: "test",
},
},
})
testPrefix("mirror-bad-apiprefix", JSStreamExternalApiOverlapErrF, StreamConfig{
Name: "MY_MIRROR_TEST",
Storage: FileStorage,
Expand All @@ -5842,18 +5830,6 @@ func TestJetStreamClusterFailMirrorsAndSources(t *testing.T) {
},
},
})
testPrefix("source-bad-deliverprefix", JSStreamExternalDelPrefixOverlapsErrF, StreamConfig{
Name: "MY_SOURCE_TEST",
Storage: FileStorage,
Sources: []*StreamSource{{
Name: "TEST",
External: &ExternalStream{
ApiPrefix: "RI.JS.API",
DeliverPrefix: "test",
},
},
},
})
testPrefix("source-bad-apiprefix", JSStreamExternalApiOverlapErrF, StreamConfig{
Name: "MY_SOURCE_TEST",
Storage: FileStorage,
Expand Down
97 changes: 97 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -19815,3 +19815,100 @@ func TestJetStreamConsumerWithFormattingSymbol(t *testing.T) {
_, err = sub.NextMsg(time.Second * 5)
require_NoError(t, err)
}

func TestJetStreamStreamUpdateWithExternalSource(t *testing.T) {
ho := DefaultTestOptions
ho.Port = -1
ho.LeafNode.Host = "127.0.0.1"
ho.LeafNode.Port = -1
ho.JetStream = true
ho.JetStreamDomain = "hub"
ho.StoreDir = t.TempDir()
hs := RunServer(&ho)
defer hs.Shutdown()

lu, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ho.LeafNode.Port))
require_NoError(t, err)

lo1 := DefaultTestOptions
lo1.Port = -1
lo1.ServerName = "a-leaf"
lo1.JetStream = true
lo1.StoreDir = t.TempDir()
lo1.JetStreamDomain = "a-leaf"
lo1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{lu}}}
l1 := RunServer(&lo1)
defer l1.Shutdown()

checkLeafNodeConnected(t, l1)

// Test sources with `External` provided
ncl, jsl := jsClientConnect(t, l1)
defer ncl.Close()

// Hub stream.
_, err = jsl.AddStream(&nats.StreamConfig{Name: "stream", Subjects: []string{"leaf"}})
require_NoError(t, err)

nch, jsh := jsClientConnect(t, hs)
defer nch.Close()

// Leaf stream.
// Both streams uses the same name, as we're testing if overlap does not check against itself
// if `External` stream has the same name.
_, err = jsh.AddStream(&nats.StreamConfig{
Name: "stream",
Subjects: []string{"hub"},
})
require_NoError(t, err)

// Add `Sources`.
// This should not validate subjects overlap against itself.
_, err = jsh.UpdateStream(&nats.StreamConfig{
Name: "stream",
Subjects: []string{"hub"},
Sources: []*nats.StreamSource{
{
Name: "stream",
FilterSubject: "leaf",
External: &nats.ExternalStream{
APIPrefix: "$JS.a-leaf.API",
},
},
},
})
require_NoError(t, err)

// Specifying not existing FilterSubject should also be fine, as we do not validate `External` stream.
_, err = jsh.UpdateStream(&nats.StreamConfig{
Name: "stream",
Subjects: []string{"hub"},
Sources: []*nats.StreamSource{
{
Name: "stream",
FilterSubject: "foo",
External: &nats.ExternalStream{
APIPrefix: "$JS.a-leaf.API",
},
},
},
})
require_NoError(t, err)

// Add one more stream to the Hub, so when we source it, it is not `External`.
_, err = jsh.AddStream(&nats.StreamConfig{Name: "other", Subjects: []string{"other"}})
require_NoError(t, err)

_, err = jsh.UpdateStream(&nats.StreamConfig{
Name: "stream",
Subjects: []string{"hub"},
Sources: []*nats.StreamSource{
{
Name: "other",
FilterSubject: "foo",
},
},
})
require_Error(t, err)
require_True(t, strings.Contains(err.Error(), "does not overlap"))
}
90 changes: 48 additions & 42 deletions server/stream.go
Expand Up @@ -1098,62 +1098,68 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if len(cfg.Sources) > 0 {
return StreamConfig{}, NewJSMirrorWithSourcesError()
}
// We do not require other stream to exist anymore, but if we can see it check payloads.
exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
}
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSMirrorMaxMessageSizeTooBigError()
// Do not perform checks if External is provided, as it could lead to
// checking against itself (if sourced stream name is the same on different JetStream)
if cfg.Mirror.External == nil {
// We do not require other stream to exist anymore, but if we can see it check payloads.
exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
}
if !isRecovering && !hasFilterSubjectOverlap(cfg.Mirror.FilterSubject, subs) {
return StreamConfig{}, NewJSStreamInvalidConfigError(
fmt.Errorf("mirror '%s' filter subject '%s' does not overlap with any origin stream subject",
cfg.Mirror.Name, cfg.Mirror.FilterSubject))
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSMirrorMaxMessageSizeTooBigError()
}
if !isRecovering && !hasFilterSubjectOverlap(cfg.Mirror.FilterSubject, subs) {
return StreamConfig{}, NewJSStreamInvalidConfigError(
fmt.Errorf("mirror '%s' filter subject '%s' does not overlap with any origin stream subject",
cfg.Mirror.Name, cfg.Mirror.FilterSubject))
}
}
}
if cfg.Mirror.External != nil {
// Determine if we are inheriting direct gets.
if exists, ocfg := getStream(cfg.Mirror.Name); exists {
cfg.MirrorDirect = ocfg.AllowDirect
} else if js := s.getJetStream(); js != nil && js.isClustered() {
// Could not find it here. If we are clustered we can look it up.
js.mu.RLock()
if cc := js.cluster; cc != nil {
if as := cc.streams[acc.Name]; as != nil {
if sa := as[cfg.Mirror.Name]; sa != nil {
cfg.MirrorDirect = sa.Config.AllowDirect
}
}
}
js.mu.RUnlock()
}
} else {
if cfg.Mirror.External.DeliverPrefix != _EMPTY_ {
deliveryPrefixes = append(deliveryPrefixes, cfg.Mirror.External.DeliverPrefix)
}
if cfg.Mirror.External.ApiPrefix != _EMPTY_ {
apiPrefixes = append(apiPrefixes, cfg.Mirror.External.ApiPrefix)
}
}
// Determine if we are inheriting direct gets.
if exists, ocfg := getStream(cfg.Mirror.Name); exists {
cfg.MirrorDirect = ocfg.AllowDirect
} else if js := s.getJetStream(); js != nil && js.isClustered() {
// Could not find it here. If we are clustered we can look it up.
js.mu.RLock()
if cc := js.cluster; cc != nil {
if as := cc.streams[acc.Name]; as != nil {
if sa := as[cfg.Mirror.Name]; sa != nil {
cfg.MirrorDirect = sa.Config.AllowDirect
}
}
}
js.mu.RUnlock()

}
}
if len(cfg.Sources) > 0 {
for _, src := range cfg.Sources {
exists, maxMsgSize, subs := hasStream(src.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
}
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError()
// Do not perform checks if External is provided, as it could lead to
// checking against itself (if sourced stream name is the same on different JetStream)
if src.External == nil {
exists, maxMsgSize, subs := hasStream(src.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
}
if !isRecovering && !hasFilterSubjectOverlap(src.FilterSubject, streamSubs) {
return StreamConfig{}, NewJSStreamInvalidConfigError(
fmt.Errorf("source '%s' filter subject '%s' does not overlap with any origin stream subject",
src.Name, src.FilterSubject))
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError()
}
if !isRecovering && !hasFilterSubjectOverlap(src.FilterSubject, streamSubs) {
return StreamConfig{}, NewJSStreamInvalidConfigError(
fmt.Errorf("source '%s' filter subject '%s' does not overlap with any origin stream subject",
src.Name, src.FilterSubject))
}
}
}
if src.External == nil {
continue
}
if src.External.DeliverPrefix != _EMPTY_ {
Expand Down

0 comments on commit a66c67b

Please sign in to comment.