diff --git a/js.go b/js.go index 7b70bb1e1..6c8fa7fb4 100644 --- a/js.go +++ b/js.go @@ -147,6 +147,9 @@ type js struct { stc chan struct{} dch chan struct{} rr *rand.Rand + + // Assumes underlying objects exist and prevents additional lookups + bound bool } type jsOpts struct { @@ -166,8 +169,7 @@ const ( defaultAccountCheck = 20 * time.Second ) -// JetStream returns a JetStreamContext for messaging and stream management. -func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { +func (nc *Conn) jetStream(opts ...JSOpt) (*js, error) { js := &js{ nc: nc, opts: &jsOpts{ @@ -181,9 +183,30 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { return nil, err } } + return js, nil +} + +// BindJetStream returns a JetStreamContext for messaging and stream management that will NOT create +// underlying objects like stream or durable on Subscribe. It assumes they exist already and that +// the JetStream context is bound to another account via an import with limited permissions for example. +// This will also disable the use of ephemeral consumer. +func (nc *Conn) BindJetStream(opts ...JSOpt) (JetStreamContext, error) { + js, err := nc.jetStream(opts...) + if err != nil { + return nil, err + } + js.bound = true + return js, nil +} +// JetStream returns a JetStreamContext for messaging and stream management. +func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { + js, err := nc.jetStream(opts...) + if err != nil { + return nil, err + } // If we have check recently we can avoid another account lookup here. - // We want these to be lighweight and created at will. + // We want these to be lightweight and created at will. nc.mu.Lock() now := time.Now() checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck @@ -1030,6 +1053,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // Find the stream mapped to the subject if not bound to a stream already. if o.stream == _EMPTY_ { + if js.bound { + return nil, ErrBoundJetStreamStream + } stream, err = js.lookupStreamBySubject(subj) if err != nil { return nil, err @@ -1041,7 +1067,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // With an explicit durable name, then can lookup // the consumer to which it should be attaching to. consumer = o.cfg.Durable - if consumer != _EMPTY_ { + if consumer == _EMPTY_ && js.bound { + return nil, ErrBoundJetStreamDurable + } else if consumer != _EMPTY_ && !js.bound { // Only create in case there is no consumer already. info, err = js.ConsumerInfo(stream, consumer) if err != nil && err.Error() != "nats: consumer not found" { @@ -1049,7 +1077,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } } - if info != nil { + if js.bound { + attached = true + deliver = subj + } else if info != nil { // Attach using the found consumer config. ccfg = &info.Config attached = true diff --git a/nats.go b/nats.go index 88f528109..20e6d755e 100644 --- a/nats.go +++ b/nats.go @@ -135,6 +135,8 @@ var ( ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") ErrPullModeNotAllowed = errors.New("nats: pull based not supported") ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") + ErrBoundJetStreamStream = errors.New("nats: a bound JS requires a stream name") + ErrBoundJetStreamDurable = errors.New("nats: a bound JS requires a durable name") ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") ErrNoStreamResponse = errors.New("nats: no response from stream") ErrNotJSMessage = errors.New("nats: not a jetstream message") diff --git a/test/js_test.go b/test/js_test.go index 625ac81c6..87c20210a 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2252,7 +2252,7 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } defer ncm.Close() - jsm, err := ncm.JetStream() + jsm, err := ncm.BindJetStream() if err != nil { t.Fatal(err) } @@ -2288,13 +2288,21 @@ func TestJetStreamImportDirectOnly(t *testing.T) { t.Fatalf("push consumer create failed: %v", err) } + _, err = jsm.AddConsumer("ORDERS", &nats.ConsumerConfig{ + Durable: "d4", + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + t.Fatalf("pull consumer create failed: %v", err) + } + nc, err := nats.Connect(s.ClientURL()) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer nc.Close() - js, err := nc.JetStream() + js, err := nc.BindJetStream() if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -2343,15 +2351,21 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } } - // Cannot subscribe with JS context from another account right now. - if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + // Cannot subscribe with JS context from another account unless the stream is specified + if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrBoundJetStreamStream { + t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrBoundJetStreamStream, err) } - if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrBoundJetStreamDurable { + t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrBoundJetStreamDurable, err) } - if _, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + if sub, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nil { + t.Fatalf("Expected no error, got '%v'", err) + } else if m, err := sub.Fetch(1); err != nil { + t.Fatalf("expected no error, got %v", err) + } else if len(m) != 1 { + t.Fatalf("expected one message, got %d", len(m)) + } else if err = m[0].AckSync(); err != nil { + t.Fatalf("expected no error, got %v", err) } }