Skip to content

Commit

Permalink
js: Add BindConsumer option
Browse files Browse the repository at this point in the history
This allows creating subscriptions that are bound to a consumer
that matches the durable name, and does not attempt to create
the consumer in case it is not present.

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Jun 1, 2021
1 parent da90d22 commit e40d303
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 16 deletions.
36 changes: 26 additions & 10 deletions js.go
Expand Up @@ -1017,15 +1017,16 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}

var (
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
consumerBound = o.cbound
)

// Find the stream mapped to the subject if not bound to a stream already.
Expand All @@ -1044,9 +1045,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
if consumer != _EMPTY_ {
// Only create in case there is no consumer already.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
if err != nil {
if consumerBound && strings.Contains(err.Error(), "consumer not found") {
return nil, err
}
return nil, err
}
} else if consumerBound {
return nil, ErrDurableNameRequired
}

if info != nil {
Expand Down Expand Up @@ -1351,6 +1357,8 @@ type subOpts struct {
mack bool
// For creating or updating.
cfg *ConsumerConfig
// For binding a subscription to a consumer without creating it.
cbound bool
}

// ManualAck disables auto ack functionality for async subscriptions.
Expand Down Expand Up @@ -1489,6 +1497,14 @@ func BindStream(name string) SubOpt {
})
}

// BindConsumer binds a subscription to a consumer based on a durable name.
func BindConsumer() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cbound = true
return nil
})
}

// EnableFlowControl enables flow control for a push based consumer.
func EnableFlowControl() SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -149,6 +149,7 @@ var (
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required")
ErrDurableNameRequired = errors.New("nats: durable name of consumer is required")
)

func init() {
Expand Down
114 changes: 108 additions & 6 deletions test/js_test.go
Expand Up @@ -2215,22 +2215,30 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
# For the stream publish.
{ service: "ORDERS" }
# For the pull based consumer. Response type needed for batchsize > 1
{ service: "$JS.API.CONSUMER.INFO.ORDERS.d1", response: stream }
{ service: "$JS.API.CONSUMER.MSG.NEXT.ORDERS.d1", response: stream }
# For the push based consumer delivery and ack.
{ stream: "p.d" }
{ stream: "p.d3" }
# For the acks. Service in case we want an ack to our ack.
{ service: "$JS.ACK.ORDERS.*.>" }
# Allow lookup of stream to be able to bind from another account.
{ service: "$JS.API.CONSUMER.INFO.ORDERS.d4", response: stream }
{ stream: "p.d4" }
]
},
U: {
users: [ {user: rip, password: bar} ]
imports [
{ service: { subject: "$JS.API.INFO", account: JS } }
{ service: { subject: "ORDERS", account: JS } , to: "orders" }
{ service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d1", account: JS } }
{ service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d4", account: JS } }
{ service: { subject: "$JS.API.CONSUMER.MSG.NEXT.ORDERS.d1", account: JS } }
{ stream: { subject: "p.d", account: JS } }
{ stream: { subject: "p.d3", account: JS } }
{ stream: { subject: "p.d4", account: JS } }
{ service: { subject: "$JS.ACK.ORDERS.*.>", account: JS } }
]
},
Expand Down Expand Up @@ -2288,6 +2296,15 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
t.Fatalf("push consumer create failed: %v", err)
}

_, err = jsm.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "d4",
AckPolicy: nats.AckExplicitPolicy,
DeliverSubject: "p.d4",
})
if err != nil {
t.Fatalf("push consumer create failed: %v", err)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -2343,15 +2360,31 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
}
}

// Cannot subscribe with JS context from another account right now.
if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrJetStreamNotEnabled {
// Cannot create ephemeral subscriber with JS context.
if _, err := js.SubscribeSync("ORDERS", nats.BindConsumer()); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err)
}
if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err)

// Can attach to the consumer from another JS account if there is a durable name.
sub, err = js.SubscribeSync("ORDERS", nats.Durable("d4"), nats.BindStream("ORDERS"), nats.BindConsumer())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err)
waitForPending(t, toSend)

// If there are permissions to check that a consumer exists, then possible to bind subscription to it.
sub, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS"))
if err != nil {
t.Fatal(err)
}
expected := 10
msgs, err := sub.Fetch(expected)
if err != nil {
t.Fatal(err)
}
got := len(msgs)
if got != expected {
t.Fatalf("Expected %d, got %d", expected, got)
}
}

Expand Down Expand Up @@ -5899,3 +5932,72 @@ func TestJetStreamPublishAsyncPerf(t *testing.T) {
fmt.Printf("Took %v to send %d msgs\n", tt, toSend)
fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds())
}

func TestJetStreamBindConsumer(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err := js.AddStream(nil); err == nil {
t.Fatalf("Unexpected success")
}
si, err := js.AddStream(&nats.StreamConfig{Name: "foo"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", si)
}

for i := 0; i < 25; i++ {
js.Publish("foo", []byte("hi"))
}

// Durable name required to bind a consumer.
_, err = js.SubscribeSync("foo", nats.BindConsumer())
if err != nats.ErrDurableNameRequired {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.SubscribeSync("foo", nats.Durable("push"), nats.BindConsumer())
if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.PullSubscribe("foo", "pull", nats.BindConsumer())
if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") {
t.Fatalf("Unexpected error: %v", err)
}

// Push consumer
_, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "push", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.SubscribeSync("foo", nats.Durable("push"), nats.BindConsumer())
if err != nil {
t.Fatal(err)
}

// Pull consumer
_, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "pull", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.PullSubscribe("foo", "pull", nats.BindConsumer())
if err != nil {
t.Fatal(err)
}
}

0 comments on commit e40d303

Please sign in to comment.