Skip to content

Commit

Permalink
Merge pull request #917 from nats-io/kv_create_updgrade_issue
Browse files Browse the repository at this point in the history
KV: changing discard policy would fail on srv upgrade
  • Loading branch information
kozlovic committed Mar 15, 2022
2 parents e483e46 + 5bdaa1f commit 77460b8
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 46 deletions.
3 changes: 3 additions & 0 deletions jsm.go
Expand Up @@ -587,6 +587,9 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == 10058 {
return nil, ErrStreamNameAlreadyInUse
}
return nil, errors.New(resp.Error.Description)
}

Expand Down
37 changes: 34 additions & 3 deletions kv.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -326,18 +327,31 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
replicas = 1
}

// We will set explicitly some values so that we can do comparison
// if we get an "already in use" error and need to check if it is same.
maxBytes := cfg.MaxBytes
if maxBytes == 0 {
maxBytes = -1
}
maxMsgSize := cfg.MaxValueSize
if maxMsgSize == 0 {
maxMsgSize = -1
}
scfg := &StreamConfig{
Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
Description: cfg.Description,
Subjects: []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)},
MaxMsgsPerSubject: history,
MaxBytes: cfg.MaxBytes,
MaxBytes: maxBytes,
MaxAge: cfg.TTL,
MaxMsgSize: cfg.MaxValueSize,
MaxMsgSize: maxMsgSize,
Storage: cfg.Storage,
Replicas: replicas,
AllowRollup: true,
DenyDelete: true,
Duplicates: 2 * time.Minute,

This comment has been minimized.

Copy link
@ripienaar

ripienaar Mar 17, 2022

Contributor

@kozlovic this should be the same as cfg.TTL when cfg.TTL is less than 2 minutes, else you get a failure when making TTL less than 2 minutes of duplicates window can not be larger then max age, so essentially this introduces a regression where KV TTL is minimum 2 minutes.

This comment has been minimized.

Copy link
@kozlovic

kozlovic Mar 17, 2022

Author Member

Ok, so checking server code, server sets it to 2 minutes if no MaxAge is set or if MaxAge > 2min. So in the past, since we were not setting it, it would be set to whatever was set as MaxAge (assuming lower than 2 min).

The way I see it:

  • Do the same type of check/setting than in the server.
  • Revert the addition of Duplicates/MaxMsgs/MaxConsumers settings and instead of doing reflect.DeepEqual, do a comparison only on the fields that we set for a KV stream (as I do in the C client).

Any preference?

This comment has been minimized.

Copy link
@ripienaar

ripienaar Mar 17, 2022

Contributor

Doing the same checks seem reasonable. I am a bit concerned about the deep equal in general as the server just is not good enough at not fiddling with things users give it in all cases. Though I think that's more on the side of consumers now so probably ok.

MaxMsgs: -1,
MaxConsumers: -1,
}

// If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
Expand All @@ -346,7 +360,24 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
}

if _, err := js.AddStream(scfg); err != nil {
return nil, err
// If we have a failure to add, it could be because we have
// a config change if the KV was created against a pre 2.7.2
// and we are now moving to a v2.7.2+. If that is the case
// and the only difference is the discard policy, then update
// the stream.
if err == ErrStreamNameAlreadyInUse {
if si, _ := js.StreamInfo(scfg.Name); si != nil {
// To compare, make the server's stream info discard
// policy same than ours.
si.Config.Discard = scfg.Discard
if reflect.DeepEqual(&si.Config, scfg) {
_, err = js.UpdateStream(scfg)
}
}
}
if err != nil {
return nil, err
}
}

kv := &kvs{
Expand Down
69 changes: 69 additions & 0 deletions kv_test.go
@@ -0,0 +1,69 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
"testing"
)

func TestKeyValueDiscardOldToDiscardNew(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

checkDiscard := func(expected DiscardPolicy) KeyValue {
t.Helper()
kv, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "TEST", History: 1})
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
si, err := js.StreamInfo("KV_TEST")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}
if si.Config.Discard != expected {
t.Fatalf("Expected discard policy %v, got %+v", expected, si)
}
return kv
}

// We are going to go from 2.7.1->2.7.2->2.7.1 and 2.7.2 again.
for i := 0; i < 2; i++ {
// Change the server version in the connection to
// create as-if we were connecting to a v2.7.1 server.
nc.mu.Lock()
nc.info.Version = "2.7.1"
nc.mu.Unlock()

kv := checkDiscard(DiscardOld)
if i == 0 {
if _, err := kv.PutString("foo", "value"); err != nil {
t.Fatalf("Error adding key: %v", err)
}
}

// Now change version to 2.7.2
nc.mu.Lock()
nc.info.Version = "2.7.2"
nc.mu.Unlock()

kv = checkDiscard(DiscardNew)
// Make sure the key still exists
if e, err := kv.Get("foo"); err != nil || string(e.Value()) != "value" {
t.Fatalf("Error getting key: err=%v e=%+v", err, e)
}
}
}
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -160,6 +160,7 @@ var (
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
)

func init() {
Expand Down
43 changes: 0 additions & 43 deletions test/kv_test.go
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"os"
"reflect"
"regexp"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -558,48 +557,6 @@ func TestKeyValueKeys(t *testing.T) {
}
}

func TestKeyValueDiscardNew(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 1, MaxBytes: 256})
expectOk(t, err)

vc := func() (major, minor, patch int) {
semVerRe := regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`)
m := semVerRe.FindStringSubmatch(nc.ConnectedServerVersion())
expectOk(t, err)
major, err = strconv.Atoi(m[1])
expectOk(t, err)
minor, err = strconv.Atoi(m[2])
expectOk(t, err)
patch, err = strconv.Atoi(m[3])
expectOk(t, err)
return major, minor, patch
}

major, minor, patch := vc()
status, err := kv.Status()
expectOk(t, err)
kvs := status.(*nats.KeyValueBucketStatus)
si := kvs.StreamInfo()

// If we are 2.7.1 or below DiscardOld should be used.
// If 2.7.2 or above should be DiscardNew
if major <= 2 && minor <= 7 && patch <= 1 {
if si.Config.Discard != nats.DiscardOld {
t.Fatalf("Expected Discard Old for server version %d.%d.%d", major, minor, patch)
}
} else {
if si.Config.Discard != nats.DiscardNew {
t.Fatalf("Expected Discard New for server version %d.%d.%d", major, minor, patch)
}
}
}

func TestKeyValueCrossAccounts(t *testing.T) {
conf := createConfFile(t, []byte(`
jetstream: enabled
Expand Down

0 comments on commit 77460b8

Please sign in to comment.