Skip to content

Commit

Permalink
[ADDED] Listing KeyValue store and KeyValue store names (#1088)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Sep 20, 2022
1 parent 715a591 commit 8e72eaa
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 22 deletions.
79 changes: 58 additions & 21 deletions kv.go
Expand Up @@ -35,6 +35,10 @@ type KeyValueManager interface {
CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
// DeleteKeyValue will delete this KeyValue store (JetStream stream).
DeleteKeyValue(bucket string) error
// KeyValueStoreNames is used to retrieve a list of key value store names
KeyValueStoreNames() <-chan string
// KeyValueStores is used to retrieve a list of key value stores
KeyValueStores() <-chan KeyValue
}

// Notice: Experimental Preview
Expand Down Expand Up @@ -325,16 +329,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) {
return nil, ErrBadBucket
}

kv := &kvs{
name: bucket,
stream: stream,
pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
js: js,
// Determine if we need to use the JS prefix in front of Put and Delete operations
useJSPfx: js.opts.pre != defaultAPIPrefix,
useDirect: si.Config.AllowDirect,
}
return kv, nil
return mapStreamToKVS(js, si), nil
}

// CreateKeyValue will create a KeyValue store with the following configuration.
Expand Down Expand Up @@ -431,17 +426,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
return nil, err
}
}

kv := &kvs{
name: cfg.Bucket,
stream: scfg.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket),
js: js,
// Determine if we need to use the JS prefix in front of Put and Delete operations
useJSPfx: js.opts.pre != defaultAPIPrefix,
useDirect: si.Config.AllowDirect,
}
return kv, nil
return mapStreamToKVS(js, si), nil
}

// DeleteKeyValue will delete this KeyValue store (JetStream stream).
Expand Down Expand Up @@ -980,3 +965,55 @@ func (kv *kvs) Status() (KeyValueStatus, error) {

return &KeyValueBucketStatus{nfo: nfo, bucket: kv.name}, nil
}

// KeyValueStoreNames is used to retrieve a list of key value store names
func (js *js) KeyValueStoreNames() <-chan string {
ch := make(chan string)
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
go func() {
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "KV_") {
continue
}
ch <- info.Config.Name
}
}
}()

return ch
}

// KeyValueStores is used to retrieve a list of key value stores
func (js *js) KeyValueStores() <-chan KeyValue {
ch := make(chan KeyValue)
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
go func() {
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "KV_") {
continue
}
ch <- mapStreamToKVS(js, info)
}
}
}()
return ch
}

func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
bucket := strings.TrimPrefix(info.Config.Name, "KV_")
return &kvs{
name: bucket,
stream: info.Config.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
js: js,
// Determine if we need to use the JS prefix in front of Put and Delete operations
useJSPfx: js.opts.pre != defaultAPIPrefix,
useDirect: info.Config.AllowDirect,
}
}
50 changes: 50 additions & 0 deletions test/kv_test.go
Expand Up @@ -813,3 +813,53 @@ func expectErr(t *testing.T, err error, expected ...error) {
}
t.Fatalf("Expected one of %+v, got '%v'", expected, err)
}

func TestListKeyValueStores(t *testing.T) {
tests := []struct {
name string
bucketsNum int
}{
{
name: "single page",
bucketsNum: 5,
},
{
name: "multi page",
bucketsNum: 1025,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
// create stream without the chunk subject, but with KV_ prefix
_, err := js.AddStream(&nats.StreamConfig{Name: "KV_FOO", Subjects: []string{"FOO.*"}})
expectOk(t, err)
// create stream with chunk subject, but without "KV_" prefix
_, err = js.AddStream(&nats.StreamConfig{Name: "FOO", Subjects: []string{"$KV.ABC.>"}})
expectOk(t, err)
for i := 0; i < test.bucketsNum; i++ {
_, err = js.CreateKeyValue(&nats.KeyValueConfig{Bucket: fmt.Sprintf("KVS_%d", i), MaxBytes: 1024})
expectOk(t, err)
}
names := make([]string, 0)
for name := range js.KeyValueStoreNames() {
names = append(names, name)
}
if len(names) != test.bucketsNum {
t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.bucketsNum, len(names))
}
infos := make([]nats.KeyValue, 0)
for info := range js.KeyValueStores() {
infos = append(infos, info)
}
if len(infos) != test.bucketsNum {
t.Fatalf("Invalid number of streams; want: %d; got: %d", test.bucketsNum, len(infos))
}
})
}
}
2 changes: 1 addition & 1 deletion test/object_test.go
Expand Up @@ -791,7 +791,7 @@ func TestObjectMaxBytes(t *testing.T) {
}
}

func TestBucketNames(t *testing.T) {
func TestListObjectStores(t *testing.T) {
tests := []struct {
name string
bucketsNum int
Expand Down

0 comments on commit 8e72eaa

Please sign in to comment.