Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] KV use of JS prefix #910

Merged
merged 2 commits into from Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions kv.go
Expand Up @@ -291,6 +291,8 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) {
stream: stream,
pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
js: js,
// Determine if we need to use the JS prefix in front of Put and Delete operations
useJSPfx: js.opts.pre != defaultAPIPrefix,
}
return kv, nil
}
Expand Down Expand Up @@ -352,6 +354,8 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
stream: scfg.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket),
js: js,
// Determine if we need to use the JS prefix in front of Put and Delete operations
useJSPfx: js.opts.pre != defaultAPIPrefix,
}
return kv, nil
}
Expand All @@ -370,6 +374,10 @@ type kvs struct {
stream string
pre string
js *js
// If true, it means that APIPrefix/Domain was set in the context
// and we need to add something to some of our high level protocols
// (such as Put, etc..)
useJSPfx bool
}

// Underlying entry.
Expand Down Expand Up @@ -481,6 +489,9 @@ func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) {
}

var b strings.Builder
if kv.useJSPfx {
b.WriteString(kv.js.opts.pre)
}
b.WriteString(kv.pre)
b.WriteString(key)

Expand Down Expand Up @@ -548,6 +559,9 @@ func (kv *kvs) delete(key string, purge bool) error {
}

var b strings.Builder
if kv.useJSPfx {
b.WriteString(kv.js.opts.pre)
}
b.WriteString(kv.pre)
b.WriteString(key)

Expand Down
145 changes: 145 additions & 0 deletions test/kv_test.go
Expand Up @@ -16,6 +16,7 @@ package test
import (
"context"
"fmt"
"os"
"reflect"
"regexp"
"strconv"
Expand Down Expand Up @@ -599,6 +600,150 @@ func TestKeyValueDiscardNew(t *testing.T) {
}
}

func TestKeyValueCrossAccounts(t *testing.T) {
conf := createConfFile(t, []byte(`
jetstream: enabled
accounts: {
A: {
users: [ {user: a, password: a} ]
jetstream: enabled
exports: [
{service: '$JS.API.>' }
{service: '$KV.>'}
{stream: 'accI.>'}
]
},
I: {
users: [ {user: i, password: i} ]
imports: [
{service: {account: A, subject: '$JS.API.>'}, to: 'fromA.>' }
{service: {account: A, subject: '$KV.>'}, to: 'fromA.$KV.>' }
{stream: {subject: 'accI.>', account: A}}
]
}
}`))
defer os.Remove(conf)
s, _ := RunServerWithConfig(conf)
defer shutdownJSServerAndRemoveStorage(t, s)

watchNext := func(w nats.KeyWatcher) nats.KeyValueEntry {
t.Helper()
select {
case e := <-w.Updates():
return e
case <-time.After(time.Second):
t.Fatal("Fail to get the next update")
}
return nil
}

nc1, js1 := jsClient(t, s, nats.UserInfo("a", "a"))
defer nc1.Close()

kv1, err := js1.CreateKeyValue(&nats.KeyValueConfig{Bucket: "Map"})
if err != nil {
t.Fatalf("Error creating kv store: %v", err)
}

w1, err := kv1.Watch("map")
if err != nil {
t.Fatalf("Error creating watcher: %v", err)
}
if e := watchNext(w1); e != nil {
t.Fatalf("Expected nil entry, got %+v", e)
}

nc2, err := nats.Connect(s.ClientURL(), nats.UserInfo("i", "i"), nats.CustomInboxPrefix("accI"))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()
js2, err := nc2.JetStream(nats.APIPrefix("fromA"))
if err != nil {
t.Fatalf("Error getting jetstream context: %v", err)
}

kv2, err := js2.CreateKeyValue(&nats.KeyValueConfig{Bucket: "Map"})
if err != nil {
t.Fatalf("Error creating kv store: %v", err)
}

w2, err := kv2.Watch("map")
if err != nil {
t.Fatalf("Error creating watcher: %v", err)
}
if e := watchNext(w2); e != nil {
t.Fatalf("Expected nil entry, got %+v", e)
}

if _, err := kv2.Put("map", []byte("value")); err != nil {
t.Fatalf("Error on put: %v", err)
}

// Get from kv1
e, err := kv1.Get("map")
if err != nil {
t.Fatalf("Error on get: %v", err)
}
if e.Key() != "map" || string(e.Value()) != "value" {
t.Fatalf("Unexpected entry: +%v", e)
}

// Get from kv2
e, err = kv2.Get("map")
if err != nil {
t.Fatalf("Error on get: %v", err)
}
if e.Key() != "map" || string(e.Value()) != "value" {
t.Fatalf("Unexpected entry: +%v", e)
}

// Watcher 1
if e := watchNext(w1); e == nil || e.Key() != "map" || string(e.Value()) != "value" {
t.Fatalf("Unexpected entry: %+v", e)
}

// Watcher 2
if e := watchNext(w2); e == nil || e.Key() != "map" || string(e.Value()) != "value" {
t.Fatalf("Unexpected entry: %+v", e)
}

// Purge from kv2
if err := kv2.Purge("map"); err != nil {
t.Fatalf("Error on purge: %v", err)
}

// Check purge ok from w1
if e := watchNext(w1); e == nil || e.Operation() != nats.KeyValuePurge {
t.Fatalf("Unexpected entry: %+v", e)
}

// Check purge ok from w2
if e := watchNext(w2); e == nil || e.Operation() != nats.KeyValuePurge {
t.Fatalf("Unexpected entry: %+v", e)
}

// Delete purge records from kv2
if err := kv2.PurgeDeletes(nats.DeleteMarkersOlderThan(-1)); err != nil {
t.Fatalf("Error on purge deletes: %v", err)
}

// Check all gone from js1
if si, err := js1.StreamInfo("KV_Map"); err != nil || si == nil || si.State.Msgs != 0 {
t.Fatalf("Error getting stream info: err=%v si=%+v", err, si)
}

// Delete key from kv2
if err := kv2.Delete("map"); err != nil {
t.Fatalf("Error on delete: %v", err)
}

// Check key gone from kv1
if e, err := kv1.Get("map"); err != nats.ErrKeyNotFound || e != nil {
t.Fatalf("Expected key not found, got err=%v e=%+v", err, e)
}
}

// Helpers

func client(t *testing.T, s *server.Server, opts ...nats.Option) *nats.Conn {
Expand Down