Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stream sourcing & mirroring overlap errors #4052

Merged
merged 1 commit into from Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 0 additions & 24 deletions server/jetstream_cluster_1_test.go
Expand Up @@ -5819,18 +5819,6 @@ func TestJetStreamClusterFailMirrorsAndSources(t *testing.T) {
})
}

testPrefix("mirror-bad-deliverprefix", JSStreamExternalDelPrefixOverlapsErrF, StreamConfig{
Name: "MY_MIRROR_TEST",
Storage: FileStorage,
Mirror: &StreamSource{
Name: "TEST",
External: &ExternalStream{
ApiPrefix: "RI.JS.API",
// this will result in test.test.> which test.> would match
DeliverPrefix: "test",
},
},
})
testPrefix("mirror-bad-apiprefix", JSStreamExternalApiOverlapErrF, StreamConfig{
Name: "MY_MIRROR_TEST",
Storage: FileStorage,
Expand All @@ -5842,18 +5830,6 @@ func TestJetStreamClusterFailMirrorsAndSources(t *testing.T) {
},
},
})
testPrefix("source-bad-deliverprefix", JSStreamExternalDelPrefixOverlapsErrF, StreamConfig{
Name: "MY_SOURCE_TEST",
Storage: FileStorage,
Sources: []*StreamSource{{
Name: "TEST",
External: &ExternalStream{
ApiPrefix: "RI.JS.API",
DeliverPrefix: "test",
},
},
},
})
testPrefix("source-bad-apiprefix", JSStreamExternalApiOverlapErrF, StreamConfig{
Name: "MY_SOURCE_TEST",
Storage: FileStorage,
Expand Down
97 changes: 97 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -19815,3 +19815,100 @@ func TestJetStreamConsumerWithFormattingSymbol(t *testing.T) {
_, err = sub.NextMsg(time.Second * 5)
require_NoError(t, err)
}

func TestJetStreamStreamUpdateWithExternalSource(t *testing.T) {
ho := DefaultTestOptions
ho.Port = -1
ho.LeafNode.Host = "127.0.0.1"
ho.LeafNode.Port = -1
ho.JetStream = true
ho.JetStreamDomain = "hub"
ho.StoreDir = t.TempDir()
hs := RunServer(&ho)
defer hs.Shutdown()

lu, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ho.LeafNode.Port))
require_NoError(t, err)

lo1 := DefaultTestOptions
lo1.Port = -1
lo1.ServerName = "a-leaf"
lo1.JetStream = true
lo1.StoreDir = t.TempDir()
lo1.JetStreamDomain = "a-leaf"
lo1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{lu}}}
l1 := RunServer(&lo1)
defer l1.Shutdown()

checkLeafNodeConnected(t, l1)

// Test sources with `External` provided
ncl, jsl := jsClientConnect(t, l1)
defer ncl.Close()

// Hub stream.
_, err = jsl.AddStream(&nats.StreamConfig{Name: "stream", Subjects: []string{"leaf"}})
require_NoError(t, err)

nch, jsh := jsClientConnect(t, hs)
defer nch.Close()

// Leaf stream.
// Both streams uses the same name, as we're testing if overlap does not check against itself
// if `External` stream has the same name.
_, err = jsh.AddStream(&nats.StreamConfig{
Name: "stream",
Subjects: []string{"hub"},
})
require_NoError(t, err)

// Add `Sources`.
// This should not validate subjects overlap against itself.
_, err = jsh.UpdateStream(&nats.StreamConfig{
Name: "stream",
Subjects: []string{"hub"},
Sources: []*nats.StreamSource{
{
Name: "stream",
FilterSubject: "leaf",
External: &nats.ExternalStream{
APIPrefix: "$JS.a-leaf.API",
},
},
},
})
require_NoError(t, err)

// Specifying not existing FilterSubject should also be fine, as we do not validate `External` stream.
_, err = jsh.UpdateStream(&nats.StreamConfig{
Name: "stream",
Subjects: []string{"hub"},
Sources: []*nats.StreamSource{
{
Name: "stream",
FilterSubject: "foo",
External: &nats.ExternalStream{
APIPrefix: "$JS.a-leaf.API",
},
},
},
})
require_NoError(t, err)

// Add one more stream to the Hub, so when we source it, it is not `External`.
_, err = jsh.AddStream(&nats.StreamConfig{Name: "other", Subjects: []string{"other"}})
require_NoError(t, err)

_, err = jsh.UpdateStream(&nats.StreamConfig{
Name: "stream",
Subjects: []string{"hub"},
Sources: []*nats.StreamSource{
{
Name: "other",
FilterSubject: "foo",
},
},
})
require_Error(t, err)
require_True(t, strings.Contains(err.Error(), "does not overlap"))
}
90 changes: 48 additions & 42 deletions server/stream.go
Expand Up @@ -1098,62 +1098,68 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if len(cfg.Sources) > 0 {
return StreamConfig{}, NewJSMirrorWithSourcesError()
}
// We do not require other stream to exist anymore, but if we can see it check payloads.
exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
}
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSMirrorMaxMessageSizeTooBigError()
// Do not perform checks if External is provided, as it could lead to
// checking against itself (if sourced stream name is the same on different JetStream)
if cfg.Mirror.External == nil {
// We do not require other stream to exist anymore, but if we can see it check payloads.
exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
}
if !isRecovering && !hasFilterSubjectOverlap(cfg.Mirror.FilterSubject, subs) {
return StreamConfig{}, NewJSStreamInvalidConfigError(
fmt.Errorf("mirror '%s' filter subject '%s' does not overlap with any origin stream subject",
cfg.Mirror.Name, cfg.Mirror.FilterSubject))
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSMirrorMaxMessageSizeTooBigError()
}
if !isRecovering && !hasFilterSubjectOverlap(cfg.Mirror.FilterSubject, subs) {
return StreamConfig{}, NewJSStreamInvalidConfigError(
fmt.Errorf("mirror '%s' filter subject '%s' does not overlap with any origin stream subject",
cfg.Mirror.Name, cfg.Mirror.FilterSubject))
}
}
}
if cfg.Mirror.External != nil {
// Determine if we are inheriting direct gets.
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
if exists, ocfg := getStream(cfg.Mirror.Name); exists {
cfg.MirrorDirect = ocfg.AllowDirect
} else if js := s.getJetStream(); js != nil && js.isClustered() {
// Could not find it here. If we are clustered we can look it up.
js.mu.RLock()
if cc := js.cluster; cc != nil {
if as := cc.streams[acc.Name]; as != nil {
if sa := as[cfg.Mirror.Name]; sa != nil {
cfg.MirrorDirect = sa.Config.AllowDirect
}
}
}
js.mu.RUnlock()
}
} else {
if cfg.Mirror.External.DeliverPrefix != _EMPTY_ {
deliveryPrefixes = append(deliveryPrefixes, cfg.Mirror.External.DeliverPrefix)
}
if cfg.Mirror.External.ApiPrefix != _EMPTY_ {
apiPrefixes = append(apiPrefixes, cfg.Mirror.External.ApiPrefix)
}
}
// Determine if we are inheriting direct gets.
if exists, ocfg := getStream(cfg.Mirror.Name); exists {
cfg.MirrorDirect = ocfg.AllowDirect
} else if js := s.getJetStream(); js != nil && js.isClustered() {
// Could not find it here. If we are clustered we can look it up.
js.mu.RLock()
if cc := js.cluster; cc != nil {
if as := cc.streams[acc.Name]; as != nil {
if sa := as[cfg.Mirror.Name]; sa != nil {
cfg.MirrorDirect = sa.Config.AllowDirect
}
}
}
js.mu.RUnlock()

}
}
if len(cfg.Sources) > 0 {
for _, src := range cfg.Sources {
exists, maxMsgSize, subs := hasStream(src.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
}
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError()
// Do not perform checks if External is provided, as it could lead to
// checking against itself (if sourced stream name is the same on different JetStream)
if src.External == nil {
exists, maxMsgSize, subs := hasStream(src.Name)
if len(subs) > 0 {
streamSubs = append(streamSubs, subs...)
}
if !isRecovering && !hasFilterSubjectOverlap(src.FilterSubject, streamSubs) {
return StreamConfig{}, NewJSStreamInvalidConfigError(
fmt.Errorf("source '%s' filter subject '%s' does not overlap with any origin stream subject",
src.Name, src.FilterSubject))
if exists {
if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize {
return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError()
}
if !isRecovering && !hasFilterSubjectOverlap(src.FilterSubject, streamSubs) {
return StreamConfig{}, NewJSStreamInvalidConfigError(
fmt.Errorf("source '%s' filter subject '%s' does not overlap with any origin stream subject",
src.Name, src.FilterSubject))
}
}
}
if src.External == nil {
continue
}
if src.External.DeliverPrefix != _EMPTY_ {
Expand Down