Skip to content

Commit

Permalink
Replaced BindDeliverSubject with SubjectIsDelivery
Browse files Browse the repository at this point in the history
As discussed with Matthias who came up with the idea, this is
better because then we make use of the provided subject. Otherwise
it was looking weird to have something which meaning was:
```
js.SubscribeSync("ignored", nats.BindDeliverSubject("p.d4"))
```
Instead you would now have:
```
sub, err = js.SubscribeSync("p.d4", nats.SubjectIsDelivery())
```

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Aug 17, 2021
1 parent e077154 commit 4b8ebba
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 18 deletions.
31 changes: 19 additions & 12 deletions js.go
Expand Up @@ -1010,6 +1010,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
}

// If no stream name is specified, or if option SubjectIsDelivery is
// specified, the subject cannot be empty.
if subj == _EMPTY_ && (o.stream == _EMPTY_ || o.subjIsDelivery) {
return nil, fmt.Errorf("nats: subject required")
}

// Note that these may change based on the consumer info response we may get.
hasHeartbeats := o.cfg.Heartbeat > 0
hasFC := o.cfg.FlowControl
Expand All @@ -1021,7 +1027,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
}
// No deliver subject should be provided
if o.cfg.DeliverSubject != _EMPTY_ || o.boundSubject != _EMPTY_ {
if o.cfg.DeliverSubject != _EMPTY_ || o.subjIsDelivery {
return nil, ErrPullSubscribeToPushConsumer
}
}
Expand Down Expand Up @@ -1100,10 +1106,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
hbi = o.cfg.Heartbeat
}

// With a bound subject, we go directly create the NATS subscription
// With this option, we go directly create the NATS subscription
// and skip all lookup/create.
if o.boundSubject != _EMPTY_ {
deliver = o.boundSubject
if o.subjIsDelivery {
deliver = subj
} else {
// In case a consumer has not been set explicitly, then the
// durable name will be used as the consumer name.
Expand Down Expand Up @@ -1665,10 +1671,10 @@ type subOpts struct {
mack bool
// For an ordered consumer.
ordered bool
// For specifying simply the subject the NATS susbcription should
// be created on. No stream or consumer name lookup/creation will
// be done.
boundSubject string
// Means that the subject passed to subscribe call will be used
// for the low level NATS subscription and no stream nor consumer
// lookup/creation will be done.
subjIsDelivery bool
}

// OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages.
Expand Down Expand Up @@ -1887,16 +1893,17 @@ func DeliverSubject(subject string) SubOpt {
})
}

// BindDeliverSubject specifies the deliver subject of a JetStream consumer
// that the subscription should subscribe to.
// SubjectIsDelivery specifies that the subject parameter in the subscribe
// call shall be used to create the NATS subscription and matches the
// JetStream consumer's deliver subject.
//
// NOTE: This is an "expert" API and should only be used when consumer lookup or
// creation by the library is not possible (for instance cross accounts).
// Since no lookup of the JetStream consumer is done, there is no way for
// the library to check the validity of this JetStream subscription.
func BindDeliverSubject(subject string) SubOpt {
func SubjectIsDelivery() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.boundSubject = subject
opts.subjIsDelivery = true
return nil
})
}
Expand Down
35 changes: 29 additions & 6 deletions test/js_test.go
Expand Up @@ -314,6 +314,17 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Fatalf("stream lookup failed: %v", err)
}

// If stream name is not specified, then the subject is required.
if _, err := js.SubscribeSync(""); err == nil || !strings.Contains(err.Error(), "required") {
t.Fatalf("Unexpected error: %v", err)
}
// Check that if stream name is present, then technically the subject does not have to.
sub, err := js.SubscribeSync("", nats.BindStream("TEST"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub.Unsubscribe()

// Check that Queue subscribe with HB or FC fails.
_, err = js.QueueSubscribeSync("foo", "wq", nats.IdleHeartbeat(time.Second))
if err == nil || !strings.Contains(err.Error(), "heartbeat") {
Expand Down Expand Up @@ -424,7 +435,7 @@ func TestJetStreamSubscribe(t *testing.T) {

// Now create a sync subscriber that is durable.
dname := "derek"
sub, err := js.SubscribeSync("foo", nats.Durable(dname))
sub, err = js.SubscribeSync("foo", nats.Durable(dname))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -545,11 +556,22 @@ func TestJetStreamSubscribe(t *testing.T) {
if err != nats.ErrPullSubscribeToPushConsumer {
t.Fatalf("Unexpected error: %v", err)
}
// Can't specify BindDeliverSubject
_, err = js.PullSubscribe("bar", "foo", nats.BindDeliverSubject("baz"))
// Can't specify SubjectIsDelivery() for pull subscribers
_, err = js.PullSubscribe("bar", "foo", nats.SubjectIsDelivery())
if err != nats.ErrPullSubscribeToPushConsumer {
t.Fatalf("Unexpected error: %v", err)
}
// If stream name is not specified, need the subject.
_, err = js.PullSubscribe("", "rip")
if err == nil || !strings.Contains(err.Error(), "required") {
t.Fatalf("Unexpected error: %v", err)
}
// If stream provided, it should be ok.
sub, err = js.PullSubscribe("", "rip", nats.BindStream("TEST"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub.Unsubscribe()

batch := 5
sub, err = js.PullSubscribe("bar", "rip")
Expand Down Expand Up @@ -1971,9 +1993,10 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
}
waitForPending(t, toSend)

// It is also possible to create a subscription with a BindDeliverSubject() API
// that will not try to do lookup nor create a JS consumer object.
sub, err = js.SubscribeSync("ignored", nats.BindDeliverSubject("p.d4"))
// It is also possible to create a subscription with a SubjectIsDelivery()
// option that says that the given subject will be used to create the low
// level NATS subscription and no lookup/create attempt will be made.
sub, err = js.SubscribeSync("p.d4", nats.SubjectIsDelivery())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down

0 comments on commit 4b8ebba

Please sign in to comment.