Skip to content

Commit

Permalink
Changing to BindJetStream
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Hanel <mh@synadia.com>
  • Loading branch information
matthiashanel committed May 24, 2021
1 parent 5e89fb4 commit d2eaa57
Showing 1 changed file with 44 additions and 32 deletions.
76 changes: 44 additions & 32 deletions js.go
Expand Up @@ -147,6 +147,9 @@ type js struct {
stc chan struct{}
dch chan struct{}
rr *rand.Rand

// Assumes underlying objects exist and prevents additional lookups
bound bool
}

type jsOpts struct {
Expand All @@ -166,8 +169,9 @@ const (
defaultAccountCheck = 20 * time.Second
)

// JetStream returns a JetStreamContext for messaging and stream management.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create
// underlying objects like stream or durable. It assumes they exist already.
func (nc *Conn) jetStream(opts ...JSOpt) (*js, error) {
js := &js{
nc: nc,
opts: &jsOpts{
Expand All @@ -181,26 +185,42 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
return nil, err
}
}
return js, nil
}

// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create
// underlying objects like stream or durable. It assumes they exist already.
func (nc *Conn) BindJetStream(opts ...JSOpt) (JetStreamContext, error) {
js, err := nc.jetStream(opts...)
if err != nil {
return nil, err
}
js.bound = true
return js, nil
}

// JetStream returns a JetStreamContext for messaging and stream management.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js, err := nc.jetStream(opts...)
if err != nil {
return nil, err
}
// If we have check recently we can avoid another account lookup here.
// We want these to be lightweight and created at will.
// The account check is permanently disabled when the api prefix is set to anything but default
if js.opts.pre == defaultAPIPrefix {
nc.mu.Lock()
now := time.Now()
checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
if checkAccount {
nc.jsLastCheck = now
}
nc.mu.Unlock()
nc.mu.Lock()
now := time.Now()
checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
if checkAccount {
nc.jsLastCheck = now
}
nc.mu.Unlock()

if checkAccount {
if _, err := js.AccountInfo(); err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return nil, err
if checkAccount {
if _, err := js.AccountInfo(); err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return nil, err
}
}

Expand Down Expand Up @@ -1033,6 +1053,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync

// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
if js.bound {
return nil, fmt.Errorf("nats: a bound JS requires a stream name")
}
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
Expand All @@ -1044,17 +1067,17 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
consumer = o.cfg.Durable
if consumer == _EMPTY_ && o.noDurCreate {
return nil, fmt.Errorf("nats: NoCreate requires a durable name")
} else if consumer != _EMPTY_ && !o.noDurCreate {
if consumer == _EMPTY_ && js.bound {
return nil, fmt.Errorf("nats: a bound JS requires a durable name")
} else if consumer != _EMPTY_ && !js.bound {
// Only create in case there is no consumer already.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
}

if o.noDurCreate {
if js.bound {
attached = true
deliver = subj
} else if info != nil {
Expand Down Expand Up @@ -1359,8 +1382,6 @@ type subOpts struct {
mack bool
// For creating or updating.
cfg *ConsumerConfig
// Assert consumer exists and do not create it
noDurCreate bool
}

// ManualAck disables auto ack functionality for async subscriptions.
Expand All @@ -1386,15 +1407,6 @@ func Durable(name string) SubOpt {
})
}

// NoDurableCreate instructs the API to not create a durable.
// If no durable name was provided, an error will be raised.
func NoDurableCreate() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.noDurCreate = true
return nil
})
}

// DeliverAll will configure a Consumer to receive all the
// messages from a Stream.
func DeliverAll() SubOpt {
Expand Down

0 comments on commit d2eaa57

Please sign in to comment.