Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Listing KeyValue store and KeyValue store names #1088

Merged
merged 2 commits into from Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
97 changes: 76 additions & 21 deletions kv.go
Expand Up @@ -35,6 +35,10 @@ type KeyValueManager interface {
CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
// DeleteKeyValue will delete this KeyValue store (JetStream stream).
DeleteKeyValue(bucket string) error
// KeyValueStoreNames is used to retrieve a list of key value store names
KeyValueStoreNames() <-chan string
// KeyValueStores is used to retrieve a list of key value stores
KeyValueStores() <-chan KeyValue
}

// Notice: Experimental Preview
Expand Down Expand Up @@ -325,16 +329,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) {
return nil, ErrBadBucket
}

kv := &kvs{
name: bucket,
stream: stream,
pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
js: js,
// Determine if we need to use the JS prefix in front of Put and Delete operations
useJSPfx: js.opts.pre != defaultAPIPrefix,
useDirect: si.Config.AllowDirect,
}
return kv, nil
return mapStreamToKVS(js, si), nil
}

// CreateKeyValue will create a KeyValue store with the following configuration.
Expand Down Expand Up @@ -431,17 +426,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
return nil, err
}
}

kv := &kvs{
name: cfg.Bucket,
stream: scfg.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket),
js: js,
// Determine if we need to use the JS prefix in front of Put and Delete operations
useJSPfx: js.opts.pre != defaultAPIPrefix,
useDirect: si.Config.AllowDirect,
}
return kv, nil
return mapStreamToKVS(js, si), nil
}

// DeleteKeyValue will delete this KeyValue store (JetStream stream).
Expand Down Expand Up @@ -980,3 +965,73 @@ func (kv *kvs) Status() (KeyValueStatus, error) {

return &KeyValueBucketStatus{nfo: nfo, bucket: kv.name}, nil
}

// KeyValueStoreNames is used to retrieve a list of key value store names
func (js *js) KeyValueStoreNames() <-chan string {
ch := make(chan string)
ctx, cancel := context.WithTimeout(context.Background(), defaultRequestWait)
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if this subject will work with the JS Prefix? Or whether it needs something like js.apiSubj() to make it so?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the filter subject for STREAM.LIST api and points to $KV.%s.>. AFAIK, CreateKeyValue() creates a stream with interest in this subject (%s is the name), so there is no need for $JS prefix. Or do I misunderstand it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh right, ok then there is no need for the prefix there.

l.js.opts.ctx = ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "KV_") {
continue
}
select {
case ch <- info.Config.Name:
case <-ctx.Done():
return
}
}
}
}()

return ch
}

// KeyValueStores is used to retrieve a list of key value stores
func (js *js) KeyValueStores() <-chan KeyValue {
ch := make(chan KeyValue)
ctx, cancel := context.WithTimeout(context.Background(), defaultRequestWait)
l := &streamLister{js: js}
l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
l.js.opts.ctx = ctx
go func() {
if cancel != nil {
defer cancel()
}
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "KV_") {
continue
}
select {
case ch <- mapStreamToKVS(js, info):
case <-ctx.Done():
return
}
}
}
}()
return ch
}

func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
bucket := strings.TrimPrefix(info.Config.Name, "KV_")
return &kvs{
name: bucket,
stream: info.Config.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket),
js: js,
// Determine if we need to use the JS prefix in front of Put and Delete operations
useJSPfx: js.opts.pre != defaultAPIPrefix,
useDirect: info.Config.AllowDirect,
}
}
50 changes: 50 additions & 0 deletions test/kv_test.go
Expand Up @@ -813,3 +813,53 @@ func expectErr(t *testing.T, err error, expected ...error) {
}
t.Fatalf("Expected one of %+v, got '%v'", expected, err)
}

func TestListKeyValueStores(t *testing.T) {
tests := []struct {
name string
bucketsNum int
}{
{
name: "single page",
bucketsNum: 5,
},
{
name: "multi page",
bucketsNum: 1025,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
// create stream without the chunk subject, but with KV_ prefix
_, err := js.AddStream(&nats.StreamConfig{Name: "KV_FOO", Subjects: []string{"FOO.*"}})
expectOk(t, err)
// create stream with chunk subject, but without "KV_" prefix
_, err = js.AddStream(&nats.StreamConfig{Name: "FOO", Subjects: []string{"$KV.ABC.>"}})
expectOk(t, err)
for i := 0; i < test.bucketsNum; i++ {
_, err = js.CreateKeyValue(&nats.KeyValueConfig{Bucket: fmt.Sprintf("KVS_%d", i), MaxBytes: 1024})
expectOk(t, err)
}
names := make([]string, 0)
for name := range js.KeyValueStoreNames() {
names = append(names, name)
}
if len(names) != test.bucketsNum {
t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.bucketsNum, len(names))
}
infos := make([]nats.KeyValue, 0)
for info := range js.KeyValueStores() {
infos = append(infos, info)
}
if len(infos) != test.bucketsNum {
t.Fatalf("Invalid number of streams; want: %d; got: %d", test.bucketsNum, len(infos))
}
})
}
}
2 changes: 1 addition & 1 deletion test/object_test.go
Expand Up @@ -791,7 +791,7 @@ func TestObjectMaxBytes(t *testing.T) {
}
}

func TestBucketNames(t *testing.T) {
func TestListObjectStores(t *testing.T) {
tests := []struct {
name string
bucketsNum int
Expand Down