Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Different approach to address initial pending == 0 #902

Merged
merged 3 commits into from Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 38 additions & 53 deletions js.go
Expand Up @@ -974,6 +974,14 @@ type jsSub struct {
pull bool
dc bool // Delete JS consumer

// This is ConsumerInfo's Pending+Consumer.Delivered that we get from the
// add consumer response. Note that some versions of the server gather the
// consumer info *after* the creation of the consumer, which means that
// some messages may have been already delivered. So the sum of the two
// is a more accurate representation of the number of messages pending or
// in the process of being delivered to the subscription when created.
pending uint64

// Ordered consumers
ordered bool
dseq uint64
Expand Down Expand Up @@ -1281,37 +1289,33 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Do some quick checks here for ordered consumers. We do these here instead of spread out
// in the individual SubOpts.
if o.ordered {
// Skip checks in special case where we use an internal option to bind to
// an existing consumer.
if !o.boc {
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
}
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
}
// Setup how we need it to be here.
o.cfg.FlowControl = true
Expand Down Expand Up @@ -1346,7 +1350,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// With an explicit durable name, we can lookup the consumer first
// to which it should be attaching to.
// If bind to ordered consumer is true, skip the lookup.
if consumer != _EMPTY_ && !o.boc {
if consumer != _EMPTY_ {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
Expand Down Expand Up @@ -1415,14 +1419,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
Config: &cfg,
}
hbi = cfg.Heartbeat

// If bind to ordered consumer is true, we wanted to do all above so that
// we have information about the ordered consumer and how to recreate it
// if later a gap is detected. However, we are not going to actually
// create the consumer since it was done by the caller.
if o.boc {
shouldCreate = false
}
}

if isPullMode {
Expand Down Expand Up @@ -1450,9 +1446,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
nms: nms,
psubj: subj,
cancel: cancel,
// Special internal case: the consumer was created outside, but we
// take ownership and will delete the consumer on unsub/drain.
dc: o.boc,
}

// Check if we are manual ack.
Expand Down Expand Up @@ -1573,6 +1566,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
sub.mu.Lock()
sub.jsi.dc = true
sub.jsi.pending = info.NumPending + info.Delivered.Consumer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good, and this PR is much nicer thanks.

// If this is an ephemeral, we did not have a consumer name, we get it from the info
// after the AddConsumer returns.
if consumer == _EMPTY_ {
Expand Down Expand Up @@ -2014,16 +2008,7 @@ type subOpts struct {
mack bool
// For an ordered consumer.
ordered bool
// Special case for KV to be able to bind an ordered consumer
boc bool
ctx context.Context
}

func bindOrderedConsumer() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.boc = true
return nil
})
ctx context.Context
}

// OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
Expand Down
49 changes: 15 additions & 34 deletions kv.go
Expand Up @@ -20,6 +20,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -593,6 +594,7 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {

// Implementation for Watch
type watcher struct {
mu sync.Mutex
updates chan KeyValueEntry
sub *Subscription
initDone bool
Expand Down Expand Up @@ -662,6 +664,8 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
}
delta := uint64(parseNum(tokens[ackNumPendingTokenPos]))
w.mu.Lock()
defer w.mu.Unlock()
if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) {
entry := &kve{
bucket: kv.name,
Expand All @@ -688,56 +692,33 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
}

// The server sends the ConsumerInfo after the actual creation,
// and there is a possibility that the NumPending then does not
// reflect the real value. So we are creating the consumer
// without the NATS subscription started to make sure that
// there is no delivery.
cfg := ConsumerConfig{
DeliverSubject: kv.js.nc.newInbox(),
DeliverPolicy: DeliverAllPolicy,
FilterSubject: keys,
FlowControl: true,
AckPolicy: AckNonePolicy,
MaxDeliver: 1,
AckWait: 22 * time.Hour,
Direct: true,
Heartbeat: orderedHeartbeatsInterval,
}
// Used ordered consumer to deliver results.
// Use the deliver subject from the config and also set the internal option
// to indicate that we will bind to the above consumer, by-passing some of
// the check for public ordered consumers.
subOpts := []SubOpt{
OrderedConsumer(),
DeliverSubject(cfg.DeliverSubject),
bindOrderedConsumer()}
subOpts := []SubOpt{OrderedConsumer()}
if !o.includeHistory {
cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
subOpts = append(subOpts, DeliverLastPerSubject())
}
if o.metaOnly {
cfg.HeadersOnly = true
subOpts = append(subOpts, HeadersOnly())
}
if o.ctx != nil {
subOpts = append(subOpts, Context(o.ctx))
}
ci, err := kv.js.AddConsumer(kv.stream, &cfg)
// Create the sub and rest of initialization under the lock.
// We want to prevent the race between this code and the
// update() callback.
w.mu.Lock()
defer w.mu.Unlock()
sub, err := kv.js.Subscribe(keys, update, subOpts...)
if err != nil {
return nil, err
}
if ci.NumPending == 0 {
sub.mu.Lock()
// If there were no pending messages at the time of the creation
// of the consumer, send the marker.
if sub.jsi != nil && sub.jsi.pending == 0 {
w.initDone = true
w.updates <- nil
}
// Now that we have the consumer name, use the Bind option.
subOpts = append(subOpts, Bind(kv.stream, ci.Name))
sub, err := kv.js.Subscribe(keys, update, subOpts...)
if err != nil {
return nil, err
}
sub.mu.Lock()
// Set us up to close when the waitForMessages func returns.
sub.pDone = func() {
close(w.updates)
Expand Down