diff --git a/js.go b/js.go index 0218126b8..92b6e5449 100644 --- a/js.go +++ b/js.go @@ -1010,6 +1010,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } } + // If no stream name is specified, or if option SubjectIsDelivery is + // specified, the subject cannot be empty. + if subj == _EMPTY_ && (o.stream == _EMPTY_ || o.subjIsDelivery) { + return nil, fmt.Errorf("nats: subject required") + } + // Note that these may change based on the consumer info response we may get. hasHeartbeats := o.cfg.Heartbeat > 0 hasFC := o.cfg.FlowControl @@ -1021,7 +1027,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) } // No deliver subject should be provided - if o.cfg.DeliverSubject != _EMPTY_ || o.boundSubject != _EMPTY_ { + if o.cfg.DeliverSubject != _EMPTY_ || o.subjIsDelivery { return nil, ErrPullSubscribeToPushConsumer } } @@ -1100,10 +1106,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, hbi = o.cfg.Heartbeat } - // With a bound subject, we go directly create the NATS subscription + // With this option, we go directly create the NATS subscription // and skip all lookup/create. - if o.boundSubject != _EMPTY_ { - deliver = o.boundSubject + if o.subjIsDelivery { + deliver = subj } else { // In case a consumer has not been set explicitly, then the // durable name will be used as the consumer name. @@ -1665,10 +1671,10 @@ type subOpts struct { mack bool // For an ordered consumer. ordered bool - // For specifying simply the subject the NATS susbcription should - // be created on. No stream or consumer name lookup/creation will - // be done. - boundSubject string + // Means that the subject passed to subscribe call will be used + // for the low level NATS subscription and no stream nor consumer + // lookup/creation will be done. + subjIsDelivery bool } // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. @@ -1887,16 +1893,17 @@ func DeliverSubject(subject string) SubOpt { }) } -// BindDeliverSubject specifies the deliver subject of a JetStream consumer -// that the subscription should subscribe to. +// SubjectIsDelivery specifies that the subject parameter in the subscribe +// call shall be used to create the NATS subscription and matches the +// JetStream consumer's deliver subject. // // NOTE: This is an "expert" API and should only be used when consumer lookup or // creation by the library is not possible (for instance cross accounts). // Since no lookup of the JetStream consumer is done, there is no way for // the library to check the validity of this JetStream subscription. -func BindDeliverSubject(subject string) SubOpt { +func SubjectIsDelivery() SubOpt { return subOptFn(func(opts *subOpts) error { - opts.boundSubject = subject + opts.subjIsDelivery = true return nil }) } diff --git a/test/js_test.go b/test/js_test.go index 24109ab95..619dca410 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -314,6 +314,17 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("stream lookup failed: %v", err) } + // If stream name is not specified, then the subject is required. + if _, err := js.SubscribeSync(""); err == nil || !strings.Contains(err.Error(), "required") { + t.Fatalf("Unexpected error: %v", err) + } + // Check that if stream name is present, then technically the subject does not have to. + sub, err := js.SubscribeSync("", nats.BindStream("TEST")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() + // Check that Queue subscribe with HB or FC fails. _, err = js.QueueSubscribeSync("foo", "wq", nats.IdleHeartbeat(time.Second)) if err == nil || !strings.Contains(err.Error(), "heartbeat") { @@ -424,7 +435,7 @@ func TestJetStreamSubscribe(t *testing.T) { // Now create a sync subscriber that is durable. dname := "derek" - sub, err := js.SubscribeSync("foo", nats.Durable(dname)) + sub, err = js.SubscribeSync("foo", nats.Durable(dname)) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -545,11 +556,22 @@ func TestJetStreamSubscribe(t *testing.T) { if err != nats.ErrPullSubscribeToPushConsumer { t.Fatalf("Unexpected error: %v", err) } - // Can't specify BindDeliverSubject - _, err = js.PullSubscribe("bar", "foo", nats.BindDeliverSubject("baz")) + // Can't specify SubjectIsDelivery() for pull subscribers + _, err = js.PullSubscribe("bar", "foo", nats.SubjectIsDelivery()) if err != nats.ErrPullSubscribeToPushConsumer { t.Fatalf("Unexpected error: %v", err) } + // If stream name is not specified, need the subject. + _, err = js.PullSubscribe("", "rip") + if err == nil || !strings.Contains(err.Error(), "required") { + t.Fatalf("Unexpected error: %v", err) + } + // If stream provided, it should be ok. + sub, err = js.PullSubscribe("", "rip", nats.BindStream("TEST")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub.Unsubscribe() batch := 5 sub, err = js.PullSubscribe("bar", "rip") @@ -1971,9 +1993,10 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } waitForPending(t, toSend) - // It is also possible to create a subscription with a BindDeliverSubject() API - // that will not try to do lookup nor create a JS consumer object. - sub, err = js.SubscribeSync("ignored", nats.BindDeliverSubject("p.d4")) + // It is also possible to create a subscription with a SubjectIsDelivery() + // option that says that the given subject will be used to create the low + // level NATS subscription and no lookup/create attempt will be made. + sub, err = js.SubscribeSync("p.d4", nats.SubjectIsDelivery()) if err != nil { t.Fatalf("Unexpected error: %v", err) }