Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

js: Add nats.Bind option to disable creating consumers #740

Merged
merged 1 commit into from Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
107 changes: 81 additions & 26 deletions js.go
Expand Up @@ -1008,7 +1008,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}
}

isPullMode := ch == nil && cb == nil
isPullMode := ch == nil && cb == nil && !isSync
badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
hasHeartbeats := o.cfg.Heartbeat > 0
hasFC := o.cfg.FlowControl
Expand All @@ -1017,17 +1017,26 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}

var (
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
consumerBound = o.bound
notFoundErr bool
lookupErr bool
)

// In case a consumer has not been set explicitly, then the
// durable name will be used as the consumer name.
if consumer == _EMPTY_ {
consumer = o.cfg.Durable
}

// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
stream, err = js.lookupStreamBySubject(subj)
Expand All @@ -1038,18 +1047,16 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
stream = o.stream
}

// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
consumer = o.cfg.Durable
// With an explicit durable name, then can lookup the consumer first
// to which it should be attaching to.
if consumer != _EMPTY_ {
// Only create in case there is no consumer already.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
notFoundErr = err != nil && strings.Contains(err.Error(), "consumer not found")
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
}

if info != nil {
switch {
case info != nil:
// Attach using the found consumer config.
ccfg = &info.Config
attached = true
Expand All @@ -1059,12 +1066,25 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return nil, ErrSubjectMismatch
}

// Prevent binding a subscription against incompatible consumer types.
if isPullMode && ccfg.DeliverSubject != _EMPTY_ {
return nil, ErrPullSubscribeToPushConsumer
} else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ {
return nil, ErrPullSubscribeRequired
}

if ccfg.DeliverSubject != _EMPTY_ {
deliver = ccfg.DeliverSubject
} else {
} else if !isPullMode {
deliver = NewInbox()
}
} else {
case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
// If the consumer is being bound got an error on pull subscribe then allow the error.
if !(isPullMode && lookupErr && consumerBound) {
return nil, err
}
default:
// Attempt to create consumer if not found nor using Bind.
shouldCreate = true
deliver = NewInbox()
if !isPullMode {
Expand Down Expand Up @@ -1351,6 +1371,8 @@ type subOpts struct {
mack bool
// For creating or updating.
cfg *ConsumerConfig
// For binding a subscription to a consumer without creating it.
bound bool
}

// ManualAck disables auto ack functionality for async subscriptions.
Expand All @@ -1362,16 +1384,19 @@ func ManualAck() SubOpt {
}

// Durable defines the consumer name for JetStream durable subscribers.
func Durable(name string) SubOpt {
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if opts.cfg.Durable != "" {
if opts.cfg.Durable != _EMPTY_ {
return fmt.Errorf("nats: option Durable set more than once")
}
if strings.Contains(name, ".") {
if opts.consumer != _EMPTY_ && opts.consumer != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
}
if strings.Contains(consumer, ".") {
return ErrInvalidDurableName
}

opts.cfg.Durable = name
opts.cfg.Durable = consumer
return nil
})
}
Expand Down Expand Up @@ -1482,9 +1507,39 @@ func RateLimit(n uint64) SubOpt {
}

// BindStream binds a consumer to a stream explicitly based on a name.
func BindStream(name string) SubOpt {
func BindStream(stream string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.stream = name
if opts.stream != _EMPTY_ && opts.stream != stream {
return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
}

opts.stream = stream
return nil
})
}

// Bind binds a subscription to an existing consumer from a stream without attempting to create.
// The first argument is the stream name and the second argument will be the consumer name.
func Bind(stream, consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
}

// In case of pull subscribers, the durable name is a required parameter
// so check that they are not different.
if opts.cfg.Durable != _EMPTY_ && opts.cfg.Durable != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.cfg.Durable, consumer)
}
if opts.stream != _EMPTY_ && opts.stream != stream {
return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
}
opts.stream = stream
opts.consumer = consumer
opts.bound = true
return nil
})
}
Expand Down
3 changes: 3 additions & 0 deletions nats.go
Expand Up @@ -146,9 +146,12 @@ var (
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrConsumerNameRequired = errors.New("nats: consumer name is required")
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required")
ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer")
ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer")
)

func init() {
Expand Down