Skip to content

Commit

Permalink
Updates based on code review
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 19, 2022
1 parent 79d6644 commit 5bdaa1f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
3 changes: 3 additions & 0 deletions jsm.go
Expand Up @@ -587,6 +587,9 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == 10058 {
return nil, ErrStreamNameAlreadyInUse
}
return nil, errors.New(resp.Error.Description)
}

Expand Down
43 changes: 20 additions & 23 deletions kv.go
Expand Up @@ -327,18 +327,31 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
replicas = 1
}

// We will set explicitly some values so that we can do comparison
// if we get an "already in use" error and need to check if it is same.
maxBytes := cfg.MaxBytes
if maxBytes == 0 {
maxBytes = -1
}
maxMsgSize := cfg.MaxValueSize
if maxMsgSize == 0 {
maxMsgSize = -1
}
scfg := &StreamConfig{
Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
Description: cfg.Description,
Subjects: []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)},
MaxMsgsPerSubject: history,
MaxBytes: cfg.MaxBytes,
MaxBytes: maxBytes,
MaxAge: cfg.TTL,
MaxMsgSize: cfg.MaxValueSize,
MaxMsgSize: maxMsgSize,
Storage: cfg.Storage,
Replicas: replicas,
AllowRollup: true,
DenyDelete: true,
Duplicates: 2 * time.Minute,
MaxMsgs: -1,
MaxConsumers: -1,
}

// If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
Expand All @@ -352,9 +365,12 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
// and we are now moving to a v2.7.2+. If that is the case
// and the only difference is the discard policy, then update
// the stream.
if strings.Contains(err.Error(), "already in use") {
if err == ErrStreamNameAlreadyInUse {
if si, _ := js.StreamInfo(scfg.Name); si != nil {
if streamCfgSameSansDiscard(&si.Config, scfg) {
// To compare, make the server's stream info discard
// policy same than ours.
si.Config.Discard = scfg.Discard
if reflect.DeepEqual(&si.Config, scfg) {
_, err = js.UpdateStream(scfg)
}
}
Expand All @@ -375,25 +391,6 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
return kv, nil
}

func streamCfgSameSansDiscard(srvCfg, ourCfg *StreamConfig) bool {
// The server sets some values (like -1) when sending 0.
// So we need to align to what the server may return to
// have a meaningfull comparison.
ours := *ourCfg
if ours.MaxMsgSize == 0 {
ours.MaxMsgSize = -1
}
if ours.MaxBytes == 0 {
ours.MaxBytes = -1
}
ours.Duplicates = 2 * time.Minute
ours.MaxMsgs = -1
ours.MaxConsumers = -1
// Set our discard to what server has for the comparison
ours.Discard = srvCfg.Discard
return reflect.DeepEqual(srvCfg, &ours)
}

// DeleteKeyValue will delete this KeyValue store (JetStream stream).
func (js *js) DeleteKeyValue(bucket string) error {
if !validBucketRe.MatchString(bucket) {
Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -160,6 +160,7 @@ var (
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
)

func init() {
Expand Down

0 comments on commit 5bdaa1f

Please sign in to comment.