Skip to content

Commit

Permalink
Added in Keys().
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Oct 6, 2021
1 parent 954e0fe commit e2c5eb5
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 1 deletion.
49 changes: 48 additions & 1 deletion kv.go
Expand Up @@ -51,7 +51,9 @@ type KeyValue interface {
WatchAll(cb KeyValueUpdate) (*Subscription, error)
// Watch will invoke the callback for any keys that match keyPattern when they update.
Watch(keys string, cb KeyValueUpdate) (*Subscription, error)
// History will return all histroical values for the key.
// Keys() will return all keys
Keys() (<-chan string, error)
// History will return all historical values for the key.
History(key string) ([]KeyValueEntry, error)
// Bucket returns the current bucket name.
Bucket() string
Expand Down Expand Up @@ -130,6 +132,7 @@ var (
ErrKeyNotFound = errors.New("nats: key not found")
ErrKeyDeleted = errors.New("nats: key was deleted")
ErrHistoryToLarge = errors.New("nats: history limited to a max of 64")
ErrNoKeysFound = errors.New("nats: no keys found")
)

const (
Expand Down Expand Up @@ -443,6 +446,50 @@ func (kv *kvs) PurgeDeletes() error {
}
}

// Keys() will return all keys.
func (kv *kvs) Keys() (<-chan string, error) {
_, err := kv.js.GetLastMsg(kv.stream, AllKeys)
if err != nil {
if err == ErrMsgNotFound {
err = ErrNoKeysFound
}
return nil, err
}

keys := make(chan string, 32)
cb := func(m *Msg) {
if len(m.Subject) <= len(kv.pre) {
keys <- _EMPTY_
m.Sub.Unsubscribe()
return
}
subj := m.Subject[len(kv.pre):]
keys <- subj

tokens, err := getMetadataFields(m.Reply)
if err != nil {
keys <- _EMPTY_
m.Sub.Unsubscribe()
}
pending := tokens[ackNumPendingTokenPos]
if pending == kvNoPending {
keys <- _EMPTY_
m.Sub.Unsubscribe()
}
}

var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(AllKeys)

_, err = kv.js.Subscribe(b.String(), cb, OrderedConsumer(), DeliverLastPerSubject())
if err != nil {
return nil, err
}

return keys, nil
}

// History will return all values for the key.
func (kv *kvs) History(key string) ([]KeyValueEntry, error) {
// Do quick check to make sure this is a legit key with history.
Expand Down
57 changes: 57 additions & 0 deletions test/kv_test.go
Expand Up @@ -16,6 +16,7 @@ package test
import (
"fmt"
"os"
"reflect"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -339,6 +340,62 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
}
}

func TestKeyValueKeys(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 2})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

_, err = kv.Keys()
expectErr(t, err, nats.ErrNoKeysFound)

// Put in a few names and ages.
put("name", "derek")
put("age", "22")
put("country", "US")
put("name", "ivan")
put("age", "33")
put("country", "US")
put("name", "rip")
put("age", "44")
put("country", "MT")

keys, err := kv.Keys()
expectOk(t, err)

kmap := make(map[string]struct{})
for key := range keys {
if key == "" { // End of list
break
}
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 3 {
t.Fatalf("Expected 3 total keys, got %d", len(kmap))
}
expected := map[string]struct{}{
"name": struct{}{},
"age": struct{}{},
"country": struct{}{},
}
if !reflect.DeepEqual(kmap, expected) {
t.Fatalf("Expected %+v but got %+v", expected, kmap)
}
}

// Helpers

func client(t *testing.T, s *server.Server) *nats.Conn {
Expand Down

0 comments on commit e2c5eb5

Please sign in to comment.