Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[added] option to not create durable on subscribe #734

Closed
wants to merge 9 commits into from
Closed
41 changes: 36 additions & 5 deletions js.go
Expand Up @@ -147,6 +147,9 @@ type js struct {
stc chan struct{}
dch chan struct{}
rr *rand.Rand

// Assumes underlying objects exist and prevents additional lookups
bound bool
}

type jsOpts struct {
Expand All @@ -166,8 +169,9 @@ const (
defaultAccountCheck = 20 * time.Second
)

// JetStream returns a JetStreamContext for messaging and stream management.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create
// underlying objects like stream or durable. It assumes they exist already.
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
func (nc *Conn) jetStream(opts ...JSOpt) (*js, error) {
js := &js{
nc: nc,
opts: &jsOpts{
Expand All @@ -181,9 +185,28 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
return nil, err
}
}
return js, nil
}

// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create
// underlying objects like stream or durable. It assumes they exist already.
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
func (nc *Conn) BindJetStream(opts ...JSOpt) (JetStreamContext, error) {
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
js, err := nc.jetStream(opts...)
if err != nil {
return nil, err
}
js.bound = true
return js, nil
}

// JetStream returns a JetStreamContext for messaging and stream management.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js, err := nc.jetStream(opts...)
if err != nil {
return nil, err
}
// If we have check recently we can avoid another account lookup here.
// We want these to be lighweight and created at will.
// We want these to be lightweight and created at will.
nc.mu.Lock()
now := time.Now()
checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
Expand Down Expand Up @@ -1030,6 +1053,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync

// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
if js.bound {
return nil, fmt.Errorf("nats: a bound JS requires a stream name")
}
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
Expand All @@ -1041,15 +1067,20 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
consumer = o.cfg.Durable
if consumer != _EMPTY_ {
if consumer == _EMPTY_ && js.bound {
return nil, fmt.Errorf("nats: a bound JS requires a durable name")
} else if consumer != _EMPTY_ && !js.bound {
// Only create in case there is no consumer already.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
}

if info != nil {
if js.bound {
attached = true
deliver = subj
} else if info != nil {
// Attach using the found consumer config.
ccfg = &info.Config
attached = true
Expand Down
2 changes: 1 addition & 1 deletion test/js_test.go
Expand Up @@ -2252,7 +2252,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
}
defer ncm.Close()

jsm, err := ncm.JetStream()
jsm, err := ncm.BindJetStream()
if err != nil {
t.Fatal(err)
}
Expand Down