diff --git a/js.go b/js.go index 7b70bb1e1..d12db838e 100644 --- a/js.go +++ b/js.go @@ -1017,15 +1017,16 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } var ( - err error - shouldCreate bool - ccfg *ConsumerConfig - info *ConsumerInfo - deliver string - attached bool - stream = o.stream - consumer = o.consumer - isDurable = o.cfg.Durable != _EMPTY_ + err error + shouldCreate bool + ccfg *ConsumerConfig + info *ConsumerInfo + deliver string + attached bool + stream = o.stream + consumer = o.consumer + isDurable = o.cfg.Durable != _EMPTY_ + consumerBound = o.cbound ) // Find the stream mapped to the subject if not bound to a stream already. @@ -1044,9 +1045,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync if consumer != _EMPTY_ { // Only create in case there is no consumer already. info, err = js.ConsumerInfo(stream, consumer) - if err != nil && err.Error() != "nats: consumer not found" { + if err != nil { + if consumerBound && strings.Contains(err.Error(), "consumer not found") { + return nil, err + } return nil, err } + } else if consumerBound { + return nil, ErrDurableNameRequired } if info != nil { @@ -1351,6 +1357,8 @@ type subOpts struct { mack bool // For creating or updating. cfg *ConsumerConfig + // For binding a subscription to a consumer without creating it. + cbound bool } // ManualAck disables auto ack functionality for async subscriptions. @@ -1489,6 +1497,14 @@ func BindStream(name string) SubOpt { }) } +// BindConsumer binds a subscription to a consumer based on a durable name. +func BindConsumer() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cbound = true + return nil + }) +} + // EnableFlowControl enables flow control for a push based consumer. func EnableFlowControl() SubOpt { return subOptFn(func(opts *subOpts) error { diff --git a/nats.go b/nats.go index 88f528109..9f6d2d4b4 100644 --- a/nats.go +++ b/nats.go @@ -149,6 +149,7 @@ var ( ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required") ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required") + ErrDurableNameRequired = errors.New("nats: durable name of consumer is required") ) func init() { diff --git a/test/js_test.go b/test/js_test.go index 6ca512ff1..7191a5867 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2215,12 +2215,17 @@ func TestJetStreamImportDirectOnly(t *testing.T) { # For the stream publish. { service: "ORDERS" } # For the pull based consumer. Response type needed for batchsize > 1 + { service: "$JS.API.CONSUMER.INFO.ORDERS.d1", response: stream } { service: "$JS.API.CONSUMER.MSG.NEXT.ORDERS.d1", response: stream } # For the push based consumer delivery and ack. { stream: "p.d" } { stream: "p.d3" } # For the acks. Service in case we want an ack to our ack. { service: "$JS.ACK.ORDERS.*.>" } + + # Allow lookup of stream to be able to bind from another account. + { service: "$JS.API.CONSUMER.INFO.ORDERS.d4", response: stream } + { stream: "p.d4" } ] }, U: { @@ -2228,9 +2233,12 @@ func TestJetStreamImportDirectOnly(t *testing.T) { imports [ { service: { subject: "$JS.API.INFO", account: JS } } { service: { subject: "ORDERS", account: JS } , to: "orders" } + { service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d1", account: JS } } + { service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d4", account: JS } } { service: { subject: "$JS.API.CONSUMER.MSG.NEXT.ORDERS.d1", account: JS } } { stream: { subject: "p.d", account: JS } } { stream: { subject: "p.d3", account: JS } } + { stream: { subject: "p.d4", account: JS } } { service: { subject: "$JS.ACK.ORDERS.*.>", account: JS } } ] }, @@ -2288,6 +2296,15 @@ func TestJetStreamImportDirectOnly(t *testing.T) { t.Fatalf("push consumer create failed: %v", err) } + _, err = jsm.AddConsumer("ORDERS", &nats.ConsumerConfig{ + Durable: "d4", + AckPolicy: nats.AckExplicitPolicy, + DeliverSubject: "p.d4", + }) + if err != nil { + t.Fatalf("push consumer create failed: %v", err) + } + nc, err := nats.Connect(s.ClientURL()) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -2343,15 +2360,31 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } } - // Cannot subscribe with JS context from another account right now. - if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrJetStreamNotEnabled { + // Cannot create ephemeral subscriber with JS context. + if _, err := js.SubscribeSync("ORDERS", nats.BindConsumer()); err != nats.ErrJetStreamNotEnabled { t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) } - if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + + // Can attach to the consumer from another JS account if there is a durable name. + sub, err = js.SubscribeSync("ORDERS", nats.Durable("d4"), nats.BindStream("ORDERS"), nats.BindConsumer()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) } - if _, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + waitForPending(t, toSend) + + // If there are permissions to check that a consumer exists, then possible to bind subscription to it. + sub, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")) + if err != nil { + t.Fatal(err) + } + expected := 10 + msgs, err := sub.Fetch(expected) + if err != nil { + t.Fatal(err) + } + got := len(msgs) + if got != expected { + t.Fatalf("Expected %d, got %d", expected, got) } } @@ -5899,3 +5932,72 @@ func TestJetStreamPublishAsyncPerf(t *testing.T) { fmt.Printf("Took %v to send %d msgs\n", tt, toSend) fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds()) } + +func TestJetStreamBindConsumer(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.AddStream(nil); err == nil { + t.Fatalf("Unexpected success") + } + si, err := js.AddStream(&nats.StreamConfig{Name: "foo"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si == nil || si.Config.Name != "foo" { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + for i := 0; i < 25; i++ { + js.Publish("foo", []byte("hi")) + } + + // Durable name required to bind a consumer. + _, err = js.SubscribeSync("foo", nats.BindConsumer()) + if err != nats.ErrDurableNameRequired { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.SubscribeSync("foo", nats.Durable("push"), nats.BindConsumer()) + if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.PullSubscribe("foo", "pull", nats.BindConsumer()) + if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") { + t.Fatalf("Unexpected error: %v", err) + } + + // Push consumer + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "push", AckPolicy: nats.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.SubscribeSync("foo", nats.Durable("push"), nats.BindConsumer()) + if err != nil { + t.Fatal(err) + } + + // Pull consumer + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "pull", AckPolicy: nats.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.PullSubscribe("foo", "pull", nats.BindConsumer()) + if err != nil { + t.Fatal(err) + } +}