Skip to content

Commit

Permalink
Different approach to address initial pending == 0
Browse files Browse the repository at this point in the history
This is undoing changes done in PR #901
and makes the changes dicussed in the comments of that PR.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 9, 2022
1 parent 161162c commit cc6c8b1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 90 deletions.
92 changes: 39 additions & 53 deletions js.go
Expand Up @@ -974,6 +974,14 @@ type jsSub struct {
pull bool
dc bool // Delete JS consumer

// This is ConsumerInfo's Pending+Consumer.Delivered that we get from the
// add consumer response. Note that some versions of the server gather the
// consumer info *after* the creation of the consumer, which means that
// some messages may have been already delivered. So the sum of the two
// is a more accurate representation of the number of messages pending or
// in the process of being delivered to the subscription when created.
pending uint64

// Ordered consumers
ordered bool
dseq uint64
Expand Down Expand Up @@ -1281,37 +1289,33 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Do some quick checks here for ordered consumers. We do these here instead of spread out
// in the individual SubOpts.
if o.ordered {
// Skip checks in special case where we use an internal option to bind to
// an existing consumer.
if !o.boc {
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
}
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
}
// Setup how we need it to be here.
o.cfg.FlowControl = true
Expand Down Expand Up @@ -1346,7 +1350,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// With an explicit durable name, we can lookup the consumer first
// to which it should be attaching to.
// If bind to ordered consumer is true, skip the lookup.
if consumer != _EMPTY_ && !o.boc {
if consumer != _EMPTY_ {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
Expand Down Expand Up @@ -1415,14 +1419,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
Config: &cfg,
}
hbi = cfg.Heartbeat

// If bind to ordered consumer is true, we wanted to do all above so that
// we have information about the ordered consumer and how to recreate it
// if later a gap is detected. However, we are not going to actually
// create the consumer since it was done by the caller.
if o.boc {
shouldCreate = false
}
}

if isPullMode {
Expand Down Expand Up @@ -1450,9 +1446,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
nms: nms,
psubj: subj,
cancel: cancel,
// Special internal case: the consumer was created outside, but we
// take ownership and will delete the consumer on unsub/drain.
dc: o.boc,
}

// Check if we are manual ack.
Expand Down Expand Up @@ -1537,6 +1530,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
jsi.pending = info.NumPending + info.Delivered.Consumer

if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
Expand Down Expand Up @@ -1573,6 +1567,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
sub.mu.Lock()
sub.jsi.dc = true
sub.jsi.pending = info.NumPending + info.Delivered.Consumer
// If this is an ephemeral, we did not have a consumer name, we get it from the info
// after the AddConsumer returns.
if consumer == _EMPTY_ {
Expand Down Expand Up @@ -2014,16 +2009,7 @@ type subOpts struct {
mack bool
// For an ordered consumer.
ordered bool
// Special case for KV to be able to bind an ordered consumer
boc bool
ctx context.Context
}

func bindOrderedConsumer() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.boc = true
return nil
})
ctx context.Context
}

// OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
Expand Down
55 changes: 18 additions & 37 deletions kv.go
Expand Up @@ -20,6 +20,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -593,11 +594,11 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {

// Implementation for Watch
type watcher struct {
mu sync.Mutex
updates chan KeyValueEntry
sub *Subscription
initDone bool
initPending uint64
received uint64
}

// Updates returns the interior channel.
Expand Down Expand Up @@ -662,6 +663,8 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
}
delta := uint64(parseNum(tokens[ackNumPendingTokenPos]))
w.mu.Lock()
defer w.mu.Unlock()
if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) {
entry := &kve{
bucket: kv.name,
Expand All @@ -676,68 +679,46 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
// Check if done and initial values.
if !w.initDone {
w.received++
// We set this on the first trip through..
if w.initPending == 0 {
w.initPending = delta
}
if w.received > w.initPending || delta == 0 {
if w.initPending == 0 {
w.initDone = true
w.updates <- nil
} else {
w.initPending--
}
}
}

// The server sends the ConsumerInfo after the actual creation,
// and there is a possibility that the NumPending then does not
// reflect the real value. So we are creating the consumer
// without the NATS subscription started to make sure that
// there is no delivery.
cfg := ConsumerConfig{
DeliverSubject: kv.js.nc.newInbox(),
DeliverPolicy: DeliverAllPolicy,
FilterSubject: keys,
FlowControl: true,
AckPolicy: AckNonePolicy,
MaxDeliver: 1,
AckWait: 22 * time.Hour,
Direct: true,
Heartbeat: orderedHeartbeatsInterval,
}
// Used ordered consumer to deliver results.
// Use the deliver subject from the config and also set the internal option
// to indicate that we will bind to the above consumer, by-passing some of
// the check for public ordered consumers.
subOpts := []SubOpt{
OrderedConsumer(),
DeliverSubject(cfg.DeliverSubject),
bindOrderedConsumer()}
subOpts := []SubOpt{OrderedConsumer()}
if !o.includeHistory {
cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
subOpts = append(subOpts, DeliverLastPerSubject())
}
if o.metaOnly {
cfg.HeadersOnly = true
subOpts = append(subOpts, HeadersOnly())
}
if o.ctx != nil {
subOpts = append(subOpts, Context(o.ctx))
}
ci, err := kv.js.AddConsumer(kv.stream, &cfg)
// Create the sub and rest of initialization under the lock.
// We want to prevent the race between this code and the
// update() callback.
w.mu.Lock()
defer w.mu.Unlock()
sub, err := kv.js.Subscribe(keys, update, subOpts...)
if err != nil {
return nil, err
}
if ci.NumPending == 0 {
sub.mu.Lock()
// If there were no pending messages at the time of the creation
// of the consumer, send the marker.
if sub.jsi != nil && sub.jsi.pending == 0 {
w.initDone = true
w.updates <- nil
}
// Now that we have the consumer name, use the Bind option.
subOpts = append(subOpts, Bind(kv.stream, ci.Name))
sub, err := kv.js.Subscribe(keys, update, subOpts...)
if err != nil {
return nil, err
}
sub.mu.Lock()
// Set us up to close when the waitForMessages func returns.
sub.pDone = func() {
close(w.updates)
Expand Down

0 comments on commit cc6c8b1

Please sign in to comment.