diff --git a/js.go b/js.go index 89d5b337f..a0cc96467 100644 --- a/js.go +++ b/js.go @@ -147,6 +147,9 @@ type js struct { stc chan struct{} dch chan struct{} rr *rand.Rand + + // Assumes underlying objects exist and prevents additional lookups + bound bool } type jsOpts struct { @@ -166,8 +169,9 @@ const ( defaultAccountCheck = 20 * time.Second ) -// JetStream returns a JetStreamContext for messaging and stream management. -func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { +// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create +// underlying objects like stream or durable. It assumes they exist already. +func (nc *Conn) jetStream(opts ...JSOpt) (*js, error) { js := &js{ nc: nc, opts: &jsOpts{ @@ -181,26 +185,42 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { return nil, err } } + return js, nil +} +// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create +// underlying objects like stream or durable. It assumes they exist already. +func (nc *Conn) BindJetStream(opts ...JSOpt) (JetStreamContext, error) { + js, err := nc.jetStream(opts...) + if err != nil { + return nil, err + } + js.bound = true + return js, nil +} + +// JetStream returns a JetStreamContext for messaging and stream management. +func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { + js, err := nc.jetStream(opts...) + if err != nil { + return nil, err + } // If we have check recently we can avoid another account lookup here. // We want these to be lightweight and created at will. - // The account check is permanently disabled when the api prefix is set to anything but default - if js.opts.pre == defaultAPIPrefix { - nc.mu.Lock() - now := time.Now() - checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck - if checkAccount { - nc.jsLastCheck = now - } - nc.mu.Unlock() + nc.mu.Lock() + now := time.Now() + checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck + if checkAccount { + nc.jsLastCheck = now + } + nc.mu.Unlock() - if checkAccount { - if _, err := js.AccountInfo(); err != nil { - if err == ErrNoResponders { - err = ErrJetStreamNotEnabled - } - return nil, err + if checkAccount { + if _, err := js.AccountInfo(); err != nil { + if err == ErrNoResponders { + err = ErrJetStreamNotEnabled } + return nil, err } } @@ -1033,6 +1053,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // Find the stream mapped to the subject if not bound to a stream already. if o.stream == _EMPTY_ { + if js.bound { + return nil, fmt.Errorf("nats: a bound JS requires a stream name") + } stream, err = js.lookupStreamBySubject(subj) if err != nil { return nil, err @@ -1044,9 +1067,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // With an explicit durable name, then can lookup // the consumer to which it should be attaching to. consumer = o.cfg.Durable - if consumer == _EMPTY_ && o.noDurCreate { - return nil, fmt.Errorf("nats: NoCreate requires a durable name") - } else if consumer != _EMPTY_ && !o.noDurCreate { + if consumer == _EMPTY_ && js.bound { + return nil, fmt.Errorf("nats: a bound JS requires a durable name") + } else if consumer != _EMPTY_ && !js.bound { // Only create in case there is no consumer already. info, err = js.ConsumerInfo(stream, consumer) if err != nil && err.Error() != "nats: consumer not found" { @@ -1054,7 +1077,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } } - if o.noDurCreate { + if js.bound { attached = true deliver = subj } else if info != nil { @@ -1359,8 +1382,6 @@ type subOpts struct { mack bool // For creating or updating. cfg *ConsumerConfig - // Assert consumer exists and do not create it - noDurCreate bool } // ManualAck disables auto ack functionality for async subscriptions. @@ -1386,15 +1407,6 @@ func Durable(name string) SubOpt { }) } -// NoDurableCreate instructs the API to not create a durable. -// If no durable name was provided, an error will be raised. -func NoDurableCreate() SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.noDurCreate = true - return nil - }) -} - // DeliverAll will configure a Consumer to receive all the // messages from a Stream. func DeliverAll() SubOpt {