From b90a54392fb84eb6d60a655f708ff33be0413492 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 19 Sep 2022 16:11:17 +0200 Subject: [PATCH 1/2] Add listing KVs and KV names --- kv.go | 97 +++++++++++++++++++++++++++++++++++---------- test/kv_test.go | 50 +++++++++++++++++++++++ test/object_test.go | 2 +- 3 files changed, 127 insertions(+), 22 deletions(-) diff --git a/kv.go b/kv.go index 2c74848a1..a6c27eda5 100644 --- a/kv.go +++ b/kv.go @@ -35,6 +35,10 @@ type KeyValueManager interface { CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) // DeleteKeyValue will delete this KeyValue store (JetStream stream). DeleteKeyValue(bucket string) error + // KeyValueStoreNames is used to retrieve a list of key value store names + KeyValueStoreNames() <-chan string + // KeyValueStores is used to retrieve a list of key value stores + KeyValueStores() <-chan KeyValue } // Notice: Experimental Preview @@ -325,16 +329,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) { return nil, ErrBadBucket } - kv := &kvs{ - name: bucket, - stream: stream, - pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket), - js: js, - // Determine if we need to use the JS prefix in front of Put and Delete operations - useJSPfx: js.opts.pre != defaultAPIPrefix, - useDirect: si.Config.AllowDirect, - } - return kv, nil + return mapStreamToKVS(js, si), nil } // CreateKeyValue will create a KeyValue store with the following configuration. @@ -431,17 +426,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { return nil, err } } - - kv := &kvs{ - name: cfg.Bucket, - stream: scfg.Name, - pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket), - js: js, - // Determine if we need to use the JS prefix in front of Put and Delete operations - useJSPfx: js.opts.pre != defaultAPIPrefix, - useDirect: si.Config.AllowDirect, - } - return kv, nil + return mapStreamToKVS(js, si), nil } // DeleteKeyValue will delete this KeyValue store (JetStream stream). @@ -980,3 +965,73 @@ func (kv *kvs) Status() (KeyValueStatus, error) { return &KeyValueBucketStatus{nfo: nfo, bucket: kv.name}, nil } + +// KeyValueStoreNames is used to retrieve a list of key value store names +func (js *js) KeyValueStoreNames() <-chan string { + ch := make(chan string) + ctx, cancel := context.WithTimeout(context.Background(), defaultRequestWait) + l := &streamLister{js: js} + l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*") + l.js.opts.ctx = ctx + go func() { + if cancel != nil { + defer cancel() + } + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + if !strings.HasPrefix(info.Config.Name, "KV_") { + continue + } + select { + case ch <- info.Config.Name: + case <-ctx.Done(): + return + } + } + } + }() + + return ch +} + +// KeyValueStores is used to retrieve a list of key value stores +func (js *js) KeyValueStores() <-chan KeyValue { + ch := make(chan KeyValue) + ctx, cancel := context.WithTimeout(context.Background(), defaultRequestWait) + l := &streamLister{js: js} + l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*") + l.js.opts.ctx = ctx + go func() { + if cancel != nil { + defer cancel() + } + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + if !strings.HasPrefix(info.Config.Name, "KV_") { + continue + } + select { + case ch <- mapStreamToKVS(js, info): + case <-ctx.Done(): + return + } + } + } + }() + return ch +} + +func mapStreamToKVS(js *js, info *StreamInfo) *kvs { + bucket := strings.TrimPrefix(info.Config.Name, "KV_") + return &kvs{ + name: bucket, + stream: info.Config.Name, + pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket), + js: js, + // Determine if we need to use the JS prefix in front of Put and Delete operations + useJSPfx: js.opts.pre != defaultAPIPrefix, + useDirect: info.Config.AllowDirect, + } +} diff --git a/test/kv_test.go b/test/kv_test.go index 4ece98562..8531584f6 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -813,3 +813,53 @@ func expectErr(t *testing.T, err error, expected ...error) { } t.Fatalf("Expected one of %+v, got '%v'", expected, err) } + +func TestListKeyValueStores(t *testing.T) { + tests := []struct { + name string + bucketsNum int + }{ + { + name: "single page", + bucketsNum: 5, + }, + { + name: "multi page", + bucketsNum: 1025, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + // create stream without the chunk subject, but with KV_ prefix + _, err := js.AddStream(&nats.StreamConfig{Name: "KV_FOO", Subjects: []string{"FOO.*"}}) + expectOk(t, err) + // create stream with chunk subject, but without "KV_" prefix + _, err = js.AddStream(&nats.StreamConfig{Name: "FOO", Subjects: []string{"$KV.ABC.>"}}) + expectOk(t, err) + for i := 0; i < test.bucketsNum; i++ { + _, err = js.CreateKeyValue(&nats.KeyValueConfig{Bucket: fmt.Sprintf("KVS_%d", i), MaxBytes: 1024}) + expectOk(t, err) + } + names := make([]string, 0) + for name := range js.KeyValueStoreNames() { + names = append(names, name) + } + if len(names) != test.bucketsNum { + t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.bucketsNum, len(names)) + } + infos := make([]nats.KeyValue, 0) + for info := range js.KeyValueStores() { + infos = append(infos, info) + } + if len(infos) != test.bucketsNum { + t.Fatalf("Invalid number of streams; want: %d; got: %d", test.bucketsNum, len(infos)) + } + }) + } +} diff --git a/test/object_test.go b/test/object_test.go index bbede2018..f1447b0c7 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -791,7 +791,7 @@ func TestObjectMaxBytes(t *testing.T) { } } -func TestBucketNames(t *testing.T) { +func TestListObjectStores(t *testing.T) { tests := []struct { name string bucketsNum int From fa8e70808017d736e3fe36692cd9c4e35184fb79 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 19 Sep 2022 17:27:02 +0200 Subject: [PATCH 2/2] Get rid of custom context --- kv.go | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/kv.go b/kv.go index a6c27eda5..a25485cea 100644 --- a/kv.go +++ b/kv.go @@ -969,25 +969,16 @@ func (kv *kvs) Status() (KeyValueStatus, error) { // KeyValueStoreNames is used to retrieve a list of key value store names func (js *js) KeyValueStoreNames() <-chan string { ch := make(chan string) - ctx, cancel := context.WithTimeout(context.Background(), defaultRequestWait) l := &streamLister{js: js} l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*") - l.js.opts.ctx = ctx go func() { - if cancel != nil { - defer cancel() - } defer close(ch) for l.Next() { for _, info := range l.Page() { if !strings.HasPrefix(info.Config.Name, "KV_") { continue } - select { - case ch <- info.Config.Name: - case <-ctx.Done(): - return - } + ch <- info.Config.Name } } }() @@ -998,25 +989,16 @@ func (js *js) KeyValueStoreNames() <-chan string { // KeyValueStores is used to retrieve a list of key value stores func (js *js) KeyValueStores() <-chan KeyValue { ch := make(chan KeyValue) - ctx, cancel := context.WithTimeout(context.Background(), defaultRequestWait) l := &streamLister{js: js} l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*") - l.js.opts.ctx = ctx go func() { - if cancel != nil { - defer cancel() - } defer close(ch) for l.Next() { for _, info := range l.Page() { if !strings.HasPrefix(info.Config.Name, "KV_") { continue } - select { - case ch <- mapStreamToKVS(js, info): - case <-ctx.Done(): - return - } + ch <- mapStreamToKVS(js, info) } } }()