Skip to content

Commit

Permalink
Merge pull request #900 from nats-io/watcher_updates
Browse files Browse the repository at this point in the history
Updates to KV Watcher.
  • Loading branch information
derekcollison committed Feb 8, 2022
2 parents bf1b005 + b6d9b13 commit 6cbe827
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 31 deletions.
13 changes: 11 additions & 2 deletions go_test.mod
@@ -1,11 +1,20 @@
module github.com/nats-io/nats.go

go 1.16
go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c
github.com/nats-io/nats-server/v2 v2.7.2
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

require (
github.com/klauspost/compress v1.13.4 // indirect
github.com/minio/highwayhash v1.0.1 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
)
4 changes: 2 additions & 2 deletions go_test.sum
Expand Up @@ -17,8 +17,8 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c h1:TfLMCHvaj2YSNrgiEWQiXA344lWqPmX3xOLtZj/ywlA=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI=
github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
18 changes: 15 additions & 3 deletions js.go
@@ -1,4 +1,4 @@
// Copyright 2020-2021 The NATS Authors
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -919,6 +919,9 @@ type ConsumerConfig struct {

// Ephemeral inactivity threshold.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

// Internal Use
Direct bool `json:"direct,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down Expand Up @@ -968,6 +971,7 @@ type jsSub struct {
consumer string
stream string
deliver string
pending uint64
pull bool
dc bool // Delete JS consumer

Expand All @@ -977,6 +981,9 @@ type jsSub struct {
sseq uint64
ccreq *createConsumerRequest

// Optional watcher
w *watcher

// Heartbeats and Flow Control handling from push consumers.
hbc *time.Timer
hbi time.Duration
Expand Down Expand Up @@ -1311,6 +1318,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
o.cfg.AckPolicy = AckNonePolicy
o.cfg.MaxDeliver = 1
o.cfg.AckWait = 22 * time.Hour // Just set to something known, not utilized.
o.cfg.Direct = true
if !hasHeartbeats {
o.cfg.Heartbeat = orderedHeartbeatsInterval
}
Expand Down Expand Up @@ -1517,6 +1525,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
jsi.pending = info.NumPending

if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if isSync {
Expand All @@ -1533,6 +1543,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
Expand All @@ -1551,6 +1562,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
sub.mu.Lock()
sub.jsi.dc = true
jsi.pending = info.NumPending
// If this is an ephemeral, we did not have a consumer name, we get it from the info
// after the AddConsumer returns.
if consumer == _EMPTY_ {
Expand Down Expand Up @@ -1859,7 +1871,7 @@ func (sub *Subscription) activityCheck() {
}

active := jsi.active
jsi.hbc.Reset(jsi.hbi)
jsi.hbc.Reset(jsi.hbi * hbcThresh)
jsi.active = false
nc := sub.conn
sub.mu.Unlock()
Expand Down Expand Up @@ -1887,7 +1899,7 @@ func (sub *Subscription) scheduleHeartbeatCheck() {
if jsi.hbc == nil {
jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck)
} else {
jsi.hbc.Reset(jsi.hbi)
jsi.hbc.Reset(jsi.hbi * hbcThresh)
}
}

Expand Down
63 changes: 41 additions & 22 deletions kv.go
@@ -1,4 +1,4 @@
// Copyright 2021 The NATS Authors
// Copyright 2021-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -306,6 +306,11 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
DenyDelete: true,
}

// If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
if js.nc.serverMinVersion(2, 7, 2) {
scfg.Discard = DiscardNew
}

if _, err := js.AddStream(scfg); err != nil {
return nil, err
}
Expand Down Expand Up @@ -626,14 +631,16 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}

var initDoneMarker bool
initPending, received := uint64(0), uint64(0)

// Could be a pattern so don't check for validity as we normally do.
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(keys)
keys = b.String()

w := &watcher{updates: make(chan KeyValueEntry, 32)}
// We will block below on placing items on the chan. That is by design.
w := &watcher{updates: make(chan KeyValueEntry, 256)}

update := func(m *Msg) {
tokens, err := getMetadataFields(m.Reply)
Expand All @@ -655,32 +662,32 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
}
delta := uint64(parseNum(tokens[ackNumPendingTokenPos]))
entry := &kve{
bucket: kv.name,
key: subj,
value: m.Data,
revision: uint64(parseNum(tokens[ackStreamSeqTokenPos])),
created: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])),
delta: delta,
op: op,
}
if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) {
entry := &kve{
bucket: kv.name,
key: subj,
value: m.Data,
revision: uint64(parseNum(tokens[ackStreamSeqTokenPos])),
created: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])),
delta: delta,
op: op,
}
w.updates <- entry
}
// Check if done initial values.
if !initDoneMarker && delta == 0 {
initDoneMarker = true
w.updates <- nil
// Check if done and initial values.
if !initDoneMarker {
received++
// We set this on the first trip through..
if initPending == 0 {
initPending = delta
}
if received > initPending || delta == 0 {
initDoneMarker = true
w.updates <- nil
}
}
}

// Check if we have anything pending.
_, err := kv.js.GetLastMsg(kv.stream, keys)
if err == ErrMsgNotFound {
initDoneMarker = true
w.updates <- nil
}

// Used ordered consumer to deliver results.
subOpts := []SubOpt{OrderedConsumer()}
if !o.includeHistory {
Expand All @@ -696,6 +703,18 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
if err != nil {
return nil, err
}
// Track watcher.
sub.jsi.w = w
// Set us up to close when the waitForMessages func returns.
sub.pDone = func() {
close(w.updates)
}
// Check on pending count.
if sub.jsi.pending == 0 {
initDoneMarker = true
w.updates <- nil
}

w.sub = sub
return w, nil
}
Expand Down
11 changes: 10 additions & 1 deletion nats.go
Expand Up @@ -159,6 +159,7 @@ var (
ErrConsumerNotActive = errors.New("nats: consumer not active")
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
)

func init() {
Expand Down Expand Up @@ -585,6 +586,7 @@ type Subscription struct {
pHead *Msg
pTail *Msg
pCond *sync.Cond
pDone func()

// Pending stats, async subscriptions, high-speed etc.
pMsgs int
Expand Down Expand Up @@ -2695,7 +2697,13 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
}
s.pHead = m.next
}
// Now check for pDone
done := s.pDone
s.mu.Unlock()

if done != nil {
done()
}
}

// Used for debugging and simulating loss for certain tests.
Expand Down Expand Up @@ -3927,7 +3935,8 @@ func (nc *Conn) removeSub(s *Subscription) {
s.mch = nil

// If JS subscription then stop HB timer.
if jsi := s.jsi; jsi != nil {
jsi := s.jsi
if jsi != nil {
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
Expand Down
83 changes: 82 additions & 1 deletion test/kv_test.go
@@ -1,4 +1,4 @@
// Copyright 2022 The NATS Authors
// Copyright 2021-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -258,6 +259,44 @@ func TestKeyValueWatchContext(t *testing.T) {
}
}

func TestKeyValueWatchContextUpdates(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCHCTX"})
expectOk(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

watcher, err := kv.WatchAll(nats.Context(ctx))
expectOk(t, err)
defer watcher.Stop()

// Pull the initial state done marker which is nil.
select {
case v := <-watcher.Updates():
if v != nil {
t.Fatalf("Expected nil marker, got %+v", v)
}
case <-time.After(time.Second):
t.Fatalf("Did not receive nil marker like expected")
}

// Fire a timer and cancel the context after 250ms.
time.AfterFunc(250*time.Millisecond, cancel)

// Make sure canceling will break us out here.
select {
case <-watcher.Updates():
case <-time.After(5 * time.Second):
t.Fatalf("Did not break out like expected")
}
}

func TestKeyValueBindStore(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down Expand Up @@ -466,6 +505,48 @@ func TestKeyValueKeys(t *testing.T) {
}
}

func TestKeyValueDiscardNew(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 1, MaxBytes: 256})
expectOk(t, err)

vc := func() (major, minor, patch int) {
semVerRe := regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`)
m := semVerRe.FindStringSubmatch(nc.ConnectedServerVersion())
expectOk(t, err)
major, err = strconv.Atoi(m[1])
expectOk(t, err)
minor, err = strconv.Atoi(m[2])
expectOk(t, err)
patch, err = strconv.Atoi(m[3])
expectOk(t, err)
return major, minor, patch
}

major, minor, patch := vc()
status, err := kv.Status()
expectOk(t, err)
kvs := status.(*nats.KeyValueBucketStatus)
si := kvs.StreamInfo()

// If we are 2.7.1 or below DiscardOld should be used.
// If 2.7.2 or above should be DiscardNew
if major <= 2 && minor <= 7 && patch <= 1 {
if si.Config.Discard != nats.DiscardOld {
t.Fatalf("Expected Discard Old for server version %d.%d.%d", major, minor, patch)
}
} else {
if si.Config.Discard != nats.DiscardNew {
t.Fatalf("Expected Discard New for server version %d.%d.%d", major, minor, patch)
}
}
}

// Helpers

func client(t *testing.T, s *server.Server, opts ...nats.Option) *nats.Conn {
Expand Down

0 comments on commit 6cbe827

Please sign in to comment.