Skip to content

Commit

Permalink
Added in Keys().
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Oct 6, 2021
1 parent 954e0fe commit f9d1049
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 2 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.6.2-0.20211004195457-5ff751af998b
github.com/nats-io/nats-server/v2 v2.6.2-0.20211006191255-208146aade89
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
2 changes: 2 additions & 0 deletions go_test.sum
Expand Up @@ -21,6 +21,8 @@ github.com/nats-io/nats-server/v2 v2.6.2-0.20210930110517-c9eeab1d0df2 h1:cSVznb
github.com/nats-io/nats-server/v2 v2.6.2-0.20210930110517-c9eeab1d0df2/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80=
github.com/nats-io/nats-server/v2 v2.6.2-0.20211004195457-5ff751af998b h1:IP100E9oVfIkO/2ltiV6YMpiZNMT4pKUjDtFPd75+yE=
github.com/nats-io/nats-server/v2 v2.6.2-0.20211004195457-5ff751af998b/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80=
github.com/nats-io/nats-server/v2 v2.6.2-0.20211006191255-208146aade89 h1:NQHh3WWQ8Y6r35sQYUrfPEMObeKe0eZI3m0VMR26nb8=
github.com/nats-io/nats-server/v2 v2.6.2-0.20211006191255-208146aade89/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80=
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
8 changes: 8 additions & 0 deletions js.go
Expand Up @@ -2121,6 +2121,14 @@ func DeliverSubject(subject string) SubOpt {
})
}

// HeadersOnly() will instruct the consumer to only deleiver headers and no payloads.
func HeadersOnly() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.HeadersOnly = true
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
49 changes: 48 additions & 1 deletion kv.go
Expand Up @@ -51,7 +51,9 @@ type KeyValue interface {
WatchAll(cb KeyValueUpdate) (*Subscription, error)
// Watch will invoke the callback for any keys that match keyPattern when they update.
Watch(keys string, cb KeyValueUpdate) (*Subscription, error)
// History will return all histroical values for the key.
// Keys() will return all keys
Keys() (<-chan string, error)
// History will return all historical values for the key.
History(key string) ([]KeyValueEntry, error)
// Bucket returns the current bucket name.
Bucket() string
Expand Down Expand Up @@ -130,6 +132,7 @@ var (
ErrKeyNotFound = errors.New("nats: key not found")
ErrKeyDeleted = errors.New("nats: key was deleted")
ErrHistoryToLarge = errors.New("nats: history limited to a max of 64")
ErrNoKeysFound = errors.New("nats: no keys found")
)

const (
Expand Down Expand Up @@ -443,6 +446,50 @@ func (kv *kvs) PurgeDeletes() error {
}
}

// Keys() will return all keys.
func (kv *kvs) Keys() (<-chan string, error) {
_, err := kv.js.GetLastMsg(kv.stream, AllKeys)
if err != nil {
if err == ErrMsgNotFound {
err = ErrNoKeysFound
}
return nil, err
}

keys := make(chan string, 32)
cb := func(m *Msg) {
if len(m.Subject) <= len(kv.pre) {
keys <- _EMPTY_
m.Sub.Unsubscribe()
return
}
subj := m.Subject[len(kv.pre):]
keys <- subj

tokens, err := getMetadataFields(m.Reply)
if err != nil {
keys <- _EMPTY_
m.Sub.Unsubscribe()
}
pending := tokens[ackNumPendingTokenPos]
if pending == kvNoPending {
keys <- _EMPTY_
m.Sub.Unsubscribe()
}
}

var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(AllKeys)

_, err = kv.js.Subscribe(b.String(), cb, OrderedConsumer(), DeliverLastPerSubject(), HeadersOnly())
if err != nil {
return nil, err
}

return keys, nil
}

// History will return all values for the key.
func (kv *kvs) History(key string) ([]KeyValueEntry, error) {
// Do quick check to make sure this is a legit key with history.
Expand Down
57 changes: 57 additions & 0 deletions test/kv_test.go
Expand Up @@ -16,6 +16,7 @@ package test
import (
"fmt"
"os"
"reflect"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -339,6 +340,62 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
}
}

func TestKeyValueKeys(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdown(s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 2})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

_, err = kv.Keys()
expectErr(t, err, nats.ErrNoKeysFound)

// Put in a few names and ages.
put("name", "derek")
put("age", "22")
put("country", "US")
put("name", "ivan")
put("age", "33")
put("country", "US")
put("name", "rip")
put("age", "44")
put("country", "MT")

keys, err := kv.Keys()
expectOk(t, err)

kmap := make(map[string]struct{})
for key := range keys {
if key == "" { // End of list
break
}
if _, ok := kmap[key]; ok {
t.Fatalf("Already saw %q", key)
}
kmap[key] = struct{}{}
}
if len(kmap) != 3 {
t.Fatalf("Expected 3 total keys, got %d", len(kmap))
}
expected := map[string]struct{}{
"name": struct{}{},
"age": struct{}{},
"country": struct{}{},
}
if !reflect.DeepEqual(kmap, expected) {
t.Fatalf("Expected %+v but got %+v", expected, kmap)
}
}

// Helpers

func client(t *testing.T, s *server.Server) *nats.Conn {
Expand Down

0 comments on commit f9d1049

Please sign in to comment.