diff --git a/jsm.go b/jsm.go index 87ab37cec..129fb7642 100644 --- a/jsm.go +++ b/jsm.go @@ -587,6 +587,9 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { return nil, err } if resp.Error != nil { + if resp.Error.ErrorCode == 10058 { + return nil, ErrStreamNameAlreadyInUse + } return nil, errors.New(resp.Error.Description) } diff --git a/kv.go b/kv.go index 0b75054d9..da33774e8 100644 --- a/kv.go +++ b/kv.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "reflect" "regexp" "strconv" "strings" @@ -326,18 +327,31 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { replicas = 1 } + // We will set explicitly some values so that we can do comparison + // if we get an "already in use" error and need to check if it is same. + maxBytes := cfg.MaxBytes + if maxBytes == 0 { + maxBytes = -1 + } + maxMsgSize := cfg.MaxValueSize + if maxMsgSize == 0 { + maxMsgSize = -1 + } scfg := &StreamConfig{ Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), Description: cfg.Description, Subjects: []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}, MaxMsgsPerSubject: history, - MaxBytes: cfg.MaxBytes, + MaxBytes: maxBytes, MaxAge: cfg.TTL, - MaxMsgSize: cfg.MaxValueSize, + MaxMsgSize: maxMsgSize, Storage: cfg.Storage, Replicas: replicas, AllowRollup: true, DenyDelete: true, + Duplicates: 2 * time.Minute, + MaxMsgs: -1, + MaxConsumers: -1, } // If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below. @@ -346,7 +360,24 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { } if _, err := js.AddStream(scfg); err != nil { - return nil, err + // If we have a failure to add, it could be because we have + // a config change if the KV was created against a pre 2.7.2 + // and we are now moving to a v2.7.2+. If that is the case + // and the only difference is the discard policy, then update + // the stream. + if err == ErrStreamNameAlreadyInUse { + if si, _ := js.StreamInfo(scfg.Name); si != nil { + // To compare, make the server's stream info discard + // policy same than ours. + si.Config.Discard = scfg.Discard + if reflect.DeepEqual(&si.Config, scfg) { + _, err = js.UpdateStream(scfg) + } + } + } + if err != nil { + return nil, err + } } kv := &kvs{ diff --git a/kv_test.go b/kv_test.go new file mode 100644 index 000000000..a7cd03e20 --- /dev/null +++ b/kv_test.go @@ -0,0 +1,69 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "testing" +) + +func TestKeyValueDiscardOldToDiscardNew(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + checkDiscard := func(expected DiscardPolicy) KeyValue { + t.Helper() + kv, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "TEST", History: 1}) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + si, err := js.StreamInfo("KV_TEST") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + if si.Config.Discard != expected { + t.Fatalf("Expected discard policy %v, got %+v", expected, si) + } + return kv + } + + // We are going to go from 2.7.1->2.7.2->2.7.1 and 2.7.2 again. + for i := 0; i < 2; i++ { + // Change the server version in the connection to + // create as-if we were connecting to a v2.7.1 server. + nc.mu.Lock() + nc.info.Version = "2.7.1" + nc.mu.Unlock() + + kv := checkDiscard(DiscardOld) + if i == 0 { + if _, err := kv.PutString("foo", "value"); err != nil { + t.Fatalf("Error adding key: %v", err) + } + } + + // Now change version to 2.7.2 + nc.mu.Lock() + nc.info.Version = "2.7.2" + nc.mu.Unlock() + + kv = checkDiscard(DiscardNew) + // Make sure the key still exists + if e, err := kv.Get("foo"); err != nil || string(e.Value()) != "value" { + t.Fatalf("Error getting key: err=%v e=%+v", err, e) + } + } +} diff --git a/nats.go b/nats.go index 3bf5b4d57..bc2fe5d56 100644 --- a/nats.go +++ b/nats.go @@ -160,6 +160,7 @@ var ( ErrMsgNotFound = errors.New("nats: message not found") ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged") ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed") + ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use") ) func init() { diff --git a/test/kv_test.go b/test/kv_test.go index 831ad1315..7568f43bb 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -18,7 +18,6 @@ import ( "fmt" "os" "reflect" - "regexp" "strconv" "strings" "testing" @@ -558,48 +557,6 @@ func TestKeyValueKeys(t *testing.T) { } } -func TestKeyValueDiscardNew(t *testing.T) { - s := RunBasicJetStreamServer() - defer shutdownJSServerAndRemoveStorage(t, s) - - nc, js := jsClient(t, s) - defer nc.Close() - - kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 1, MaxBytes: 256}) - expectOk(t, err) - - vc := func() (major, minor, patch int) { - semVerRe := regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`) - m := semVerRe.FindStringSubmatch(nc.ConnectedServerVersion()) - expectOk(t, err) - major, err = strconv.Atoi(m[1]) - expectOk(t, err) - minor, err = strconv.Atoi(m[2]) - expectOk(t, err) - patch, err = strconv.Atoi(m[3]) - expectOk(t, err) - return major, minor, patch - } - - major, minor, patch := vc() - status, err := kv.Status() - expectOk(t, err) - kvs := status.(*nats.KeyValueBucketStatus) - si := kvs.StreamInfo() - - // If we are 2.7.1 or below DiscardOld should be used. - // If 2.7.2 or above should be DiscardNew - if major <= 2 && minor <= 7 && patch <= 1 { - if si.Config.Discard != nats.DiscardOld { - t.Fatalf("Expected Discard Old for server version %d.%d.%d", major, minor, patch) - } - } else { - if si.Config.Discard != nats.DiscardNew { - t.Fatalf("Expected Discard New for server version %d.%d.%d", major, minor, patch) - } - } -} - func TestKeyValueCrossAccounts(t *testing.T) { conf := createConfFile(t, []byte(` jetstream: enabled