Skip to content

Commit

Permalink
[added] option to not create durable on subscribe
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Hanel <mh@synadia.com>
  • Loading branch information
matthiashanel committed May 24, 2021
1 parent 3b1f6fc commit 5e89fb4
Showing 1 changed file with 34 additions and 15 deletions.
49 changes: 34 additions & 15 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,21 +183,24 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
}

// If we have check recently we can avoid another account lookup here.
// We want these to be lighweight and created at will.
nc.mu.Lock()
now := time.Now()
checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
if checkAccount {
nc.jsLastCheck = now
}
nc.mu.Unlock()
// We want these to be lightweight and created at will.
// The account check is permanently disabled when the api prefix is set to anything but default
if js.opts.pre == defaultAPIPrefix {
nc.mu.Lock()
now := time.Now()
checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
if checkAccount {
nc.jsLastCheck = now
}
nc.mu.Unlock()

if checkAccount {
if _, err := js.AccountInfo(); err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
if checkAccount {
if _, err := js.AccountInfo(); err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return nil, err
}
return nil, err
}
}

Expand Down Expand Up @@ -1041,15 +1044,20 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
consumer = o.cfg.Durable
if consumer != _EMPTY_ {
if consumer == _EMPTY_ && o.noDurCreate {
return nil, fmt.Errorf("nats: NoCreate requires a durable name")
} else if consumer != _EMPTY_ && !o.noDurCreate {
// Only create in case there is no consumer already.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
}

if info != nil {
if o.noDurCreate {
attached = true
deliver = subj
} else if info != nil {
// Attach using the found consumer config.
ccfg = &info.Config
attached = true
Expand Down Expand Up @@ -1351,6 +1359,8 @@ type subOpts struct {
mack bool
// For creating or updating.
cfg *ConsumerConfig
// Assert consumer exists and do not create it
noDurCreate bool
}

// ManualAck disables auto ack functionality for async subscriptions.
Expand All @@ -1376,6 +1386,15 @@ func Durable(name string) SubOpt {
})
}

// NoDurableCreate instructs the API to not create a durable.
// If no durable name was provided, an error will be raised.
func NoDurableCreate() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.noDurCreate = true
return nil
})
}

// DeliverAll will configure a Consumer to receive all the
// messages from a Stream.
func DeliverAll() SubOpt {
Expand Down

0 comments on commit 5e89fb4

Please sign in to comment.