diff --git a/js.go b/js.go index cf62a9d05..08a8212d7 100644 --- a/js.go +++ b/js.go @@ -120,6 +120,9 @@ const ( // jsDomainT is used to create JetStream API prefix by specifying only Domain jsDomainT = "$JS.%s.API." + // jsExtDomainT is used to create a StreamSource External APIPrefix + jsExtDomainT = "$JS.%s.API" + // apiAccountInfo is for obtaining general information about JetStream. apiAccountInfo = "INFO" diff --git a/jsm.go b/jsm.go index d568f257b..cd4c79ab9 100644 --- a/jsm.go +++ b/jsm.go @@ -154,13 +154,44 @@ type StreamSource struct { OptStartTime *time.Time `json:"opt_start_time,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` External *ExternalStream `json:"external,omitempty"` + Domain string `json:"-"` } // ExternalStream allows you to qualify access to a stream source in another // account. type ExternalStream struct { APIPrefix string `json:"api"` - DeliverPrefix string `json:"deliver"` + DeliverPrefix string `json:"deliver,omitempty"` +} + +// Helper for copying when we do not want to change user's version. +func (ss *StreamSource) copy() *StreamSource { + nss := *ss + // Check pointers + if ss.OptStartTime != nil { + t := *ss.OptStartTime + nss.OptStartTime = &t + } + if ss.External != nil { + ext := *ss.External + nss.External = &ext + } + return &nss +} + +// If we have a Domain, convert to the appropriate ext.APIPrefix. +// This will change the stream source, so should be a copy passed in. +func (ss *StreamSource) convertDomain() error { + if ss.Domain == _EMPTY_ { + return nil + } + if ss.External != nil { + // These should be mutually exclusive. + // TODO(dlc) - Make generic? + return errors.New("nats: domain and external are both set") + } + ss.External = &ExternalStream{APIPrefix: fmt.Sprintf(jsExtDomainT, ss.Domain)} + return nil } // apiResponse is a standard response from the JetStream JSON API @@ -689,7 +720,31 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { defer cancel() } - req, err := json.Marshal(cfg) + // In case we need to change anything, copy so we do not change the caller's version. + ncfg := *cfg + + // If we have a mirror and an external domain, convert to ext.APIPrefix. + if cfg.Mirror != nil && cfg.Mirror.Domain != _EMPTY_ { + // Copy so we do not change the caller's version. + ncfg.Mirror = ncfg.Mirror.copy() + if err := ncfg.Mirror.convertDomain(); err != nil { + return nil, err + } + } + // Check sources for the same. + if len(ncfg.Sources) > 0 { + ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...) + for i, ss := range ncfg.Sources { + if ss.Domain != _EMPTY_ { + ncfg.Sources[i] = ss.copy() + if err := ncfg.Sources[i].convertDomain(); err != nil { + return nil, err + } + } + } + } + + req, err := json.Marshal(&ncfg) if err != nil { return nil, err } @@ -991,17 +1046,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt } var apiSubj string - - doDirectGetLastBySubject := o.directGet && mreq.LastFor != _EMPTY_ - - if doDirectGetLastBySubject { + if o.directGet && mreq.LastFor != _EMPTY_ { apiSubj = apiDirectMsgGetLastBySubjectT dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name, mreq.LastFor)) r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil) if err != nil { return nil, err } - return convertDirectGetMsgResponseToMsg(name, r) } diff --git a/kv.go b/kv.go index 4993df98a..b219cd77f 100644 --- a/kv.go +++ b/kv.go @@ -233,6 +233,8 @@ type KeyValueConfig struct { Replicas int Placement *Placement RePublish *RePublish + Mirror *StreamSource + Sources []*StreamSource } // Used to watch all keys. @@ -298,10 +300,12 @@ var ( ) const ( - kvBucketNameTmpl = "KV_%s" - kvSubjectsTmpl = "$KV.%s.>" - kvSubjectsPreTmpl = "$KV.%s." - kvNoPending = "0" + kvBucketNamePre = "KV_" + kvBucketNameTmpl = "KV_%s" + kvSubjectsTmpl = "$KV.%s.>" + kvSubjectsPreTmpl = "$KV.%s." + kvSubjectsPreDomainTmpl = "%s.$KV.%s." + kvNoPending = "0" ) // Regex for valid keys and buckets. @@ -386,7 +390,6 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { scfg := &StreamConfig{ Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), Description: cfg.Description, - Subjects: []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}, MaxMsgsPerSubject: history, MaxBytes: maxBytes, MaxAge: cfg.TTL, @@ -402,6 +405,26 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { AllowDirect: true, RePublish: cfg.RePublish, } + if cfg.Mirror != nil { + // Copy in case we need to make changes so we do not change caller's version. + m := cfg.Mirror.copy() + if !strings.HasPrefix(m.Name, kvBucketNamePre) { + m.Name = fmt.Sprintf(kvBucketNameTmpl, m.Name) + } + scfg.Mirror = m + scfg.MirrorDirect = true + } else if len(cfg.Sources) > 0 { + // For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly. + for _, ss := range cfg.Sources { + if !strings.HasPrefix(ss.Name, kvBucketNamePre) { + ss = ss.copy() + ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name) + } + scfg.Sources = append(scfg.Sources, ss) + } + } else { + scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} + } // If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below. if js.nc.serverMinVersion(2, 7, 2) { @@ -445,6 +468,7 @@ type kvs struct { name string stream string pre string + putPre string js *js // If true, it means that APIPrefix/Domain was set in the context // and we need to add something to some of our high level protocols @@ -520,9 +544,9 @@ func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) { var _opts [1]JSOpt opts := _opts[:0] if kv.useDirect { - _opts[0] = DirectGet() - opts = _opts[:1] + opts = append(opts, DirectGet()) } + if revision == kvLatestRevision { m, err = kv.js.GetLastMsg(kv.stream, b.String(), opts...) } else { @@ -573,7 +597,11 @@ func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) { if kv.useJSPfx { b.WriteString(kv.js.opts.pre) } - b.WriteString(kv.pre) + if kv.putPre != _EMPTY_ { + b.WriteString(kv.putPre) + } else { + b.WriteString(kv.pre) + } b.WriteString(key) pa, err := kv.js.Publish(b.String(), value) @@ -1012,8 +1040,9 @@ func (js *js) KeyValueStores() <-chan KeyValueStatus { } func mapStreamToKVS(js *js, info *StreamInfo) *kvs { - bucket := strings.TrimPrefix(info.Config.Name, "KV_") - return &kvs{ + bucket := strings.TrimPrefix(info.Config.Name, kvBucketNamePre) + + kv := &kvs{ name: bucket, stream: info.Config.Name, pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket), @@ -1022,4 +1051,19 @@ func mapStreamToKVS(js *js, info *StreamInfo) *kvs { useJSPfx: js.opts.pre != defaultAPIPrefix, useDirect: info.Config.AllowDirect, } + + // If we are mirroring, we will have mirror direct on, so just use the mirror name + // and override use + if m := info.Config.Mirror; m != nil { + bucket := strings.TrimPrefix(m.Name, kvBucketNamePre) + if m.External != nil && m.External.APIPrefix != _EMPTY_ { + kv.useJSPfx = false + kv.pre = fmt.Sprintf(kvSubjectsPreTmpl, bucket) + kv.putPre = fmt.Sprintf(kvSubjectsPreDomainTmpl, m.External.APIPrefix, bucket) + } else { + kv.putPre = fmt.Sprintf(kvSubjectsPreTmpl, bucket) + } + } + + return kv } diff --git a/test/kv_test.go b/test/kv_test.go index b7b7e9a5a..f6431e0e0 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -863,3 +863,117 @@ func TestListKeyValueStores(t *testing.T) { }) } } + +func TestKeyValueMirrorCrossDomains(t *testing.T) { + conf := createConfFile(t, []byte(` + server_name: HUB + listen: 127.0.0.1:-1 + jetstream: { domain: HUB } + leafnodes { listen: 127.0.0.1:7422 } + }`)) + defer os.Remove(conf) + s, _ := RunServerWithConfig(conf) + defer shutdownJSServerAndRemoveStorage(t, s) + + lconf := createConfFile(t, []byte(` + server_name: LEAF + listen: 127.0.0.1:-1 + jetstream: { domain:LEAF } + leafnodes { + remotes = [ { url: "leaf://127.0.0.1" } ] + } + }`)) + defer os.Remove(lconf) + ln, _ := RunServerWithConfig(lconf) + defer shutdownJSServerAndRemoveStorage(t, ln) + + // Create main KV on HUB + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"}) + expectOk(t, err) + + _, err = kv.PutString("name", "derek") + expectOk(t, err) + _, err = kv.PutString("age", "22") + expectOk(t, err) + + lnc, ljs := jsClient(t, ln) + defer lnc.Close() + + // Capture cfg so we can make sure it does not change. + // NOTE: We use different name to test all possibilities, etc, but in practice for truly nomadic applications + // this should be named the same, e.g. TEST. + cfg := &nats.KeyValueConfig{ + Bucket: "MIRROR", + Mirror: &nats.StreamSource{ + Name: "TEST", + Domain: "HUB", + }, + } + ccfg := *cfg + + _, err = ljs.CreateKeyValue(cfg) + expectOk(t, err) + + if !reflect.DeepEqual(cfg, &ccfg) { + t.Fatalf("Did not expect config to be altered: %+v vs %+v", cfg, ccfg) + } + + si, err := ljs.StreamInfo("KV_MIRROR") + expectOk(t, err) + + // Make sure mirror direct set. + if !si.Config.MirrorDirect { + t.Fatalf("Expected mirror direct to be set") + } + + // Make sure we sync. + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + si, err := ljs.StreamInfo("KV_MIRROR") + expectOk(t, err) + if si.State.Msgs == 2 { + return nil + } + return fmt.Errorf("Did not get synched messages: %d", si.State.Msgs) + }) + + // Bind locally from leafnode and make sure both get and put work. + mkv, err := ljs.KeyValue("MIRROR") + expectOk(t, err) + + _, err = mkv.PutString("name", "rip") + expectOk(t, err) + + e, err := mkv.Get("name") + expectOk(t, err) + if string(e.Value()) != "rip" { + t.Fatalf("Got wrong value: %q vs %q", e.Value(), "rip") + } + + // Bind through leafnode connection but to origin KV. + rjs, err := lnc.JetStream(nats.Domain("HUB")) + expectOk(t, err) + + rkv, err := rjs.KeyValue("TEST") + expectOk(t, err) + + _, err = rkv.PutString("name", "ivan") + expectOk(t, err) + + e, err = rkv.Get("name") + expectOk(t, err) + if string(e.Value()) != "ivan" { + t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan") + } + + // Shutdown cluster and test get still work. + shutdownJSServerAndRemoveStorage(t, s) + + e, err = mkv.Get("name") + expectOk(t, err) + if string(e.Value()) != "ivan" { + t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan") + } +}