Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[added] option to not create durable on subscribe #734

Closed
wants to merge 9 commits into from
Closed
40 changes: 35 additions & 5 deletions js.go
Expand Up @@ -147,6 +147,9 @@ type js struct {
stc chan struct{}
dch chan struct{}
rr *rand.Rand

// Assumes underlying objects exist and prevents additional lookups
bound bool
}

type jsOpts struct {
Expand All @@ -166,8 +169,7 @@ const (
defaultAccountCheck = 20 * time.Second
)

// JetStream returns a JetStreamContext for messaging and stream management.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
func (nc *Conn) jetStream(opts ...JSOpt) (*js, error) {
js := &js{
nc: nc,
opts: &jsOpts{
Expand All @@ -181,9 +183,29 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
return nil, err
}
}
return js, nil
}

// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create
// underlying objects like stream or durable. It assumes they exist already.
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
// This will also disable the use of ephemeral consumer.
func (nc *Conn) BindJetStream(opts ...JSOpt) (JetStreamContext, error) {
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
js, err := nc.jetStream(opts...)
if err != nil {
return nil, err
}
js.bound = true
return js, nil
}

// JetStream returns a JetStreamContext for messaging and stream management.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js, err := nc.jetStream(opts...)
if err != nil {
return nil, err
}
// If we have check recently we can avoid another account lookup here.
// We want these to be lighweight and created at will.
// We want these to be lightweight and created at will.
nc.mu.Lock()
now := time.Now()
checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
Expand Down Expand Up @@ -1030,6 +1052,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync

// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
if js.bound {
return nil, ErrBoundJetStreamStream
}
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
Expand All @@ -1041,15 +1066,20 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
consumer = o.cfg.Durable
if consumer != _EMPTY_ {
if consumer == _EMPTY_ && js.bound {
return nil, ErrBoundJetStreamDurable
} else if consumer != _EMPTY_ && !js.bound {
// Only create in case there is no consumer already.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
}

if info != nil {
if js.bound {
attached = true
deliver = subj
} else if info != nil {
// Attach using the found consumer config.
ccfg = &info.Config
attached = true
Expand Down
2 changes: 2 additions & 0 deletions nats.go
Expand Up @@ -135,6 +135,8 @@ var (
ErrNoContextOrTimeout = errors.New("nats: no context or timeout given")
ErrPullModeNotAllowed = errors.New("nats: pull based not supported")
ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled")
ErrBoundJetStreamStream = errors.New("nats: a bound JS requires a stream name")
ErrBoundJetStreamDurable = errors.New("nats: a bound JS requires a durable name")
ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid")
ErrNoStreamResponse = errors.New("nats: no response from stream")
ErrNotJSMessage = errors.New("nats: not a jetstream message")
Expand Down
32 changes: 23 additions & 9 deletions test/js_test.go
Expand Up @@ -2252,7 +2252,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
}
defer ncm.Close()

jsm, err := ncm.JetStream()
jsm, err := ncm.BindJetStream()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -2288,13 +2288,21 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
t.Fatalf("push consumer create failed: %v", err)
}

_, err = jsm.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "d4",
AckPolicy: nats.AckExplicitPolicy,
})
if err != nil {
t.Fatalf("pull consumer create failed: %v", err)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
js, err := nc.BindJetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -2343,15 +2351,21 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
}
}

// Cannot subscribe with JS context from another account right now.
if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err)
// Cannot subscribe with JS context from another account unless the stream is specified
if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrBoundJetStreamStream {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrBoundJetStreamStream, err)
}
if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err)
if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrBoundJetStreamDurable {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrBoundJetStreamDurable, err)
}
if _, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err)
if sub, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nil {
t.Fatalf("Expected no error, got '%v'", err)
} else if m, err := sub.Fetch(1); err != nil {
t.Fatalf("expected no error, got %v", err)
} else if len(m) != 1 {
t.Fatalf("expected one message, got %d", len(m))
} else if err = m[0].Ack(); err != nil {
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("expected no error, got %v", err)
}
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down