Skip to content

Commit

Permalink
Merge pull request #901 from nats-io/kv_updates
Browse files Browse the repository at this point in the history
Updates to KV
  • Loading branch information
kozlovic committed Feb 9, 2022
2 parents 6cbe827 + 85e8c7f commit 161162c
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 56 deletions.
89 changes: 54 additions & 35 deletions js.go
Expand Up @@ -971,7 +971,6 @@ type jsSub struct {
consumer string
stream string
deliver string
pending uint64
pull bool
dc bool // Delete JS consumer

Expand All @@ -981,9 +980,6 @@ type jsSub struct {
sseq uint64
ccreq *createConsumerRequest

// Optional watcher
w *watcher

// Heartbeats and Flow Control handling from push consumers.
hbc *time.Timer
hbi time.Duration
Expand Down Expand Up @@ -1285,33 +1281,37 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Do some quick checks here for ordered consumers. We do these here instead of spread out
// in the individual SubOpts.
if o.ordered {
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
// Skip checks in special case where we use an internal option to bind to
// an existing consumer.
if !o.boc {
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
}
}
// Setup how we need it to be here.
o.cfg.FlowControl = true
Expand Down Expand Up @@ -1345,7 +1345,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

// With an explicit durable name, we can lookup the consumer first
// to which it should be attaching to.
if consumer != _EMPTY_ {
// If bind to ordered consumer is true, skip the lookup.
if consumer != _EMPTY_ && !o.boc {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
Expand Down Expand Up @@ -1414,6 +1415,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
Config: &cfg,
}
hbi = cfg.Heartbeat

// If bind to ordered consumer is true, we wanted to do all above so that
// we have information about the ordered consumer and how to recreate it
// if later a gap is detected. However, we are not going to actually
// create the consumer since it was done by the caller.
if o.boc {
shouldCreate = false
}
}

if isPullMode {
Expand Down Expand Up @@ -1441,6 +1450,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
nms: nms,
psubj: subj,
cancel: cancel,
// Special internal case: the consumer was created outside, but we
// take ownership and will delete the consumer on unsub/drain.
dc: o.boc,
}

// Check if we are manual ack.
Expand Down Expand Up @@ -1525,7 +1537,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
jsi.pending = info.NumPending

if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
Expand Down Expand Up @@ -1562,7 +1573,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
sub.mu.Lock()
sub.jsi.dc = true
jsi.pending = info.NumPending
// If this is an ephemeral, we did not have a consumer name, we get it from the info
// after the AddConsumer returns.
if consumer == _EMPTY_ {
Expand Down Expand Up @@ -2004,7 +2014,16 @@ type subOpts struct {
mack bool
// For an ordered consumer.
ordered bool
ctx context.Context
// Special case for KV to be able to bind an ordered consumer
boc bool
ctx context.Context
}

func bindOrderedConsumer() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.boc = true
return nil
})
}

// OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
Expand Down
67 changes: 48 additions & 19 deletions kv.go
Expand Up @@ -593,8 +593,11 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {

// Implementation for Watch
type watcher struct {
updates chan KeyValueEntry
sub *Subscription
updates chan KeyValueEntry
sub *Subscription
initDone bool
initPending uint64
received uint64
}

// Updates returns the interior channel.
Expand Down Expand Up @@ -630,9 +633,6 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
}

var initDoneMarker bool
initPending, received := uint64(0), uint64(0)

// Could be a pattern so don't check for validity as we normally do.
var b strings.Builder
b.WriteString(kv.pre)
Expand Down Expand Up @@ -675,45 +675,74 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
w.updates <- entry
}
// Check if done and initial values.
if !initDoneMarker {
received++
if !w.initDone {
w.received++
// We set this on the first trip through..
if initPending == 0 {
initPending = delta
if w.initPending == 0 {
w.initPending = delta
}
if received > initPending || delta == 0 {
initDoneMarker = true
if w.received > w.initPending || delta == 0 {
w.initDone = true
w.updates <- nil
}
}
}

// The server sends the ConsumerInfo after the actual creation,
// and there is a possibility that the NumPending then does not
// reflect the real value. So we are creating the consumer
// without the NATS subscription started to make sure that
// there is no delivery.
cfg := ConsumerConfig{
DeliverSubject: kv.js.nc.newInbox(),
DeliverPolicy: DeliverAllPolicy,
FilterSubject: keys,
FlowControl: true,
AckPolicy: AckNonePolicy,
MaxDeliver: 1,
AckWait: 22 * time.Hour,
Direct: true,
Heartbeat: orderedHeartbeatsInterval,
}
// Used ordered consumer to deliver results.
subOpts := []SubOpt{OrderedConsumer()}
// Use the deliver subject from the config and also set the internal option
// to indicate that we will bind to the above consumer, by-passing some of
// the check for public ordered consumers.
subOpts := []SubOpt{
OrderedConsumer(),
DeliverSubject(cfg.DeliverSubject),
bindOrderedConsumer()}
if !o.includeHistory {
cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
subOpts = append(subOpts, DeliverLastPerSubject())
}
if o.metaOnly {
cfg.HeadersOnly = true
subOpts = append(subOpts, HeadersOnly())
}
if o.ctx != nil {
subOpts = append(subOpts, Context(o.ctx))
}
ci, err := kv.js.AddConsumer(kv.stream, &cfg)
if err != nil {
return nil, err
}
if ci.NumPending == 0 {
w.initDone = true
w.updates <- nil
}
// Now that we have the consumer name, use the Bind option.
subOpts = append(subOpts, Bind(kv.stream, ci.Name))
sub, err := kv.js.Subscribe(keys, update, subOpts...)
if err != nil {
return nil, err
}
// Track watcher.
sub.jsi.w = w
sub.mu.Lock()
// Set us up to close when the waitForMessages func returns.
sub.pDone = func() {
close(w.updates)
}
// Check on pending count.
if sub.jsi.pending == 0 {
initDoneMarker = true
w.updates <- nil
}
sub.mu.Unlock()

w.sub = sub
return w, nil
Expand Down
3 changes: 1 addition & 2 deletions nats.go
Expand Up @@ -3935,8 +3935,7 @@ func (nc *Conn) removeSub(s *Subscription) {
s.mch = nil

// If JS subscription then stop HB timer.
jsi := s.jsi
if jsi != nil {
if jsi := s.jsi; jsi != nil {
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
Expand Down

0 comments on commit 161162c

Please sign in to comment.