diff --git a/kv.go b/kv.go index 0ec813de0..2c74848a1 100644 --- a/kv.go +++ b/kv.go @@ -225,6 +225,7 @@ type KeyValueConfig struct { Storage StorageType Replicas int Placement *Placement + RePublish *RePublish } // Used to watch all keys. @@ -401,6 +402,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { MaxMsgs: -1, MaxConsumers: -1, AllowDirect: true, + RePublish: cfg.RePublish, } // If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below. diff --git a/kv_test.go b/kv_test.go index dd17429aa..b1f3e164f 100644 --- a/kv_test.go +++ b/kv_test.go @@ -14,7 +14,9 @@ package nats import ( + "fmt" "testing" + "time" ) func TestKeyValueDiscardOldToDiscardNew(t *testing.T) { @@ -109,3 +111,60 @@ func TestKeyValueNonDirectGet(t *testing.T) { t.Fatalf("Expected error, got %+v", v) } } + +func TestKeyValueRePublish(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + if _, err := js.CreateKeyValue(&KeyValueConfig{ + Bucket: "TEST_UPDATE", + }); err != nil { + t.Fatalf("Error creating store: %v", err) + } + // This is expected to fail since server does not support as of now + // the update of RePublish. + if _, err := js.CreateKeyValue(&KeyValueConfig{ + Bucket: "TEST_UPDATE", + RePublish: &RePublish{Source: ">", Destination: "bar.>"}, + }); err == nil { + t.Fatal("Expected failure, did not get one") + } + + kv, err := js.CreateKeyValue(&KeyValueConfig{ + Bucket: "TEST", + RePublish: &RePublish{Source: ">", Destination: "bar.>"}, + }) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + si, err := js.StreamInfo("KV_TEST") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + if si.Config.RePublish == nil { + t.Fatal("Expected republish to be set, it was not") + } + + sub, err := nc.SubscribeSync("bar.>") + if err != nil { + t.Fatalf("Error on sub: %v", err) + } + if _, err := kv.Put("foo", []byte("value")); err != nil { + t.Fatalf("Error on put: %v", err) + } + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on next: %v", err) + } + if v := string(msg.Data); v != "value" { + t.Fatalf("Unexpected value: %s", v) + } + // The message should also have a header with the actual subject + expected := fmt.Sprintf(kvSubjectsPreTmpl, "TEST") + "foo" + if v := msg.Header.Get(JSSubject); v != expected { + t.Fatalf("Expected subject header %q, got %q", expected, v) + } +}