From 6237e10ee97bf3913f65e4939ac8c3b03bab769b Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 1 Jun 2021 16:27:51 -0700 Subject: [PATCH] js: Add Bind option to Subscribe Signed-off-by: Waldemar Quevedo --- js.go | 107 +++++++++++++++------ nats.go | 3 + test/js_test.go | 240 ++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 316 insertions(+), 34 deletions(-) diff --git a/js.go b/js.go index 7b70bb1e1..b91184dd1 100644 --- a/js.go +++ b/js.go @@ -1008,7 +1008,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } } - isPullMode := ch == nil && cb == nil + isPullMode := ch == nil && cb == nil && !isSync badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy hasHeartbeats := o.cfg.Heartbeat > 0 hasFC := o.cfg.FlowControl @@ -1017,17 +1017,26 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } var ( - err error - shouldCreate bool - ccfg *ConsumerConfig - info *ConsumerInfo - deliver string - attached bool - stream = o.stream - consumer = o.consumer - isDurable = o.cfg.Durable != _EMPTY_ + err error + shouldCreate bool + ccfg *ConsumerConfig + info *ConsumerInfo + deliver string + attached bool + stream = o.stream + consumer = o.consumer + isDurable = o.cfg.Durable != _EMPTY_ + consumerBound = o.bound + notFoundErr bool + lookupErr bool ) + // In case a consumer has not been set explicitly, then the + // durable name will be used as the consumer name. + if consumer == _EMPTY_ { + consumer = o.cfg.Durable + } + // Find the stream mapped to the subject if not bound to a stream already. if o.stream == _EMPTY_ { stream, err = js.lookupStreamBySubject(subj) @@ -1038,18 +1047,16 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync stream = o.stream } - // With an explicit durable name, then can lookup - // the consumer to which it should be attaching to. - consumer = o.cfg.Durable + // With an explicit durable name, then can lookup the consumer first + // to which it should be attaching to. if consumer != _EMPTY_ { - // Only create in case there is no consumer already. info, err = js.ConsumerInfo(stream, consumer) - if err != nil && err.Error() != "nats: consumer not found" { - return nil, err - } + notFoundErr = err != nil && strings.Contains(err.Error(), "consumer not found") + lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded } - if info != nil { + switch { + case info != nil: // Attach using the found consumer config. ccfg = &info.Config attached = true @@ -1059,12 +1066,25 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync return nil, ErrSubjectMismatch } + // Prevent binding a subscription against incompatible consumer types. + if isPullMode && ccfg.DeliverSubject != _EMPTY_ { + return nil, ErrPullSubscribeToPushConsumer + } else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ { + return nil, ErrPullSubscribeRequired + } + if ccfg.DeliverSubject != _EMPTY_ { deliver = ccfg.DeliverSubject - } else { + } else if !isPullMode { deliver = NewInbox() } - } else { + case (err != nil && !notFoundErr) || (notFoundErr && consumerBound): + // If the consumer is being bound got an error on pull subscribe then allow the error. + if !(isPullMode && lookupErr && consumerBound) { + return nil, err + } + default: + // Attempt to create consumer if not found nor using Bind. shouldCreate = true deliver = NewInbox() if !isPullMode { @@ -1351,6 +1371,8 @@ type subOpts struct { mack bool // For creating or updating. cfg *ConsumerConfig + // For binding a subscription to a consumer without creating it. + bound bool } // ManualAck disables auto ack functionality for async subscriptions. @@ -1362,16 +1384,19 @@ func ManualAck() SubOpt { } // Durable defines the consumer name for JetStream durable subscribers. -func Durable(name string) SubOpt { +func Durable(consumer string) SubOpt { return subOptFn(func(opts *subOpts) error { - if opts.cfg.Durable != "" { + if opts.cfg.Durable != _EMPTY_ { return fmt.Errorf("nats: option Durable set more than once") } - if strings.Contains(name, ".") { + if opts.consumer != _EMPTY_ && opts.consumer != consumer { + return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer) + } + if strings.Contains(consumer, ".") { return ErrInvalidDurableName } - opts.cfg.Durable = name + opts.cfg.Durable = consumer return nil }) } @@ -1482,9 +1507,39 @@ func RateLimit(n uint64) SubOpt { } // BindStream binds a consumer to a stream explicitly based on a name. -func BindStream(name string) SubOpt { +func BindStream(stream string) SubOpt { return subOptFn(func(opts *subOpts) error { - opts.stream = name + if opts.stream != _EMPTY_ && opts.stream != stream { + return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream) + } + + opts.stream = stream + return nil + }) +} + +// Bind binds a subscription to an existing consumer from a stream without attempting to create. +// The first argument is the stream name and the second argument will be the consumer name. +func Bind(stream, consumer string) SubOpt { + return subOptFn(func(opts *subOpts) error { + if stream == _EMPTY_ { + return ErrStreamNameRequired + } + if consumer == _EMPTY_ { + return ErrConsumerNameRequired + } + + // In case of pull subscribers, the durable name is a required parameter + // so check that they are not different. + if opts.cfg.Durable != _EMPTY_ && opts.cfg.Durable != consumer { + return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.cfg.Durable, consumer) + } + if opts.stream != _EMPTY_ && opts.stream != stream { + return fmt.Errorf("nats: duplicate stream name (%s and %s)", opts.stream, stream) + } + opts.stream = stream + opts.consumer = consumer + opts.bound = true return nil }) } diff --git a/nats.go b/nats.go index 88f528109..f4302a5f8 100644 --- a/nats.go +++ b/nats.go @@ -146,9 +146,12 @@ var ( ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") ErrStreamNameRequired = errors.New("nats: stream name is required") + ErrConsumerNameRequired = errors.New("nats: consumer name is required") ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required") ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required") + ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer") + ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer") ) func init() { diff --git a/test/js_test.go b/test/js_test.go index 6ca512ff1..b1f141d0b 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2215,22 +2215,48 @@ func TestJetStreamImportDirectOnly(t *testing.T) { # For the stream publish. { service: "ORDERS" } # For the pull based consumer. Response type needed for batchsize > 1 + { service: "$JS.API.CONSUMER.INFO.ORDERS.d1", response: stream } { service: "$JS.API.CONSUMER.MSG.NEXT.ORDERS.d1", response: stream } # For the push based consumer delivery and ack. { stream: "p.d" } { stream: "p.d3" } # For the acks. Service in case we want an ack to our ack. { service: "$JS.ACK.ORDERS.*.>" } + + # Allow lookup of stream to be able to bind from another account. + { service: "$JS.API.CONSUMER.INFO.ORDERS.d4", response: stream } + { stream: "p.d4" } ] }, U: { - users: [ {user: rip, password: bar} ] + users: [ { user: rip, password: bar } ] imports [ { service: { subject: "$JS.API.INFO", account: JS } } { service: { subject: "ORDERS", account: JS } , to: "orders" } + # { service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d1", account: JS } } + { service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d4", account: JS } } { service: { subject: "$JS.API.CONSUMER.MSG.NEXT.ORDERS.d1", account: JS } } { stream: { subject: "p.d", account: JS } } { stream: { subject: "p.d3", account: JS } } + { stream: { subject: "p.d4", account: JS } } + { service: { subject: "$JS.ACK.ORDERS.*.>", account: JS } } + ] + }, + V: { + users: [ { + user: v, + password: quux, + permissions: { publish: {deny: ["$JS.API.CONSUMER.INFO.ORDERS.d1"]} } + } ] + imports [ + { service: { subject: "$JS.API.INFO", account: JS } } + { service: { subject: "ORDERS", account: JS } , to: "orders" } + { service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d1", account: JS } } + { service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d4", account: JS } } + { service: { subject: "$JS.API.CONSUMER.MSG.NEXT.ORDERS.d1", account: JS } } + { stream: { subject: "p.d", account: JS } } + { stream: { subject: "p.d3", account: JS } } + { stream: { subject: "p.d4", account: JS } } { service: { subject: "$JS.ACK.ORDERS.*.>", account: JS } } ] }, @@ -2288,6 +2314,15 @@ func TestJetStreamImportDirectOnly(t *testing.T) { t.Fatalf("push consumer create failed: %v", err) } + _, err = jsm.AddConsumer("ORDERS", &nats.ConsumerConfig{ + Durable: "d4", + AckPolicy: nats.AckExplicitPolicy, + DeliverSubject: "p.d4", + }) + if err != nil { + t.Fatalf("push consumer create failed: %v", err) + } + nc, err := nats.Connect(s.ClientURL()) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -2343,15 +2378,58 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } } - // Cannot subscribe with JS context from another account right now. - if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + // Can attach to the consumer from another JS account if there is a durable name. + sub, err = js.SubscribeSync("ORDERS", nats.Durable("d4"), nats.BindStream("ORDERS")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + waitForPending(t, toSend) + + // Bind has the same effect as above since it would not attempt to create and lookup will work. + sub, err = js.SubscribeSync("ORDERS", nats.Bind("ORDERS", "d4")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Even if there are no permissions or import to check that a consumer exists, + // it is still possible to bind subscription to it. + sub, err = js.PullSubscribe("ORDERS", "d1", nats.Bind("ORDERS", "d1")) + if err != nil { + t.Fatal(err) + } + expected := 10 + msgs, err := sub.Fetch(expected) + if err != nil { + t.Fatal(err) + } + got := len(msgs) + if got != expected { + t.Fatalf("Expected %d, got %d", expected, got) + } + + // Account without permissions to lookup should be able to bind as well. + nc, err = nats.Connect(s.ClientURL(), nats.UserInfo("v", "quux")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) } - if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + defer nc.Close() + + js, err = nc.JetStream() + if err != nil { + t.Fatal(err) } - if _, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled { - t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err) + sub, err = js.PullSubscribe("ORDERS", "d1", nats.Bind("ORDERS", "d1")) + if err != nil { + t.Fatal(err) + } + expected = 10 + msgs, err = sub.Fetch(expected) + if err != nil { + t.Fatal(err) + } + got = len(msgs) + if got != expected { + t.Fatalf("Expected %d, got %d", expected, got) } } @@ -5899,3 +5977,149 @@ func TestJetStreamPublishAsyncPerf(t *testing.T) { fmt.Printf("Took %v to send %d msgs\n", tt, toSend) fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds()) } + +func TestJetStreamBindConsumer(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if _, err := js.AddStream(nil); err == nil { + t.Fatalf("Unexpected success") + } + si, err := js.AddStream(&nats.StreamConfig{Name: "foo"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si == nil || si.Config.Name != "foo" { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + for i := 0; i < 25; i++ { + js.Publish("foo", []byte("hi")) + } + + // Both stream and consumer names are required for bind only. + _, err = js.SubscribeSync("foo", nats.Bind("", "")) + if err != nats.ErrStreamNameRequired { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.SubscribeSync("foo", nats.Bind("foo", "")) + if err != nats.ErrConsumerNameRequired { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.SubscribeSync("foo", nats.Bind("foo", "push")) + if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") { + t.Fatalf("Unexpected error: %v", err) + } + + // Pull consumer + _, err = js.PullSubscribe("foo", "pull", nats.Bind("foo", "pull")) + if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") { + t.Fatalf("Unexpected error: %v", err) + } + + // Push consumer + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "push", + AckPolicy: nats.AckExplicitPolicy, + DeliverSubject: nats.NewInbox(), + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Push Consumer Bind Only + _, err = js.SubscribeSync("foo", nats.Bind("foo", "push")) + if err != nil { + t.Fatal(err) + } + // Ambiguous declaration should not be allowed. + _, err = js.SubscribeSync("foo", nats.Durable("push2"), nats.Bind("foo", "push")) + if err == nil || !strings.Contains(err.Error(), `nats: duplicate consumer names (push2 and push)`) { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.SubscribeSync("foo", nats.BindStream("foo"), nats.Bind("foo2", "push")) + if err == nil || !strings.Contains(err.Error(), `nats: duplicate stream name (foo and foo2)`) { + t.Fatalf("Unexpected error: %v", err) + } + + // Duplicate stream name is fine. + _, err = js.SubscribeSync("foo", nats.BindStream("foo"), nats.Bind("foo", "push")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Pull consumer + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "pull", + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Pull consumer can bind without create using only the stream name (since durable is required argument). + _, err = js.PullSubscribe("foo", "pull", nats.Bind("foo", "pull")) + if err != nil { + t.Fatal(err) + } + + // Prevent binding to durable that is from a wrong type. + _, err = js.PullSubscribe("foo", "push", nats.Bind("foo", "push")) + if err != nats.ErrPullSubscribeToPushConsumer { + t.Fatalf("Unexpected error: %v", err) + } + _, err = js.SubscribeSync("foo", nats.Bind("foo", "pull")) + if err != nats.ErrPullSubscribeRequired { + t.Fatalf("Unexpected error: %v", err) + } + + // Create ephemeral consumer + sub1, err := js.SubscribeSync("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + cinfo, err := sub1.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + + // Bind to ephemeral consumer by setting the consumer. + sub2, err := js.SubscribeSync("foo", nats.Bind("foo", cinfo.Name)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub3, err := nc.SubscribeSync(cinfo.Config.DeliverSubject) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + js.Publish("foo", []byte("hi 1")) + js.Publish("foo", []byte("hi 2")) + js.Publish("foo", []byte("hi 3")) + + _, err = sub1.NextMsg(1 * time.Second) + if err != nil { + t.Fatal(err) + } + _, err = sub2.NextMsg(1 * time.Second) + if err != nil { + t.Fatal(err) + } + _, err = sub3.NextMsg(1 * time.Second) + if err != nil { + t.Fatal(err) + } +}