From cc6c8b1d4af7dd932c2cebae7a5bacaef4c555b1 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 9 Feb 2022 14:21:31 -0700 Subject: [PATCH] Different approach to address initial pending == 0 This is undoing changes done in PR https://github.com/nats-io/nats.go/pull/901 and makes the changes dicussed in the comments of that PR. Signed-off-by: Ivan Kozlovic --- js.go | 92 +++++++++++++++++++++++++---------------------------------- kv.go | 55 ++++++++++++----------------------- 2 files changed, 57 insertions(+), 90 deletions(-) diff --git a/js.go b/js.go index 59780712d..36d68e148 100644 --- a/js.go +++ b/js.go @@ -974,6 +974,14 @@ type jsSub struct { pull bool dc bool // Delete JS consumer + // This is ConsumerInfo's Pending+Consumer.Delivered that we get from the + // add consumer response. Note that some versions of the server gather the + // consumer info *after* the creation of the consumer, which means that + // some messages may have been already delivered. So the sum of the two + // is a more accurate representation of the number of messages pending or + // in the process of being delivered to the subscription when created. + pending uint64 + // Ordered consumers ordered bool dseq uint64 @@ -1281,37 +1289,33 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // Do some quick checks here for ordered consumers. We do these here instead of spread out // in the individual SubOpts. if o.ordered { - // Skip checks in special case where we use an internal option to bind to - // an existing consumer. - if !o.boc { - // Make sure we are not durable. - if isDurable { - return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer") - } - // Check ack policy. - if o.cfg.AckPolicy != ackPolicyNotSet { - return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer") - } - // Check max deliver. - if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 { - return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer") - } - // No deliver subject, we pick our own. - if o.cfg.DeliverSubject != _EMPTY_ { - return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer") - } - // Queue groups not allowed. - if queue != _EMPTY_ { - return nil, fmt.Errorf("nats: queues not be set for an ordered consumer") - } - // Check for bound consumers. - if consumer != _EMPTY_ { - return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer") - } - // Check for pull mode. - if isPullMode { - return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer") - } + // Make sure we are not durable. + if isDurable { + return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer") + } + // Check ack policy. + if o.cfg.AckPolicy != ackPolicyNotSet { + return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer") + } + // Check max deliver. + if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 { + return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer") + } + // No deliver subject, we pick our own. + if o.cfg.DeliverSubject != _EMPTY_ { + return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer") + } + // Queue groups not allowed. + if queue != _EMPTY_ { + return nil, fmt.Errorf("nats: queues not be set for an ordered consumer") + } + // Check for bound consumers. + if consumer != _EMPTY_ { + return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer") + } + // Check for pull mode. + if isPullMode { + return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer") } // Setup how we need it to be here. o.cfg.FlowControl = true @@ -1346,7 +1350,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // With an explicit durable name, we can lookup the consumer first // to which it should be attaching to. // If bind to ordered consumer is true, skip the lookup. - if consumer != _EMPTY_ && !o.boc { + if consumer != _EMPTY_ { info, err = js.ConsumerInfo(stream, consumer) notFoundErr = errors.Is(err, ErrConsumerNotFound) lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded @@ -1415,14 +1419,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, Config: &cfg, } hbi = cfg.Heartbeat - - // If bind to ordered consumer is true, we wanted to do all above so that - // we have information about the ordered consumer and how to recreate it - // if later a gap is detected. However, we are not going to actually - // create the consumer since it was done by the caller. - if o.boc { - shouldCreate = false - } } if isPullMode { @@ -1450,9 +1446,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, nms: nms, psubj: subj, cancel: cancel, - // Special internal case: the consumer was created outside, but we - // take ownership and will delete the consumer on unsub/drain. - dc: o.boc, } // Check if we are manual ack. @@ -1537,6 +1530,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if err != nil { return nil, err } + jsi.pending = info.NumPending + info.Delivered.Consumer if !isPullMode { // We can't reuse the channel, so if one was passed, we need to create a new one. @@ -1573,6 +1567,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain() sub.mu.Lock() sub.jsi.dc = true + sub.jsi.pending = info.NumPending + info.Delivered.Consumer // If this is an ephemeral, we did not have a consumer name, we get it from the info // after the AddConsumer returns. if consumer == _EMPTY_ { @@ -2014,16 +2009,7 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool - // Special case for KV to be able to bind an ordered consumer - boc bool - ctx context.Context -} - -func bindOrderedConsumer() SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.boc = true - return nil - }) + ctx context.Context } // OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages. diff --git a/kv.go b/kv.go index 5bd789935..66e347f38 100644 --- a/kv.go +++ b/kv.go @@ -20,6 +20,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" ) @@ -593,11 +594,11 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) { // Implementation for Watch type watcher struct { + mu sync.Mutex updates chan KeyValueEntry sub *Subscription initDone bool initPending uint64 - received uint64 } // Updates returns the interior channel. @@ -662,6 +663,8 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { } } delta := uint64(parseNum(tokens[ackNumPendingTokenPos])) + w.mu.Lock() + defer w.mu.Unlock() if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) { entry := &kve{ bucket: kv.name, @@ -676,68 +679,46 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { } // Check if done and initial values. if !w.initDone { - w.received++ // We set this on the first trip through.. if w.initPending == 0 { w.initPending = delta } - if w.received > w.initPending || delta == 0 { + if w.initPending == 0 { w.initDone = true w.updates <- nil + } else { + w.initPending-- } } } - // The server sends the ConsumerInfo after the actual creation, - // and there is a possibility that the NumPending then does not - // reflect the real value. So we are creating the consumer - // without the NATS subscription started to make sure that - // there is no delivery. - cfg := ConsumerConfig{ - DeliverSubject: kv.js.nc.newInbox(), - DeliverPolicy: DeliverAllPolicy, - FilterSubject: keys, - FlowControl: true, - AckPolicy: AckNonePolicy, - MaxDeliver: 1, - AckWait: 22 * time.Hour, - Direct: true, - Heartbeat: orderedHeartbeatsInterval, - } // Used ordered consumer to deliver results. - // Use the deliver subject from the config and also set the internal option - // to indicate that we will bind to the above consumer, by-passing some of - // the check for public ordered consumers. - subOpts := []SubOpt{ - OrderedConsumer(), - DeliverSubject(cfg.DeliverSubject), - bindOrderedConsumer()} + subOpts := []SubOpt{OrderedConsumer()} if !o.includeHistory { - cfg.DeliverPolicy = DeliverLastPerSubjectPolicy subOpts = append(subOpts, DeliverLastPerSubject()) } if o.metaOnly { - cfg.HeadersOnly = true subOpts = append(subOpts, HeadersOnly()) } if o.ctx != nil { subOpts = append(subOpts, Context(o.ctx)) } - ci, err := kv.js.AddConsumer(kv.stream, &cfg) + // Create the sub and rest of initialization under the lock. + // We want to prevent the race between this code and the + // update() callback. + w.mu.Lock() + defer w.mu.Unlock() + sub, err := kv.js.Subscribe(keys, update, subOpts...) if err != nil { return nil, err } - if ci.NumPending == 0 { + sub.mu.Lock() + // If there were no pending messages at the time of the creation + // of the consumer, send the marker. + if sub.jsi != nil && sub.jsi.pending == 0 { w.initDone = true w.updates <- nil } - // Now that we have the consumer name, use the Bind option. - subOpts = append(subOpts, Bind(kv.stream, ci.Name)) - sub, err := kv.js.Subscribe(keys, update, subOpts...) - if err != nil { - return nil, err - } - sub.mu.Lock() // Set us up to close when the waitForMessages func returns. sub.pDone = func() { close(w.updates)