From da7703c949e40b81421ff6a79198ff0bb14cdbed Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 6 Oct 2021 15:06:51 -0700 Subject: [PATCH] Added in Keys(). Signed-off-by: Derek Collison --- go_test.mod | 2 +- go_test.sum | 2 ++ js.go | 8 +++++++ jsm.go | 2 +- kv.go | 49 +++++++++++++++++++++++++++++++++++++++- test/kv_test.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 119 insertions(+), 4 deletions(-) diff --git a/go_test.mod b/go_test.mod index 438c9605d..e747cd2af 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.6.2-0.20211004195457-5ff751af998b + github.com/nats-io/nats-server/v2 v2.6.2-0.20211006191255-208146aade89 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 5019caeab..5834f3400 100644 --- a/go_test.sum +++ b/go_test.sum @@ -21,6 +21,8 @@ github.com/nats-io/nats-server/v2 v2.6.2-0.20210930110517-c9eeab1d0df2 h1:cSVznb github.com/nats-io/nats-server/v2 v2.6.2-0.20210930110517-c9eeab1d0df2/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80= github.com/nats-io/nats-server/v2 v2.6.2-0.20211004195457-5ff751af998b h1:IP100E9oVfIkO/2ltiV6YMpiZNMT4pKUjDtFPd75+yE= github.com/nats-io/nats-server/v2 v2.6.2-0.20211004195457-5ff751af998b/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80= +github.com/nats-io/nats-server/v2 v2.6.2-0.20211006191255-208146aade89 h1:NQHh3WWQ8Y6r35sQYUrfPEMObeKe0eZI3m0VMR26nb8= +github.com/nats-io/nats-server/v2 v2.6.2-0.20211006191255-208146aade89/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80= github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= diff --git a/js.go b/js.go index 61d5905b5..9d67f5aad 100644 --- a/js.go +++ b/js.go @@ -2121,6 +2121,14 @@ func DeliverSubject(subject string) SubOpt { }) } +// HeadersOnly() will instruct the consumer to only deleiver headers and no payloads. +func HeadersOnly() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.HeadersOnly = true + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/jsm.go b/jsm.go index 6f98747c2..48537f2ef 100644 --- a/jsm.go +++ b/jsm.go @@ -96,7 +96,7 @@ type StreamConfig struct { Sealed bool `json:"sealed,omitempty"` DenyDelete bool `json:"deny_delete,omitempty"` DenyPurge bool `json:"deny_purge,omitempty"` - AllowRollup bool `json:"rollup_hdrs,omitempty"` + AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` } // Placement is used to guide placement of streams in clustered JetStream. diff --git a/kv.go b/kv.go index 8277a8565..f81b66dd8 100644 --- a/kv.go +++ b/kv.go @@ -51,7 +51,9 @@ type KeyValue interface { WatchAll(cb KeyValueUpdate) (*Subscription, error) // Watch will invoke the callback for any keys that match keyPattern when they update. Watch(keys string, cb KeyValueUpdate) (*Subscription, error) - // History will return all histroical values for the key. + // Keys() will return all keys + Keys() (<-chan string, error) + // History will return all historical values for the key. History(key string) ([]KeyValueEntry, error) // Bucket returns the current bucket name. Bucket() string @@ -130,6 +132,7 @@ var ( ErrKeyNotFound = errors.New("nats: key not found") ErrKeyDeleted = errors.New("nats: key was deleted") ErrHistoryToLarge = errors.New("nats: history limited to a max of 64") + ErrNoKeysFound = errors.New("nats: no keys found") ) const ( @@ -443,6 +446,50 @@ func (kv *kvs) PurgeDeletes() error { } } +// Keys() will return all keys. +func (kv *kvs) Keys() (<-chan string, error) { + _, err := kv.js.GetLastMsg(kv.stream, AllKeys) + if err != nil { + if err == ErrMsgNotFound { + err = ErrNoKeysFound + } + return nil, err + } + + keys := make(chan string, 32) + cb := func(m *Msg) { + if len(m.Subject) <= len(kv.pre) { + keys <- _EMPTY_ + m.Sub.Unsubscribe() + return + } + subj := m.Subject[len(kv.pre):] + keys <- subj + + tokens, err := getMetadataFields(m.Reply) + if err != nil { + keys <- _EMPTY_ + m.Sub.Unsubscribe() + } + pending := tokens[ackNumPendingTokenPos] + if pending == kvNoPending { + keys <- _EMPTY_ + m.Sub.Unsubscribe() + } + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(AllKeys) + + _, err = kv.js.Subscribe(b.String(), cb, OrderedConsumer(), DeliverLastPerSubject(), HeadersOnly()) + if err != nil { + return nil, err + } + + return keys, nil +} + // History will return all values for the key. func (kv *kvs) History(key string) ([]KeyValueEntry, error) { // Do quick check to make sure this is a legit key with history. diff --git a/test/kv_test.go b/test/kv_test.go index 9a49758a0..01dc69cb5 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -16,6 +16,7 @@ package test import ( "fmt" "os" + "reflect" "strconv" "strings" "testing" @@ -284,7 +285,8 @@ func TestKeyValueDeleteVsPurge(t *testing.T) { if len(entries) != 4 { t.Fatalf("Expected 4 entries for age after delete, got %d", len(entries)) } - kv.Purge("name") + err = kv.Purge("name") + expectOk(t, err) // Check marker e, err := kv.Get("name") expectErr(t, err, nats.ErrKeyDeleted) @@ -339,6 +341,62 @@ func TestKeyValueDeleteTombstones(t *testing.T) { } } +func TestKeyValueKeys(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdown(s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 2}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(key, []byte(value)) + expectOk(t, err) + } + + _, err = kv.Keys() + expectErr(t, err, nats.ErrNoKeysFound) + + // Put in a few names and ages. + put("name", "derek") + put("age", "22") + put("country", "US") + put("name", "ivan") + put("age", "33") + put("country", "US") + put("name", "rip") + put("age", "44") + put("country", "MT") + + keys, err := kv.Keys() + expectOk(t, err) + + kmap := make(map[string]struct{}) + for key := range keys { + if key == "" { // End of list + break + } + if _, ok := kmap[key]; ok { + t.Fatalf("Already saw %q", key) + } + kmap[key] = struct{}{} + } + if len(kmap) != 3 { + t.Fatalf("Expected 3 total keys, got %d", len(kmap)) + } + expected := map[string]struct{}{ + "name": struct{}{}, + "age": struct{}{}, + "country": struct{}{}, + } + if !reflect.DeepEqual(kmap, expected) { + t.Fatalf("Expected %+v but got %+v", expected, kmap) + } +} + // Helpers func client(t *testing.T, s *server.Server) *nats.Conn {