Skip to content

Commit

Permalink
js: Add BindOnly option to Subscribe
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Jun 3, 2021
1 parent da90d22 commit 8d78de2
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 30 deletions.
107 changes: 84 additions & 23 deletions js.go
Expand Up @@ -1008,7 +1008,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}
}

isPullMode := ch == nil && cb == nil
isPullMode := ch == nil && cb == nil && !isSync
badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
hasHeartbeats := o.cfg.Heartbeat > 0
hasFC := o.cfg.FlowControl
Expand All @@ -1017,17 +1017,24 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}

var (
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
notFoundErr bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
consumerBound = o.bound
)

// In case a consumer has not set explicitly, the durable name will be used as the name.
if consumer == _EMPTY_ {
consumer = o.cfg.Durable
}

// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
stream, err = js.lookupStreamBySubject(subj)
Expand All @@ -1038,18 +1045,16 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
stream = o.stream
}

// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
consumer = o.cfg.Durable
// With an explicit durable name, then can lookup the consumer first
// to which it should be attaching to.
if consumer != _EMPTY_ {
// Only create in case there is no consumer already.
// TODO: Only fail with push consumers.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
notFoundErr = err != nil && strings.Contains(err.Error(), "consumer not found")
}

if info != nil {
switch {
case info != nil:
// Attach using the found consumer config.
ccfg = &info.Config
attached = true
Expand All @@ -1059,12 +1064,23 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return nil, ErrSubjectMismatch
}

// Prevent binding a subscription against incompatible consumer types.
if isPullMode && ccfg.DeliverSubject != _EMPTY_ {
return nil, ErrPullSubscribeToPushConsumer
} else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ {
return nil, ErrPullSubscribeRequired
}

if ccfg.DeliverSubject != _EMPTY_ {
deliver = ccfg.DeliverSubject
} else {
} else if !isPullMode {
deliver = NewInbox()
}
} else {
case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
// Return error if consumer lookup by name fails and the consumer is being bound.
return nil, err
default:
// Attempt to create consumer if not found or using BindOnly.
shouldCreate = true
deliver = NewInbox()
if !isPullMode {
Expand Down Expand Up @@ -1351,6 +1367,8 @@ type subOpts struct {
mack bool
// For creating or updating.
cfg *ConsumerConfig
// For binding a subscription to a consumer without creating it.
bound bool
}

// ManualAck disables auto ack functionality for async subscriptions.
Expand All @@ -1364,9 +1382,12 @@ func ManualAck() SubOpt {
// Durable defines the consumer name for JetStream durable subscribers.
func Durable(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if opts.cfg.Durable != "" {
if opts.cfg.Durable != _EMPTY_ {
return fmt.Errorf("nats: option Durable set more than once")
}
if opts.consumer != _EMPTY_ {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, name)
}
if strings.Contains(name, ".") {
return ErrInvalidDurableName
}
Expand Down Expand Up @@ -1482,9 +1503,49 @@ func RateLimit(n uint64) SubOpt {
}

// BindStream binds a consumer to a stream explicitly based on a name.
func BindStream(name string) SubOpt {
func BindStream(stream string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if opts.stream != "" {
return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
}

opts.stream = stream
return nil
})
}

// BindOnly binds a subscription to a consumer without attempting to create.
func BindOnly(params ...string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.stream = name
if len(params) < 1 {
return ErrStreamNameRequired
}
var stream, consumer string
stream = params[0]
if stream == _EMPTY_ {
return ErrStreamNameRequired
}

// In case of pull subscribers, the durable name is a required parameter
// so only needed to use BindOnly with stream name.
if opts.cfg.Durable != _EMPTY_ {
consumer = opts.cfg.Durable
}
if len(params) >= 2 {
consumer = params[1]
if opts.cfg.Durable != _EMPTY_ && opts.cfg.Durable != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.cfg.Durable, consumer)
}
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
}
if opts.stream != "" {
return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream)
}
}
opts.stream = stream
opts.consumer = consumer
opts.bound = true
return nil
})
}
Expand Down
3 changes: 3 additions & 0 deletions nats.go
Expand Up @@ -146,9 +146,12 @@ var (
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrConsumerNameRequired = errors.New("nats: consumer name is required")
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required")
ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer")
ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer")
)

func init() {
Expand Down

0 comments on commit 8d78de2

Please sign in to comment.