From ff4ed925812876c732d036d02ab89b565e5893c1 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 3 Aug 2022 14:44:27 -0600 Subject: [PATCH 1/2] [ADDED] KeyValue: RePublish option in KeyValueConfiguration Ability to create a KV with the re-publish feature. Signed-off-by: Ivan Kozlovic --- kv.go | 2 ++ kv_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/kv.go b/kv.go index 0ec813de0..2c74848a1 100644 --- a/kv.go +++ b/kv.go @@ -225,6 +225,7 @@ type KeyValueConfig struct { Storage StorageType Replicas int Placement *Placement + RePublish *RePublish } // Used to watch all keys. @@ -401,6 +402,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { MaxMsgs: -1, MaxConsumers: -1, AllowDirect: true, + RePublish: cfg.RePublish, } // If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below. diff --git a/kv_test.go b/kv_test.go index dd17429aa..4a1bd512b 100644 --- a/kv_test.go +++ b/kv_test.go @@ -14,7 +14,9 @@ package nats import ( + "fmt" "testing" + "time" ) func TestKeyValueDiscardOldToDiscardNew(t *testing.T) { @@ -109,3 +111,46 @@ func TestKeyValueNonDirectGet(t *testing.T) { t.Fatalf("Expected error, got %+v", v) } } + +func TestKeyValueRePublish(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&KeyValueConfig{ + Bucket: "TEST", + RePublish: &RePublish{Source: ">", Destination: "bar.>"}, + }) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + si, err := js.StreamInfo("KV_TEST") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + if si.Config.RePublish == nil { + t.Fatal("Expected republish to be set, it was not") + } + + sub, err := nc.SubscribeSync("bar.>") + if err != nil { + t.Fatalf("Error on sub: %v", err) + } + if _, err := kv.Put("foo", []byte("value")); err != nil { + t.Fatalf("Error on put: %v", err) + } + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on next: %v", err) + } + if v := string(msg.Data); v != "value" { + t.Fatalf("Unexpected value: %s", v) + } + // The message should also have a header with the actual subject + expected := fmt.Sprintf(kvSubjectsPreTmpl, "TEST") + "foo" + if v := msg.Header.Get(JSSubject); v != expected { + t.Fatalf("Expected subject header %q, got %q", expected, v) + } +} From 92ea51f2322a9e4568cd2c9521edf283f05bb6ec Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 3 Aug 2022 15:53:42 -0600 Subject: [PATCH 2/2] CreateKeyValue with RePublish will fail if KV already exists This cannot be updated in the server at the moment, so if user calls CreateKeyValue() with a config that does not have RePublish and then call again on the same bucket with RePublish set, the user will get an error. Added a test that shows that this is the case. Signed-off-by: Ivan Kozlovic --- kv_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/kv_test.go b/kv_test.go index 4a1bd512b..b1f3e164f 100644 --- a/kv_test.go +++ b/kv_test.go @@ -119,6 +119,20 @@ func TestKeyValueRePublish(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() + if _, err := js.CreateKeyValue(&KeyValueConfig{ + Bucket: "TEST_UPDATE", + }); err != nil { + t.Fatalf("Error creating store: %v", err) + } + // This is expected to fail since server does not support as of now + // the update of RePublish. + if _, err := js.CreateKeyValue(&KeyValueConfig{ + Bucket: "TEST_UPDATE", + RePublish: &RePublish{Source: ">", Destination: "bar.>"}, + }); err == nil { + t.Fatal("Expected failure, did not get one") + } + kv, err := js.CreateKeyValue(&KeyValueConfig{ Bucket: "TEST", RePublish: &RePublish{Source: ">", Destination: "bar.>"},