Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow editing RePublish on an existing stream #3811

Merged
merged 1 commit into from Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 69 additions & 36 deletions server/jetstream_cluster_2_test.go
Expand Up @@ -6361,24 +6361,21 @@ func TestJetStreamClusterEncryptedDoubleSnapshotBug(t *testing.T) {
require_NoError(t, err)
}

func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) {
func TestJetStreamClusterRePublishUpdateSupported(t *testing.T) {
test := func(t *testing.T, s *Server, stream string, replicas int) {
nc := natsConnect(t, s.ClientURL())
nc, js := jsClientConnect(t, s)
defer nc.Close()

cfg := &StreamConfig{
cfg := &nats.StreamConfig{
Name: stream,
Storage: MemoryStorage,
Storage: nats.MemoryStorage,
Replicas: replicas,
Subjects: []string{"foo.>"},
}
addStream(t, nc, cfg)
_, err := js.AddStream(cfg)
require_NoError(t, err)

cfg.RePublish = &RePublish{
Source: ">",
Destination: "bar.>",
}
// We expect update to fail, do it manually:
expectFailUpdate := func() {
expectUpdate := func() {
t.Helper()

req, err := json.Marshal(cfg)
Expand All @@ -6391,41 +6388,77 @@ func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) {
if resp.Type != JSApiStreamUpdateResponseType {
t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamUpdateResponseType)
}
if !IsNatsErr(resp.Error, JSStreamInvalidConfigF) {
t.Fatalf("Expected error regarding config error, got %+v", resp.Error)
if IsNatsErr(resp.Error, JSStreamInvalidConfigF) {
t.Fatalf("Expected no error regarding config error, got %+v", resp.Error)
}
}
expectFailUpdate()

// Now try with a new stream with RePublish present and then try to change config
cfg = &StreamConfig{
Name: stream + "_2",
Storage: MemoryStorage,
Replicas: replicas,
RePublish: &RePublish{
Source: ">",
Destination: "bar.>",
},
expectRepublished := func(expectedRepub bool) {
t.Helper()

nc, js := jsClientConnect(t, s)
defer nc.Close()

// Create a subscriber for foo.> so that we can see
// our published message being echoed back to us.
sf, err := nc.SubscribeSync("foo.>")
require_NoError(t, err)
defer sf.Unsubscribe()

// Create a subscriber for bar.> so that we can see
// any potentially republished messages.
sb, err := nc.SubscribeSync("bar.>")
require_NoError(t, err)
defer sf.Unsubscribe()

// Publish a message, it will hit the foo.> stream and
// may potentially be republished to the bar.> stream.
_, err = js.Publish("foo."+stream, []byte("HELLO!"))
require_NoError(t, err)

// Wait for a little while so that we have enough time
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
// to determine whether it's going to arrive on one or
// both streams.
checkSubsPending(t, sf, 1)
if expectedRepub {
checkSubsPending(t, sb, 1)
} else {
checkSubsPending(t, sb, 0)
}
}
addStream(t, nc, cfg)
cfg.RePublish.HeadersOnly = true
expectFailUpdate()

// One last test with existing first, then trying to remove
cfg.Name = stream + "_3"
addStream(t, nc, cfg)
// At this point there's no republish config, so we should
// only receive our published message on foo.>.
expectRepublished(false)

// Add a republish config so that everything on foo.> also
// gets republished to bar.>.
cfg.RePublish = &nats.RePublish{
Source: "foo.>",
Destination: "bar.>",
}
expectUpdate()
expectRepublished(true)
derekcollison marked this conversation as resolved.
Show resolved Hide resolved

// Now take the republish config away again, so we should go
// back to only getting them on foo.>.
cfg.RePublish = nil
expectFailUpdate()
expectUpdate()
expectRepublished(false)
}

s := RunBasicJetStreamServer(t)
defer s.Shutdown()
t.Run("Single", func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
test(t, s, "single", 1)
})
t.Run("Clustered", func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()

t.Run("Single", func(t *testing.T) { test(t, s, "single", 1) })
t.Run("Clustered", func(t *testing.T) { test(t, c.randomServer(), "clustered", 3) })
test(t, c.randomNonLeader(), "clustered", 3)
})
}

func TestJetStreamClusterDirectGetFromLeafnode(t *testing.T) {
Expand Down
21 changes: 17 additions & 4 deletions server/stream.go
Expand Up @@ -1348,10 +1348,6 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
if !reflect.DeepEqual(cfg.Mirror, old.Mirror) {
return nil, NewJSStreamMirrorNotUpdatableError()
}
// Can't change RePublish
if !reflect.DeepEqual(cfg.RePublish, old.RePublish) {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change RePublish"))
}

// Check on new discard new per subject.
if cfg.DiscardNewPer {
Expand Down Expand Up @@ -1555,6 +1551,23 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
}
}

// Check for changes to RePublish.
if cfg.RePublish != nil {
// Empty same as all.
if cfg.RePublish.Source == _EMPTY_ {
cfg.RePublish.Source = fwcs
}
tr, err := newTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return fmt.Errorf("stream configuration for republish not valid")
}
// Assign our transform for republishing.
mset.tr = tr
} else {
mset.tr = nil
}

js := mset.js

if targetTier := tierName(cfg); mset.tier != targetTier {
Expand Down