diff --git a/js.go b/js.go index 35036894b..59780712d 100644 --- a/js.go +++ b/js.go @@ -971,7 +971,6 @@ type jsSub struct { consumer string stream string deliver string - pending uint64 pull bool dc bool // Delete JS consumer @@ -981,9 +980,6 @@ type jsSub struct { sseq uint64 ccreq *createConsumerRequest - // Optional watcher - w *watcher - // Heartbeats and Flow Control handling from push consumers. hbc *time.Timer hbi time.Duration @@ -1285,33 +1281,37 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // Do some quick checks here for ordered consumers. We do these here instead of spread out // in the individual SubOpts. if o.ordered { - // Make sure we are not durable. - if isDurable { - return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer") - } - // Check ack policy. - if o.cfg.AckPolicy != ackPolicyNotSet { - return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer") - } - // Check max deliver. - if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 { - return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer") - } - // No deliver subject, we pick our own. - if o.cfg.DeliverSubject != _EMPTY_ { - return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer") - } - // Queue groups not allowed. - if queue != _EMPTY_ { - return nil, fmt.Errorf("nats: queues not be set for an ordered consumer") - } - // Check for bound consumers. - if consumer != _EMPTY_ { - return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer") - } - // Check for pull mode. - if isPullMode { - return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer") + // Skip checks in special case where we use an internal option to bind to + // an existing consumer. + if !o.boc { + // Make sure we are not durable. + if isDurable { + return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer") + } + // Check ack policy. + if o.cfg.AckPolicy != ackPolicyNotSet { + return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer") + } + // Check max deliver. + if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 { + return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer") + } + // No deliver subject, we pick our own. + if o.cfg.DeliverSubject != _EMPTY_ { + return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer") + } + // Queue groups not allowed. + if queue != _EMPTY_ { + return nil, fmt.Errorf("nats: queues not be set for an ordered consumer") + } + // Check for bound consumers. + if consumer != _EMPTY_ { + return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer") + } + // Check for pull mode. + if isPullMode { + return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer") + } } // Setup how we need it to be here. o.cfg.FlowControl = true @@ -1345,7 +1345,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // With an explicit durable name, we can lookup the consumer first // to which it should be attaching to. - if consumer != _EMPTY_ { + // If bind to ordered consumer is true, skip the lookup. + if consumer != _EMPTY_ && !o.boc { info, err = js.ConsumerInfo(stream, consumer) notFoundErr = errors.Is(err, ErrConsumerNotFound) lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded @@ -1414,6 +1415,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, Config: &cfg, } hbi = cfg.Heartbeat + + // If bind to ordered consumer is true, we wanted to do all above so that + // we have information about the ordered consumer and how to recreate it + // if later a gap is detected. However, we are not going to actually + // create the consumer since it was done by the caller. + if o.boc { + shouldCreate = false + } } if isPullMode { @@ -1441,6 +1450,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, nms: nms, psubj: subj, cancel: cancel, + // Special internal case: the consumer was created outside, but we + // take ownership and will delete the consumer on unsub/drain. + dc: o.boc, } // Check if we are manual ack. @@ -1525,7 +1537,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if err != nil { return nil, err } - jsi.pending = info.NumPending if !isPullMode { // We can't reuse the channel, so if one was passed, we need to create a new one. @@ -1562,7 +1573,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain() sub.mu.Lock() sub.jsi.dc = true - jsi.pending = info.NumPending // If this is an ephemeral, we did not have a consumer name, we get it from the info // after the AddConsumer returns. if consumer == _EMPTY_ { @@ -2004,7 +2014,16 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool - ctx context.Context + // Special case for KV to be able to bind an ordered consumer + boc bool + ctx context.Context +} + +func bindOrderedConsumer() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.boc = true + return nil + }) } // OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages. diff --git a/kv.go b/kv.go index 3afef76cb..5bd789935 100644 --- a/kv.go +++ b/kv.go @@ -593,8 +593,11 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) { // Implementation for Watch type watcher struct { - updates chan KeyValueEntry - sub *Subscription + updates chan KeyValueEntry + sub *Subscription + initDone bool + initPending uint64 + received uint64 } // Updates returns the interior channel. @@ -630,9 +633,6 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { } } - var initDoneMarker bool - initPending, received := uint64(0), uint64(0) - // Could be a pattern so don't check for validity as we normally do. var b strings.Builder b.WriteString(kv.pre) @@ -675,45 +675,74 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { w.updates <- entry } // Check if done and initial values. - if !initDoneMarker { - received++ + if !w.initDone { + w.received++ // We set this on the first trip through.. - if initPending == 0 { - initPending = delta + if w.initPending == 0 { + w.initPending = delta } - if received > initPending || delta == 0 { - initDoneMarker = true + if w.received > w.initPending || delta == 0 { + w.initDone = true w.updates <- nil } } } + // The server sends the ConsumerInfo after the actual creation, + // and there is a possibility that the NumPending then does not + // reflect the real value. So we are creating the consumer + // without the NATS subscription started to make sure that + // there is no delivery. + cfg := ConsumerConfig{ + DeliverSubject: kv.js.nc.newInbox(), + DeliverPolicy: DeliverAllPolicy, + FilterSubject: keys, + FlowControl: true, + AckPolicy: AckNonePolicy, + MaxDeliver: 1, + AckWait: 22 * time.Hour, + Direct: true, + Heartbeat: orderedHeartbeatsInterval, + } // Used ordered consumer to deliver results. - subOpts := []SubOpt{OrderedConsumer()} + // Use the deliver subject from the config and also set the internal option + // to indicate that we will bind to the above consumer, by-passing some of + // the check for public ordered consumers. + subOpts := []SubOpt{ + OrderedConsumer(), + DeliverSubject(cfg.DeliverSubject), + bindOrderedConsumer()} if !o.includeHistory { + cfg.DeliverPolicy = DeliverLastPerSubjectPolicy subOpts = append(subOpts, DeliverLastPerSubject()) } if o.metaOnly { + cfg.HeadersOnly = true subOpts = append(subOpts, HeadersOnly()) } if o.ctx != nil { subOpts = append(subOpts, Context(o.ctx)) } + ci, err := kv.js.AddConsumer(kv.stream, &cfg) + if err != nil { + return nil, err + } + if ci.NumPending == 0 { + w.initDone = true + w.updates <- nil + } + // Now that we have the consumer name, use the Bind option. + subOpts = append(subOpts, Bind(kv.stream, ci.Name)) sub, err := kv.js.Subscribe(keys, update, subOpts...) if err != nil { return nil, err } - // Track watcher. - sub.jsi.w = w + sub.mu.Lock() // Set us up to close when the waitForMessages func returns. sub.pDone = func() { close(w.updates) } - // Check on pending count. - if sub.jsi.pending == 0 { - initDoneMarker = true - w.updates <- nil - } + sub.mu.Unlock() w.sub = sub return w, nil diff --git a/nats.go b/nats.go index 7020095fb..3bf5b4d57 100644 --- a/nats.go +++ b/nats.go @@ -3935,8 +3935,7 @@ func (nc *Conn) removeSub(s *Subscription) { s.mch = nil // If JS subscription then stop HB timer. - jsi := s.jsi - if jsi != nil { + if jsi := s.jsi; jsi != nil { if jsi.hbc != nil { jsi.hbc.Stop() jsi.hbc = nil