Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to KV #901

Merged
merged 1 commit into from Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
89 changes: 54 additions & 35 deletions js.go
Expand Up @@ -971,7 +971,6 @@ type jsSub struct {
consumer string
stream string
deliver string
pending uint64
pull bool
dc bool // Delete JS consumer

Expand All @@ -981,9 +980,6 @@ type jsSub struct {
sseq uint64
ccreq *createConsumerRequest

// Optional watcher
w *watcher

// Heartbeats and Flow Control handling from push consumers.
hbc *time.Timer
hbi time.Duration
Expand Down Expand Up @@ -1285,33 +1281,37 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Do some quick checks here for ordered consumers. We do these here instead of spread out
// in the individual SubOpts.
if o.ordered {
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
// Skip checks in special case where we use an internal option to bind to
// an existing consumer.
if !o.boc {
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
}
}
// Setup how we need it to be here.
o.cfg.FlowControl = true
Expand Down Expand Up @@ -1345,7 +1345,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

// With an explicit durable name, we can lookup the consumer first
// to which it should be attaching to.
if consumer != _EMPTY_ {
// If bind to ordered consumer is true, skip the lookup.
if consumer != _EMPTY_ && !o.boc {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
Expand Down Expand Up @@ -1414,6 +1415,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
Config: &cfg,
}
hbi = cfg.Heartbeat

// If bind to ordered consumer is true, we wanted to do all above so that
// we have information about the ordered consumer and how to recreate it
// if later a gap is detected. However, we are not going to actually
// create the consumer since it was done by the caller.
if o.boc {
shouldCreate = false
}
}

if isPullMode {
Expand Down Expand Up @@ -1441,6 +1450,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
nms: nms,
psubj: subj,
cancel: cancel,
// Special internal case: the consumer was created outside, but we
// take ownership and will delete the consumer on unsub/drain.
dc: o.boc,
}

// Check if we are manual ack.
Expand Down Expand Up @@ -1525,7 +1537,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
jsi.pending = info.NumPending

if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
Expand Down Expand Up @@ -1562,7 +1573,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
sub.mu.Lock()
sub.jsi.dc = true
jsi.pending = info.NumPending
// If this is an ephemeral, we did not have a consumer name, we get it from the info
// after the AddConsumer returns.
if consumer == _EMPTY_ {
Expand Down Expand Up @@ -2004,7 +2014,16 @@ type subOpts struct {
mack bool
// For an ordered consumer.
ordered bool
ctx context.Context
// Special case for KV to be able to bind an ordered consumer
boc bool
ctx context.Context
}

func bindOrderedConsumer() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.boc = true
return nil
})
}

// OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
Expand Down
67 changes: 48 additions & 19 deletions kv.go
Expand Up @@ -593,8 +593,11 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {

// Implementation for Watch
type watcher struct {
updates chan KeyValueEntry
sub *Subscription
updates chan KeyValueEntry
sub *Subscription
initDone bool
initPending uint64
received uint64
}

// Updates returns the interior channel.
Expand Down Expand Up @@ -630,9 +633,6 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
}

var initDoneMarker bool
initPending, received := uint64(0), uint64(0)

// Could be a pattern so don't check for validity as we normally do.
var b strings.Builder
b.WriteString(kv.pre)
Expand Down Expand Up @@ -675,45 +675,74 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
w.updates <- entry
}
// Check if done and initial values.
if !initDoneMarker {
received++
if !w.initDone {
w.received++
// We set this on the first trip through..
if initPending == 0 {
initPending = delta
if w.initPending == 0 {
w.initPending = delta
}
if received > initPending || delta == 0 {
initDoneMarker = true
if w.received > w.initPending || delta == 0 {
w.initDone = true
w.updates <- nil
}
}
}

// The server sends the ConsumerInfo after the actual creation,
// and there is a possibility that the NumPending then does not
// reflect the real value. So we are creating the consumer
// without the NATS subscription started to make sure that
// there is no delivery.
cfg := ConsumerConfig{
DeliverSubject: kv.js.nc.newInbox(),
DeliverPolicy: DeliverAllPolicy,
FilterSubject: keys,
FlowControl: true,
AckPolicy: AckNonePolicy,
MaxDeliver: 1,
AckWait: 22 * time.Hour,
Direct: true,
Heartbeat: orderedHeartbeatsInterval,
}
// Used ordered consumer to deliver results.
subOpts := []SubOpt{OrderedConsumer()}
// Use the deliver subject from the config and also set the internal option
// to indicate that we will bind to the above consumer, by-passing some of
// the check for public ordered consumers.
subOpts := []SubOpt{
OrderedConsumer(),
DeliverSubject(cfg.DeliverSubject),
bindOrderedConsumer()}
if !o.includeHistory {
cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
subOpts = append(subOpts, DeliverLastPerSubject())
}
if o.metaOnly {
cfg.HeadersOnly = true
subOpts = append(subOpts, HeadersOnly())
}
if o.ctx != nil {
subOpts = append(subOpts, Context(o.ctx))
}
ci, err := kv.js.AddConsumer(kv.stream, &cfg)
if err != nil {
return nil, err
}
if ci.NumPending == 0 {
w.initDone = true
w.updates <- nil
}
// Now that we have the consumer name, use the Bind option.
subOpts = append(subOpts, Bind(kv.stream, ci.Name))
sub, err := kv.js.Subscribe(keys, update, subOpts...)
if err != nil {
return nil, err
}
// Track watcher.
sub.jsi.w = w
sub.mu.Lock()
// Set us up to close when the waitForMessages func returns.
sub.pDone = func() {
close(w.updates)
}
// Check on pending count.
if sub.jsi.pending == 0 {
initDoneMarker = true
w.updates <- nil
}
sub.mu.Unlock()

w.sub = sub
return w, nil
Expand Down
3 changes: 1 addition & 2 deletions nats.go
Expand Up @@ -3935,8 +3935,7 @@ func (nc *Conn) removeSub(s *Subscription) {
s.mch = nil

// If JS subscription then stop HB timer.
jsi := s.jsi
if jsi != nil {
if jsi := s.jsi; jsi != nil {
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
Expand Down