Skip to content

Commit

Permalink
Allow editing of RePublish on an existing stream
Browse files Browse the repository at this point in the history
  • Loading branch information
neilalexander committed Jan 25, 2023
1 parent a3365ab commit 4f45df6
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 40 deletions.
105 changes: 69 additions & 36 deletions server/jetstream_cluster_2_test.go
Expand Up @@ -6361,24 +6361,21 @@ func TestJetStreamClusterEncryptedDoubleSnapshotBug(t *testing.T) {
require_NoError(t, err)
}

func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) {
func TestJetStreamClusterRePublishUpdateSupported(t *testing.T) {
test := func(t *testing.T, s *Server, stream string, replicas int) {
nc := natsConnect(t, s.ClientURL())
nc, js := jsClientConnect(t, s)
defer nc.Close()

cfg := &StreamConfig{
cfg := &nats.StreamConfig{
Name: stream,
Storage: MemoryStorage,
Storage: nats.MemoryStorage,
Replicas: replicas,
Subjects: []string{"foo.>"},
}
addStream(t, nc, cfg)
_, err := js.AddStream(cfg)
require_NoError(t, err)

cfg.RePublish = &RePublish{
Source: ">",
Destination: "bar.>",
}
// We expect update to fail, do it manually:
expectFailUpdate := func() {
expectUpdate := func() {
t.Helper()

req, err := json.Marshal(cfg)
Expand All @@ -6391,41 +6388,77 @@ func TestJetStreamClusterRePublishUpdateNotSupported(t *testing.T) {
if resp.Type != JSApiStreamUpdateResponseType {
t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamUpdateResponseType)
}
if !IsNatsErr(resp.Error, JSStreamInvalidConfigF) {
t.Fatalf("Expected error regarding config error, got %+v", resp.Error)
if IsNatsErr(resp.Error, JSStreamInvalidConfigF) {
t.Fatalf("Expected no error regarding config error, got %+v", resp.Error)
}
}
expectFailUpdate()

// Now try with a new stream with RePublish present and then try to change config
cfg = &StreamConfig{
Name: stream + "_2",
Storage: MemoryStorage,
Replicas: replicas,
RePublish: &RePublish{
Source: ">",
Destination: "bar.>",
},
expectRepublished := func(expectedRepub bool) {
t.Helper()

nc, js := jsClientConnect(t, s)
defer nc.Close()

// Create a subscriber for foo.> so that we can see
// our published message being echoed back to us.
sf, err := nc.SubscribeSync("foo.>")
require_NoError(t, err)
defer sf.Unsubscribe()

// Create a subscriber for bar.> so that we can see
// any potentially republished messages.
sb, err := nc.SubscribeSync("bar.>")
require_NoError(t, err)
defer sf.Unsubscribe()

// Publish a message, it will hit the foo.> stream and
// may potentially be republished to the bar.> stream.
_, err = js.Publish("foo."+stream, []byte("HELLO!"))
require_NoError(t, err)

// Wait for a little while so that we have enough time
// to determine whether it's going to arrive on one or
// both streams.
checkSubsPending(t, sf, 1)
if expectedRepub {
checkSubsPending(t, sb, 1)
} else {
checkSubsPending(t, sb, 0)
}
}
addStream(t, nc, cfg)
cfg.RePublish.HeadersOnly = true
expectFailUpdate()

// One last test with existing first, then trying to remove
cfg.Name = stream + "_3"
addStream(t, nc, cfg)
// At this point there's no republish config, so we should
// only receive our published message on foo.>.
expectRepublished(false)

// Add a republish config so that everything on foo.> also
// gets republished to bar.>.
cfg.RePublish = &nats.RePublish{
Source: "foo.>",
Destination: "bar.>",
}
expectUpdate()
expectRepublished(true)

// Now take the republish config away again, so we should go
// back to only getting them on foo.>.
cfg.RePublish = nil
expectFailUpdate()
expectUpdate()
expectRepublished(false)
}

s := RunBasicJetStreamServer(t)
defer s.Shutdown()
t.Run("Single", func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
test(t, s, "single", 1)
})
t.Run("Clustered", func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()

t.Run("Single", func(t *testing.T) { test(t, s, "single", 1) })
t.Run("Clustered", func(t *testing.T) { test(t, c.randomServer(), "clustered", 3) })
test(t, c.randomNonLeader(), "clustered", 3)
})
}

func TestJetStreamClusterDirectGetFromLeafnode(t *testing.T) {
Expand Down
21 changes: 17 additions & 4 deletions server/stream.go
Expand Up @@ -1348,10 +1348,6 @@ func (jsa *jsAccount) configUpdateCheck(old, new *StreamConfig, s *Server) (*Str
if !reflect.DeepEqual(cfg.Mirror, old.Mirror) {
return nil, NewJSStreamMirrorNotUpdatableError()
}
// Can't change RePublish
if !reflect.DeepEqual(cfg.RePublish, old.RePublish) {
return nil, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration update can not change RePublish"))
}

// Check on new discard new per subject.
if cfg.DiscardNewPer {
Expand Down Expand Up @@ -1555,6 +1551,23 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
}
}

// Check for changes to RePublish.
if cfg.RePublish != nil {
// Empty same as all.
if cfg.RePublish.Source == _EMPTY_ {
cfg.RePublish.Source = fwcs
}
tr, err := newTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return fmt.Errorf("stream configuration for republish not valid")
}
// Assign our transform for republishing.
mset.tr = tr
} else {
mset.tr = nil
}

js := mset.js

if targetTier := tierName(cfg); mset.tier != targetTier {
Expand Down

0 comments on commit 4f45df6

Please sign in to comment.