From 4615d64aec1cd1b6b1b7eb5a80a31a4248a86f30 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 7 Oct 2021 11:32:21 -0700 Subject: [PATCH] Updates to KV based on morning mtg Signed-off-by: Derek Collison --- kv.go | 313 +++++++++++++++++++++++------------------------- test/kv_test.go | 31 ++--- 2 files changed, 158 insertions(+), 186 deletions(-) diff --git a/kv.go b/kv.go index 46e5bb1ab..7c3e78268 100644 --- a/kv.go +++ b/kv.go @@ -32,6 +32,9 @@ type KeyValueManager interface { DeleteKeyValue(bucket string) error } +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. type KeyValue interface { // Get returns the latest value for the key. Get(key string) (entry KeyValueEntry, err error) @@ -45,18 +48,67 @@ type KeyValue interface { Delete(key string) error // Purge will place a delete marker and remove all previous revisions. Purge(key string) error - // PurgeDeletes will remove all current delete markers. - PurgeDeletes() error + // Watch for any updates to keys that match the keys argument which could include wildcards. + // Watch will send a nil entry when it has received all initial values. + Watch(keys string, opts ...WatchOpt) (Watcher, error) // WatchAll will invoke the callback for all updates. - WatchAll(cb KeyValueUpdate) (*Subscription, error) - // Watch will invoke the callback for any keys that match keyPattern when they update. - Watch(keys string, cb KeyValueUpdate) (*Subscription, error) - // Keys() will return all keys - Keys() (<-chan string, error) + WatchAll(opts ...WatchOpt) (Watcher, error) + // Keys will return all keys. + Keys(opts ...WatchOpt) ([]string, error) // History will return all historical values for the key. - History(key string) ([]KeyValueEntry, error) + History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) // Bucket returns the current bucket name. Bucket() string + // PurgeDeletes will remove all current delete markers. + PurgeDeletes(opts ...WatchOpt) error +} + +// Watch is what is returned when doing a watch. +type Watcher interface { + // Updates returns a channel to read any updates to entries. + Updates() <-chan KeyValueEntry + // Stop() will stop this watcher. + Stop() error +} + +type WatchOpt interface { + configureWatcher(opts *watchOpts) error +} + +// For nats.Context() support. +func (ctx ContextOpt) configureWatcher(opts *watchOpts) error { + opts.ctx = ctx + return nil +} + +type watchOpts struct { + ctx context.Context + // Do not send delete markers to the update channel. + ignoreDeletes bool + // Include all history per subject, not just last one. + includeHistroy bool +} + +type watchOptFn func(opts *watchOpts) error + +func (opt watchOptFn) configureWatcher(opts *watchOpts) error { + return opt(opts) +} + +// IncludeHistory instructs the watche to include historical values as well. +func IncludeHistory() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.includeHistroy = true + return nil + }) +} + +// IgnoreDeletes will have the watcher not pass delete markers. +func IgnoreDeletes() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.ignoreDeletes = true + return nil + }) } // KeyValueConfig is for configuring a KeyValue store. @@ -119,9 +171,6 @@ type KeyValueEntry interface { Operation() KeyValueOp } -// KeyValueUpdate is the callback handler for KeyValueEntry updates. -type KeyValueUpdate func(v KeyValueEntry) - // Errors var ( ErrKeyValueConfigRequired = errors.New("nats: config required") @@ -403,193 +452,113 @@ func (kv *kvs) delete(key string, purge bool) error { // PurgeDeletes will remove all current delete markers. // This is a maintenance option if there is a larger buildup of delete markers. -func (kv *kvs) PurgeDeletes() error { - o, cancel, err := getJSContextOpts(kv.js.opts) +func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { + watcher, err := kv.WatchAll(opts...) if err != nil { return err } - if cancel != nil { - defer cancel() - } + defer watcher.Stop() - ctx := o.ctx - if ctx == nil { - ctx, cancel = context.WithTimeout(context.Background(), o.wait) - defer cancel() - } - - done := make(chan error, 1) - sub, err := kv.WatchAll(func(v KeyValueEntry) { - if v == nil { - done <- nil - return + for entry := range watcher.Updates() { + if entry == nil { + break } - var b strings.Builder - b.WriteString(kv.pre) - b.WriteString(v.Key()) - err := kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String()}) - if err != nil { - done <- err + if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge { + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(entry.Key()) + err := kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String()}) + if err != nil { + return err + } } - }) - if err != nil { - return err - } - defer sub.Unsubscribe() - - // Wait on done or ctx/timeout. - select { - case err := <-done: - return err - case <-ctx.Done(): - return ctx.Err() } + return nil } // Keys() will return all keys. -func (kv *kvs) Keys() (<-chan string, error) { - _, err := kv.js.GetLastMsg(kv.stream, AllKeys) +func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) { + opts = append(opts, IgnoreDeletes()) + watcher, err := kv.WatchAll(opts...) if err != nil { - if err == ErrMsgNotFound { - err = ErrNoKeysFound - } return nil, err } + defer watcher.Stop() - keys := make(chan string, 32) - cb := func(m *Msg) { - if len(m.Subject) <= len(kv.pre) { - keys <- _EMPTY_ - m.Sub.Unsubscribe() - return - } - tokens, err := getMetadataFields(m.Reply) - if err != nil { - keys <- _EMPTY_ - m.Sub.Unsubscribe() - } - pending := tokens[ackNumPendingTokenPos] - - // Check for purged keys, etc. - if len(m.Header) > 0 { - // Ignore deleted or purged keys. - if op := m.Header.Get(kvop); op == kvdel || op == kvpurge { - if pending == kvNoPending { - keys <- _EMPTY_ - m.Sub.Unsubscribe() - } - return - } - } - // Grab correct key by stripping prefix. - keys <- m.Subject[len(kv.pre):] - if pending == kvNoPending { - keys <- _EMPTY_ - m.Sub.Unsubscribe() + var keys []string + for entry := range watcher.Updates() { + if entry == nil { + break } + keys = append(keys, entry.Key()) } - - var b strings.Builder - b.WriteString(kv.pre) - b.WriteString(AllKeys) - - _, err = kv.js.Subscribe(b.String(), cb, OrderedConsumer(), DeliverLastPerSubject(), HeadersOnly()) - if err != nil { - return nil, err + if len(keys) == 0 { + return nil, ErrNoKeysFound } - return keys, nil } // History will return all values for the key. -func (kv *kvs) History(key string) ([]KeyValueEntry, error) { - // Do quick check to make sure this is a legit key with history. - if _, err := kv.Get(key); err != nil && err != ErrKeyDeleted { - return nil, err - } - - o, cancel, err := getJSContextOpts(kv.js.opts) +func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) { + opts = append(opts, IncludeHistory()) + watcher, err := kv.Watch(key, opts...) if err != nil { return nil, err } - if cancel != nil { - defer cancel() - } + defer watcher.Stop() - ctx := o.ctx - if ctx == nil { - ctx, cancel = context.WithTimeout(context.Background(), o.wait) - defer cancel() - } - - var vals []KeyValueEntry - done := make(chan error, 1) - cb := func(m *Msg) { - tokens, err := getMetadataFields(m.Reply) - if err != nil { - done <- err - } else { - if len(m.Subject) <= len(kv.pre) { - done <- ErrBadSubject - return - } - subj := m.Subject[len(kv.pre):] - pending := tokens[ackNumPendingTokenPos] - entry := &kve{ - bucket: kv.name, - key: subj, - value: m.Data, - revision: uint64(parseNum(tokens[ackStreamSeqTokenPos])), - created: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])), - delta: uint64(parseNum(pending)), - } - if len(m.Header) > 0 { - switch m.Header.Get(kvop) { - case kvdel: - entry.op = KeyValueDelete - case kvpurge: - entry.op = KeyValuePurge - } - } - vals = append(vals, entry) - - if pending == kvNoPending { - done <- nil - } + var entries []KeyValueEntry + for entry := range watcher.Updates() { + if entry == nil { + break } + entries = append(entries, entry) } + if len(entries) == 0 { + return nil, ErrKeyNotFound + } + return entries, nil +} - var b strings.Builder - b.WriteString(kv.pre) - b.WriteString(key) +// Implementation for Watch +type watcher struct { + updates chan KeyValueEntry + sub *Subscription +} - // Used ordered consumer to deliver results. - sub, err := kv.js.Subscribe(b.String(), cb, OrderedConsumer()) - if err != nil { - return nil, err +// Updates returns the interior channel. +func (w *watcher) Updates() <-chan KeyValueEntry { + if w == nil { + return nil } - defer sub.Unsubscribe() + return w.updates +} - // Wait on done or ctx/timeout. - select { - case err := <-done: - if err != nil { - return nil, err - } - return vals, nil - case <-ctx.Done(): - return nil, ctx.Err() +// Stop will unsubscribe from the watcher. +func (w *watcher) Stop() error { + if w == nil { + return nil } + return w.sub.Unsubscribe() } // WatchAll watches all keys. -func (kv *kvs) WatchAll(cb KeyValueUpdate) (*Subscription, error) { - return kv.Watch(AllKeys, cb) +func (kv *kvs) WatchAll(opts ...WatchOpt) (Watcher, error) { + return kv.Watch(AllKeys, opts...) } // Watch will fire the callback when a key that matches the keys pattern is updated. // keys needs to be a valid NATS subject. -func (kv *kvs) Watch(keys string, cb KeyValueUpdate) (*Subscription, error) { +func (kv *kvs) Watch(keys string, opts ...WatchOpt) (Watcher, error) { + var o watchOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureWatcher(&o); err != nil { + return nil, err + } + } + } + var initDoneMarker bool // Could be a pattern so don't check for validity as we normally do. @@ -598,6 +567,8 @@ func (kv *kvs) Watch(keys string, cb KeyValueUpdate) (*Subscription, error) { b.WriteString(keys) keys = b.String() + w := &watcher{updates: make(chan KeyValueEntry, 32)} + update := func(m *Msg) { tokens, err := getMetadataFields(m.Reply) if err != nil { @@ -618,7 +589,7 @@ func (kv *kvs) Watch(keys string, cb KeyValueUpdate) (*Subscription, error) { } } delta := uint64(parseNum(tokens[ackNumPendingTokenPos])) - cb(&kve{ + entry := &kve{ bucket: kv.name, key: subj, value: m.Data, @@ -626,11 +597,14 @@ func (kv *kvs) Watch(keys string, cb KeyValueUpdate) (*Subscription, error) { created: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])), delta: delta, op: op, - }) - + } + if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) { + w.updates <- entry + } + // Check if done initial values. if !initDoneMarker && delta == 0 { initDoneMarker = true - cb(nil) + w.updates <- nil } } @@ -638,11 +612,20 @@ func (kv *kvs) Watch(keys string, cb KeyValueUpdate) (*Subscription, error) { _, err := kv.js.GetLastMsg(kv.stream, keys) if err == ErrMsgNotFound { initDoneMarker = true - cb(nil) + w.updates <- nil } // Used ordered consumer to deliver results. - return kv.js.Subscribe(keys, update, OrderedConsumer(), DeliverLastPerSubject()) + subOpts := []SubOpt{OrderedConsumer()} + if !o.includeHistroy { + subOpts = append(subOpts, DeliverLastPerSubject()) + } + sub, err := kv.js.Subscribe(keys, update, subOpts...) + if err != nil { + return nil, err + } + w.sub = sub + return w, nil } // Bucket returns the current bucket name (JetStream stream). diff --git a/test/kv_test.go b/test/kv_test.go index 7d9ffe827..01934ba48 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -128,17 +128,14 @@ func TestKeyValueWatch(t *testing.T) { kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) expectOk(t, err) - updates := make(chan nats.KeyValueEntry, 32) - sub, err := kv.WatchAll(func(v nats.KeyValueEntry) { - updates <- v - }) + watcher, err := kv.WatchAll() expectOk(t, err) - defer sub.Unsubscribe() + defer watcher.Stop() expectUpdate := func(key, value string, revision uint64) { t.Helper() select { - case v := <-updates: + case v := <-watcher.Updates(): if v.Key() != key || string(v.Value()) != value || v.Revision() != revision { t.Fatalf("Did not get expected: %+v vs %q %q %d", v, key, value, revision) } @@ -149,7 +146,7 @@ func TestKeyValueWatch(t *testing.T) { expectDelete := func(key string, revision uint64) { t.Helper() select { - case v := <-updates: + case v := <-watcher.Updates(): if v.Operation() != nats.KeyValueDelete { t.Fatalf("Expected a delete operation but got %+v", v) } @@ -163,7 +160,7 @@ func TestKeyValueWatch(t *testing.T) { expectInitDone := func() { t.Helper() select { - case v := <-updates: + case v := <-watcher.Updates(): if v != nil { t.Fatalf("Did not get expected: %+v", v) } @@ -189,7 +186,7 @@ func TestKeyValueWatch(t *testing.T) { expectDelete("age", 6) // Stop first watcher. - sub.Unsubscribe() + watcher.Stop() // Now try wildcard matching and make sure we only get last value when starting. kv.Put("t.name", []byte("rip")) @@ -197,11 +194,9 @@ func TestKeyValueWatch(t *testing.T) { kv.Put("t.age", []byte("22")) kv.Put("t.age", []byte("44")) - sub, err = kv.Watch("t.*", func(v nats.KeyValueEntry) { - updates <- v - }) + watcher, err = kv.Watch("t.*") expectOk(t, err) - defer sub.Unsubscribe() + defer watcher.Stop() expectUpdate("t.name", "ik", 8) expectUpdate("t.age", "44", 10) @@ -375,10 +370,7 @@ func TestKeyValueKeys(t *testing.T) { expectOk(t, err) kmap := make(map[string]struct{}) - for key := range keys { - if key == "" { // End of list - break - } + for _, key := range keys { if _, ok := kmap[key]; ok { t.Fatalf("Already saw %q", key) } @@ -405,10 +397,7 @@ func TestKeyValueKeys(t *testing.T) { expectOk(t, err) kmap = make(map[string]struct{}) - for key := range keys { - if key == "" { // End of list - break - } + for _, key := range keys { if _, ok := kmap[key]; ok { t.Fatalf("Already saw %q", key) }