Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for KV mirrors and sources. #1112

Merged
merged 1 commit into from Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions js.go
Expand Up @@ -120,6 +120,9 @@ const (
// jsDomainT is used to create JetStream API prefix by specifying only Domain
jsDomainT = "$JS.%s.API."

// jsExtDomainT is used to create a StreamSource External APIPrefix
jsExtDomainT = "$JS.%s.API"

// apiAccountInfo is for obtaining general information about JetStream.
apiAccountInfo = "INFO"

Expand Down
65 changes: 58 additions & 7 deletions jsm.go
Expand Up @@ -154,13 +154,44 @@ type StreamSource struct {
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
}

// ExternalStream allows you to qualify access to a stream source in another
// account.
type ExternalStream struct {
APIPrefix string `json:"api"`
DeliverPrefix string `json:"deliver"`
DeliverPrefix string `json:"deliver,omitempty"`
}

// Helper for copying when we do not want to change user's version.
func (ss *StreamSource) copy() *StreamSource {
nss := *ss
// Check pointers
if ss.OptStartTime != nil {
t := *ss.OptStartTime
nss.OptStartTime = &t
}
if ss.External != nil {
ext := *ss.External
nss.External = &ext
}
return &nss
}

// If we have a Domain, convert to the appropriate ext.APIPrefix.
// This will change the stream source, so should be a copy passed in.
func (ss *StreamSource) convertDomain() error {
if ss.Domain == _EMPTY_ {
return nil
}
if ss.External != nil {
// These should be mutually exclusive.
// TODO(dlc) - Make generic?
return errors.New("nats: domain and external are both set")
}
ss.External = &ExternalStream{APIPrefix: fmt.Sprintf(jsExtDomainT, ss.Domain)}
return nil
}

// apiResponse is a standard response from the JetStream JSON API
Expand Down Expand Up @@ -689,7 +720,31 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
defer cancel()
}

req, err := json.Marshal(cfg)
// In case we need to change anything, copy so we do not change the caller's version.
ncfg := *cfg

// If we have a mirror and an external domain, convert to ext.APIPrefix.
if cfg.Mirror != nil && cfg.Mirror.Domain != _EMPTY_ {
// Copy so we do not change the caller's version.
ncfg.Mirror = ncfg.Mirror.copy()
if err := ncfg.Mirror.convertDomain(); err != nil {
return nil, err
}
}
// Check sources for the same.
if len(ncfg.Sources) > 0 {
ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...)
for i, ss := range ncfg.Sources {
if ss.Domain != _EMPTY_ {
ncfg.Sources[i] = ss.copy()
if err := ncfg.Sources[i].convertDomain(); err != nil {
return nil, err
}
}
}
}

req, err := json.Marshal(&ncfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -991,17 +1046,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
}

var apiSubj string

doDirectGetLastBySubject := o.directGet && mreq.LastFor != _EMPTY_

if doDirectGetLastBySubject {
if o.directGet && mreq.LastFor != _EMPTY_ {
apiSubj = apiDirectMsgGetLastBySubjectT
dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name, mreq.LastFor))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil)
if err != nil {
return nil, err
}

return convertDirectGetMsgResponseToMsg(name, r)
}

Expand Down
64 changes: 54 additions & 10 deletions kv.go
Expand Up @@ -233,6 +233,8 @@ type KeyValueConfig struct {
Replicas int
Placement *Placement
RePublish *RePublish
Mirror *StreamSource
Sources []*StreamSource
}

// Used to watch all keys.
Expand Down Expand Up @@ -298,10 +300,12 @@ var (
)

const (
kvBucketNameTmpl = "KV_%s"
kvSubjectsTmpl = "$KV.%s.>"
kvSubjectsPreTmpl = "$KV.%s."
kvNoPending = "0"
kvBucketNamePre = "KV_"
kvBucketNameTmpl = "KV_%s"
kvSubjectsTmpl = "$KV.%s.>"
kvSubjectsPreTmpl = "$KV.%s."
kvSubjectsPreDomainTmpl = "%s.$KV.%s."
kvNoPending = "0"
)

// Regex for valid keys and buckets.
Expand Down Expand Up @@ -386,7 +390,6 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
scfg := &StreamConfig{
Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
Description: cfg.Description,
Subjects: []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)},
MaxMsgsPerSubject: history,
MaxBytes: maxBytes,
MaxAge: cfg.TTL,
Expand All @@ -402,6 +405,26 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
AllowDirect: true,
RePublish: cfg.RePublish,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
m := cfg.Mirror.copy()
if !strings.HasPrefix(m.Name, kvBucketNamePre) {
m.Name = fmt.Sprintf(kvBucketNameTmpl, m.Name)
}
scfg.Mirror = m
scfg.MirrorDirect = true
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
if !strings.HasPrefix(ss.Name, kvBucketNamePre) {
ss = ss.copy()
ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
}
scfg.Sources = append(scfg.Sources, ss)
}
} else {
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
}

// If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
if js.nc.serverMinVersion(2, 7, 2) {
Expand Down Expand Up @@ -445,6 +468,7 @@ type kvs struct {
name string
stream string
pre string
putPre string
js *js
// If true, it means that APIPrefix/Domain was set in the context
// and we need to add something to some of our high level protocols
Expand Down Expand Up @@ -520,9 +544,9 @@ func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
var _opts [1]JSOpt
opts := _opts[:0]
if kv.useDirect {
_opts[0] = DirectGet()
opts = _opts[:1]
opts = append(opts, DirectGet())
}

if revision == kvLatestRevision {
m, err = kv.js.GetLastMsg(kv.stream, b.String(), opts...)
} else {
Expand Down Expand Up @@ -573,7 +597,11 @@ func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) {
if kv.useJSPfx {
b.WriteString(kv.js.opts.pre)
}
b.WriteString(kv.pre)
if kv.putPre != _EMPTY_ {
b.WriteString(kv.putPre)
} else {
b.WriteString(kv.pre)
}
b.WriteString(key)

pa, err := kv.js.Publish(b.String(), value)
Expand Down Expand Up @@ -1012,8 +1040,9 @@ func (js *js) KeyValueStores() <-chan KeyValueStatus {
}

func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
bucket := strings.TrimPrefix(info.Config.Name, "KV_")
return &kvs{
bucket := strings.TrimPrefix(info.Config.Name, kvBucketNamePre)

kv := &kvs{
name: bucket,
stream: info.Config.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
Expand All @@ -1022,4 +1051,19 @@ func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
useJSPfx: js.opts.pre != defaultAPIPrefix,
useDirect: info.Config.AllowDirect,
}

// If we are mirroring, we will have mirror direct on, so just use the mirror name
// and override use
if m := info.Config.Mirror; m != nil {
bucket := strings.TrimPrefix(m.Name, kvBucketNamePre)
if m.External != nil && m.External.APIPrefix != _EMPTY_ {
kv.useJSPfx = false
kv.pre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
kv.putPre = fmt.Sprintf(kvSubjectsPreDomainTmpl, m.External.APIPrefix, bucket)
} else {
kv.putPre = fmt.Sprintf(kvSubjectsPreTmpl, bucket)
}
}

return kv
}
114 changes: 114 additions & 0 deletions test/kv_test.go
Expand Up @@ -863,3 +863,117 @@ func TestListKeyValueStores(t *testing.T) {
})
}
}

func TestKeyValueMirrorCrossDomains(t *testing.T) {
conf := createConfFile(t, []byte(`
server_name: HUB
listen: 127.0.0.1:-1
jetstream: { domain: HUB }
leafnodes { listen: 127.0.0.1:7422 }
}`))
defer os.Remove(conf)
s, _ := RunServerWithConfig(conf)
defer shutdownJSServerAndRemoveStorage(t, s)

lconf := createConfFile(t, []byte(`
server_name: LEAF
listen: 127.0.0.1:-1
jetstream: { domain:LEAF }
leafnodes {
remotes = [ { url: "leaf://127.0.0.1" } ]
}
}`))
defer os.Remove(lconf)
ln, _ := RunServerWithConfig(lconf)
defer shutdownJSServerAndRemoveStorage(t, ln)

// Create main KV on HUB
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
expectOk(t, err)

_, err = kv.PutString("name", "derek")
expectOk(t, err)
_, err = kv.PutString("age", "22")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you wish ;-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch! :)

expectOk(t, err)

lnc, ljs := jsClient(t, ln)
defer lnc.Close()

// Capture cfg so we can make sure it does not change.
// NOTE: We use different name to test all possibilities, etc, but in practice for truly nomadic applications
// this should be named the same, e.g. TEST.
cfg := &nats.KeyValueConfig{
Bucket: "MIRROR",
Mirror: &nats.StreamSource{
Name: "TEST",
Domain: "HUB",
},
}
ccfg := *cfg

_, err = ljs.CreateKeyValue(cfg)
expectOk(t, err)

if !reflect.DeepEqual(cfg, &ccfg) {
t.Fatalf("Did not expect config to be altered: %+v vs %+v", cfg, ccfg)
}

si, err := ljs.StreamInfo("KV_MIRROR")
expectOk(t, err)

// Make sure mirror direct set.
if !si.Config.MirrorDirect {
t.Fatalf("Expected mirror direct to be set")
}

// Make sure we sync.
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
si, err := ljs.StreamInfo("KV_MIRROR")
expectOk(t, err)
if si.State.Msgs == 2 {
return nil
}
return fmt.Errorf("Did not get synched messages: %d", si.State.Msgs)
})

// Bind locally from leafnode and make sure both get and put work.
mkv, err := ljs.KeyValue("MIRROR")
expectOk(t, err)

_, err = mkv.PutString("name", "rip")
expectOk(t, err)

e, err := mkv.Get("name")
expectOk(t, err)
if string(e.Value()) != "rip" {
t.Fatalf("Got wrong value: %q vs %q", e.Value(), "rip")
}

// Bind through leafnode connection but to origin KV.
rjs, err := lnc.JetStream(nats.Domain("HUB"))
expectOk(t, err)

rkv, err := rjs.KeyValue("TEST")
expectOk(t, err)

_, err = rkv.PutString("name", "ivan")
expectOk(t, err)

e, err = rkv.Get("name")
expectOk(t, err)
if string(e.Value()) != "ivan" {
t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan")
}

// Shutdown cluster and test get still work.
shutdownJSServerAndRemoveStorage(t, s)

e, err = mkv.Get("name")
expectOk(t, err)
if string(e.Value()) != "ivan" {
t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan")
}
}