Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to KV Watcher. #900

Merged
merged 3 commits into from Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions go_test.mod
@@ -1,11 +1,20 @@
module github.com/nats-io/nats.go

go 1.16
go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c
github.com/nats-io/nats-server/v2 v2.7.2
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)

require (
github.com/klauspost/compress v1.13.4 // indirect
github.com/minio/highwayhash v1.0.1 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
)
4 changes: 2 additions & 2 deletions go_test.sum
Expand Up @@ -17,8 +17,8 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c h1:TfLMCHvaj2YSNrgiEWQiXA344lWqPmX3xOLtZj/ywlA=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201004847-b23ed778684c/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI=
github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
18 changes: 15 additions & 3 deletions js.go
@@ -1,4 +1,4 @@
// Copyright 2020-2021 The NATS Authors
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -919,6 +919,9 @@ type ConsumerConfig struct {

// Ephemeral inactivity threshold.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

// Internal Use
Direct bool `json:"direct,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down Expand Up @@ -968,6 +971,7 @@ type jsSub struct {
consumer string
stream string
deliver string
pending uint64
pull bool
dc bool // Delete JS consumer

Expand All @@ -977,6 +981,9 @@ type jsSub struct {
sseq uint64
ccreq *createConsumerRequest

// Optional watcher
w *watcher

// Heartbeats and Flow Control handling from push consumers.
hbc *time.Timer
hbi time.Duration
Expand Down Expand Up @@ -1312,6 +1319,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
o.cfg.AckPolicy = AckNonePolicy
o.cfg.MaxDeliver = 1
o.cfg.AckWait = 22 * time.Hour // Just set to something known, not utilized.
o.cfg.Direct = true
if !hasHeartbeats {
o.cfg.Heartbeat = orderedHeartbeatsInterval
}
Expand Down Expand Up @@ -1518,6 +1526,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
jsi.pending = info.NumPending

if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if isSync {
Expand All @@ -1534,6 +1544,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
Expand All @@ -1552,6 +1563,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
sub.mu.Lock()
sub.jsi.dc = true
jsi.pending = info.NumPending
// If this is an ephemeral, we did not have a consumer name, we get it from the info
// after the AddConsumer returns.
if consumer == _EMPTY_ {
Expand Down Expand Up @@ -1860,7 +1872,7 @@ func (sub *Subscription) activityCheck() {
}

active := jsi.active
jsi.hbc.Reset(jsi.hbi)
jsi.hbc.Reset(jsi.hbi * hbcThresh)
jsi.active = false
nc := sub.conn
sub.mu.Unlock()
Expand Down Expand Up @@ -1888,7 +1900,7 @@ func (sub *Subscription) scheduleHeartbeatCheck() {
if jsi.hbc == nil {
jsi.hbc = time.AfterFunc(jsi.hbi*hbcThresh, sub.activityCheck)
} else {
jsi.hbc.Reset(jsi.hbi)
jsi.hbc.Reset(jsi.hbi * hbcThresh)
}
}

Expand Down
63 changes: 41 additions & 22 deletions kv.go
@@ -1,4 +1,4 @@
// Copyright 2021 The NATS Authors
// Copyright 2021-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -306,6 +306,11 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
DenyDelete: true,
}

// If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
if js.nc.serverMinVersion(2, 7, 2) {
scfg.Discard = DiscardNew
}

if _, err := js.AddStream(scfg); err != nil {
return nil, err
}
Expand Down Expand Up @@ -626,14 +631,16 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}

var initDoneMarker bool
initPending, received := uint64(0), uint64(0)

// Could be a pattern so don't check for validity as we normally do.
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(keys)
keys = b.String()

w := &watcher{updates: make(chan KeyValueEntry, 32)}
// We will block below on placing items on the chan. That is by design.
w := &watcher{updates: make(chan KeyValueEntry, 256)}

update := func(m *Msg) {
tokens, err := getMetadataFields(m.Reply)
Expand All @@ -655,32 +662,32 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
}
delta := uint64(parseNum(tokens[ackNumPendingTokenPos]))
entry := &kve{
bucket: kv.name,
key: subj,
value: m.Data,
revision: uint64(parseNum(tokens[ackStreamSeqTokenPos])),
created: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])),
delta: delta,
op: op,
}
if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) {
entry := &kve{
bucket: kv.name,
key: subj,
value: m.Data,
revision: uint64(parseNum(tokens[ackStreamSeqTokenPos])),
created: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])),
delta: delta,
op: op,
}
w.updates <- entry
}
// Check if done initial values.
if !initDoneMarker && delta == 0 {
initDoneMarker = true
w.updates <- nil
// Check if done and initial values.
if !initDoneMarker {
received++
// We set this on the first trip through..
if initPending == 0 {
initPending = delta
}
if received > initPending || delta == 0 {
initDoneMarker = true
w.updates <- nil
}
}
}

// Check if we have anything pending.
_, err := kv.js.GetLastMsg(kv.stream, keys)
if err == ErrMsgNotFound {
initDoneMarker = true
w.updates <- nil
}

// Used ordered consumer to deliver results.
subOpts := []SubOpt{OrderedConsumer()}
if !o.includeHistory {
Expand All @@ -696,6 +703,18 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
if err != nil {
return nil, err
}
// Track watcher.
sub.jsi.w = w
// Set us up to close when the waitForMessages func returns.
sub.pDone = func() {
close(w.updates)
}
// Check on pending count.
if sub.jsi.pending == 0 {
initDoneMarker = true
w.updates <- nil
}

w.sub = sub
return w, nil
}
Expand Down
11 changes: 10 additions & 1 deletion nats.go
Expand Up @@ -158,6 +158,7 @@ var (
ErrConsumerNotActive = errors.New("nats: consumer not active")
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
)

func init() {
Expand Down Expand Up @@ -586,6 +587,7 @@ type Subscription struct {
pHead *Msg
pTail *Msg
pCond *sync.Cond
pDone func()

// Pending stats, async subscriptions, high-speed etc.
pMsgs int
Expand Down Expand Up @@ -2696,7 +2698,13 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
}
s.pHead = m.next
}
// Now check for pDone
done := s.pDone
s.mu.Unlock()

if done != nil {
done()
}
}

// Used for debugging and simulating loss for certain tests.
Expand Down Expand Up @@ -3928,7 +3936,8 @@ func (nc *Conn) removeSub(s *Subscription) {
s.mch = nil

// If JS subscription then stop HB timer.
if jsi := s.jsi; jsi != nil {
jsi := s.jsi
if jsi != nil {
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
Expand Down
83 changes: 82 additions & 1 deletion test/kv_test.go
@@ -1,4 +1,4 @@
// Copyright 2022 The NATS Authors
// Copyright 2021-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -258,6 +259,44 @@ func TestKeyValueWatchContext(t *testing.T) {
}
}

func TestKeyValueWatchContextUpdates(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCHCTX"})
expectOk(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

watcher, err := kv.WatchAll(nats.Context(ctx))
expectOk(t, err)
defer watcher.Stop()

// Pull the initial state done marker which is nil.
select {
case v := <-watcher.Updates():
if v != nil {
t.Fatalf("Expected nil marker, got %+v", v)
}
case <-time.After(time.Second):
t.Fatalf("Did not receive nil marker like expected")
}

// Fire a timer and cancel the context after 250ms.
time.AfterFunc(250*time.Millisecond, cancel)

// Make sure canceling will break us out here.
select {
case <-watcher.Updates():
case <-time.After(5 * time.Second):
t.Fatalf("Did not break out like expected")
}
}

func TestKeyValueBindStore(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down Expand Up @@ -466,6 +505,48 @@ func TestKeyValueKeys(t *testing.T) {
}
}

func TestKeyValueDiscardNew(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 1, MaxBytes: 256})
expectOk(t, err)

vc := func() (major, minor, patch int) {
semVerRe := regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`)
m := semVerRe.FindStringSubmatch(nc.ConnectedServerVersion())
expectOk(t, err)
major, err = strconv.Atoi(m[1])
expectOk(t, err)
minor, err = strconv.Atoi(m[2])
expectOk(t, err)
patch, err = strconv.Atoi(m[3])
expectOk(t, err)
return major, minor, patch
}

major, minor, patch := vc()
status, err := kv.Status()
expectOk(t, err)
kvs := status.(*nats.KeyValueBucketStatus)
si := kvs.StreamInfo()

// If we are 2.7.1 or below DiscardOld should be used.
// If 2.7.2 or above should be DiscardNew
if major <= 2 && minor <= 7 && patch <= 1 {
if si.Config.Discard != nats.DiscardOld {
t.Fatalf("Expected Discard Old for server version %d.%d.%d", major, minor, patch)
}
} else {
if si.Config.Discard != nats.DiscardNew {
t.Fatalf("Expected Discard New for server version %d.%d.%d", major, minor, patch)
}
}
}

// Helpers

func client(t *testing.T, s *server.Server, opts ...nats.Option) *nats.Conn {
Expand Down