Skip to content

Commit

Permalink
Expose nats.Context option for nats.KeyWatcher (#904)
Browse files Browse the repository at this point in the history
* Expose KeyWatcher context to facilitate KeyValue interface extension

Signed-off-by: Matthew DeVenny <matt@boxboat.com>

* Expose KeyWatcher context to facilitate KeyValue interface extension

Signed-off-by: Matthew DeVenny <matt@boxboat.com>

* Rename GetContext to Context

Signed-off-by: Matthew DeVenny <matt@boxboat.com>
  • Loading branch information
matthewdevenny committed Mar 18, 2022
1 parent 4fef66c commit e0e03e3
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion kv.go
Expand Up @@ -94,6 +94,8 @@ type KeyValueStatus interface {

// KeyWatcher is what is returned when doing a watch.
type KeyWatcher interface {
// Context returns watcher context optionally provided by nats.Context option.
Context() context.Context
// Updates returns a channel to read any updates to entries.
Updates() <-chan KeyValueEntry
// Stop will stop this watcher.
Expand Down Expand Up @@ -743,6 +745,15 @@ type watcher struct {
initDone bool
initPending uint64
received uint64
ctx context.Context
}

// Context returns the context for the watcher if set.
func (w *watcher) Context() context.Context {
if w == nil {
return nil
}
return w.ctx
}

// Updates returns the interior channel.
Expand Down Expand Up @@ -785,7 +796,7 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
keys = b.String()

// We will block below on placing items on the chan. That is by design.
w := &watcher{updates: make(chan KeyValueEntry, 256)}
w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx}

update := func(m *Msg) {
tokens, err := getMetadataFields(m.Reply)
Expand Down

0 comments on commit e0e03e3

Please sign in to comment.