Skip to content

Commit

Permalink
Merge pull request #1031 from nats-io/kv_republish
Browse files Browse the repository at this point in the history
[ADDED] KeyValue: RePublish option in KeyValueConfiguration
  • Loading branch information
kozlovic committed Aug 3, 2022
2 parents ec49000 + 92ea51f commit cc189da
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
2 changes: 2 additions & 0 deletions kv.go
Expand Up @@ -225,6 +225,7 @@ type KeyValueConfig struct {
Storage StorageType
Replicas int
Placement *Placement
RePublish *RePublish
}

// Used to watch all keys.
Expand Down Expand Up @@ -401,6 +402,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
MaxMsgs: -1,
MaxConsumers: -1,
AllowDirect: true,
RePublish: cfg.RePublish,
}

// If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
Expand Down
59 changes: 59 additions & 0 deletions kv_test.go
Expand Up @@ -14,7 +14,9 @@
package nats

import (
"fmt"
"testing"
"time"
)

func TestKeyValueDiscardOldToDiscardNew(t *testing.T) {
Expand Down Expand Up @@ -109,3 +111,60 @@ func TestKeyValueNonDirectGet(t *testing.T) {
t.Fatalf("Expected error, got %+v", v)
}
}

func TestKeyValueRePublish(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

if _, err := js.CreateKeyValue(&KeyValueConfig{
Bucket: "TEST_UPDATE",
}); err != nil {
t.Fatalf("Error creating store: %v", err)
}
// This is expected to fail since server does not support as of now
// the update of RePublish.
if _, err := js.CreateKeyValue(&KeyValueConfig{
Bucket: "TEST_UPDATE",
RePublish: &RePublish{Source: ">", Destination: "bar.>"},
}); err == nil {
t.Fatal("Expected failure, did not get one")
}

kv, err := js.CreateKeyValue(&KeyValueConfig{
Bucket: "TEST",
RePublish: &RePublish{Source: ">", Destination: "bar.>"},
})
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
si, err := js.StreamInfo("KV_TEST")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}
if si.Config.RePublish == nil {
t.Fatal("Expected republish to be set, it was not")
}

sub, err := nc.SubscribeSync("bar.>")
if err != nil {
t.Fatalf("Error on sub: %v", err)
}
if _, err := kv.Put("foo", []byte("value")); err != nil {
t.Fatalf("Error on put: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on next: %v", err)
}
if v := string(msg.Data); v != "value" {
t.Fatalf("Unexpected value: %s", v)
}
// The message should also have a header with the actual subject
expected := fmt.Sprintf(kvSubjectsPreTmpl, "TEST") + "foo"
if v := msg.Header.Get(JSSubject); v != expected {
t.Fatalf("Expected subject header %q, got %q", expected, v)
}
}

0 comments on commit cc189da

Please sign in to comment.