Skip to content

Commit

Permalink
Merge pull request #931 from nats-io/kv_duplicates_regression
Browse files Browse the repository at this point in the history
Fixed KV's setting of duplicates window
  • Loading branch information
kozlovic committed Mar 17, 2022
2 parents f52a247 + 9c698d2 commit c92df80
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
11 changes: 10 additions & 1 deletion kv.go
Expand Up @@ -338,6 +338,15 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
if maxMsgSize == 0 {
maxMsgSize = -1
}
// When stream's MaxAge is not set, server uses 2 minutes as the default
// for the duplicate window. If MaxAge is set, and lower than 2 minutes,
// then the duplicate window will be set to that. If MaxAge is greater,
// we will cap the duplicate window to 2 minutes (to be consistent with
// previous behavior).
duplicateWindow := 2 * time.Minute
if cfg.TTL > 0 && cfg.TTL < duplicateWindow {
duplicateWindow = cfg.TTL
}
scfg := &StreamConfig{
Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
Description: cfg.Description,
Expand All @@ -351,7 +360,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
Placement: cfg.Placement,
AllowRollup: true,
DenyDelete: true,
Duplicates: 2 * time.Minute,
Duplicates: duplicateWindow,
MaxMsgs: -1,
MaxConsumers: -1,
}
Expand Down
28 changes: 28 additions & 0 deletions test/kv_test.go
Expand Up @@ -736,6 +736,34 @@ func TestKeyValueCrossAccounts(t *testing.T) {
}
}

func TestKeyValueDuplicatesWindow(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

checkWindow := func(ttl, expectedDuplicates time.Duration) {
t.Helper()

_, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 5, TTL: ttl})
expectOk(t, err)
defer js.DeleteKeyValue("TEST")

si, err := js.StreamInfo("KV_TEST")
if err != nil {
t.Fatalf("StreamInfo error: %v", err)
}
if si.Config.Duplicates != expectedDuplicates {
t.Fatalf("Expected duplicates to be %v, got %v", expectedDuplicates, si.Config.Duplicates)
}
}

checkWindow(0, 2*time.Minute)
checkWindow(time.Hour, 2*time.Minute)
checkWindow(5*time.Second, 5*time.Second)
}

// Helpers

func client(t *testing.T, s *server.Server, opts ...nats.Option) *nats.Conn {
Expand Down

0 comments on commit c92df80

Please sign in to comment.