From 9c698d2d84bf4fb775cb97b7d45f7c6cdd4b7453 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 17 Mar 2022 09:54:49 -0600 Subject: [PATCH] Fixed KV's setting of duplicates window Signed-off-by: Ivan Kozlovic --- kv.go | 11 ++++++++++- test/kv_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/kv.go b/kv.go index 24a771d96..aff84e487 100644 --- a/kv.go +++ b/kv.go @@ -338,6 +338,15 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { if maxMsgSize == 0 { maxMsgSize = -1 } + // When stream's MaxAge is not set, server uses 2 minutes as the default + // for the duplicate window. If MaxAge is set, and lower than 2 minutes, + // then the duplicate window will be set to that. If MaxAge is greater, + // we will cap the duplicate window to 2 minutes (to be consistent with + // previous behavior). + duplicateWindow := 2 * time.Minute + if cfg.TTL > 0 && cfg.TTL < duplicateWindow { + duplicateWindow = cfg.TTL + } scfg := &StreamConfig{ Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), Description: cfg.Description, @@ -351,7 +360,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { Placement: cfg.Placement, AllowRollup: true, DenyDelete: true, - Duplicates: 2 * time.Minute, + Duplicates: duplicateWindow, MaxMsgs: -1, MaxConsumers: -1, } diff --git a/test/kv_test.go b/test/kv_test.go index 7568f43bb..e6dd2c6f3 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -736,6 +736,34 @@ func TestKeyValueCrossAccounts(t *testing.T) { } } +func TestKeyValueDuplicatesWindow(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + checkWindow := func(ttl, expectedDuplicates time.Duration) { + t.Helper() + + _, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: ttl}) + expectOk(t, err) + defer js.DeleteKeyValue("TEST") + + si, err := js.StreamInfo("KV_TEST") + if err != nil { + t.Fatalf("StreamInfo error: %v", err) + } + if si.Config.Duplicates != expectedDuplicates { + t.Fatalf("Expected duplicates to be %v, got %v", expectedDuplicates, si.Config.Duplicates) + } + } + + checkWindow(0, 2*time.Minute) + checkWindow(time.Hour, 2*time.Minute) + checkWindow(5*time.Second, 5*time.Second) +} + // Helpers func client(t *testing.T, s *server.Server, opts ...nats.Option) *nats.Conn {