Skip to content

Commit

Permalink
js: Add BindConsumer option
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Jun 3, 2021
1 parent da90d22 commit 274e7ea
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 28 deletions.
67 changes: 46 additions & 21 deletions js.go
Expand Up @@ -1008,7 +1008,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}
}

isPullMode := ch == nil && cb == nil
isPullMode := ch == nil && cb == nil && !isSync
badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
hasHeartbeats := o.cfg.Heartbeat > 0
hasFC := o.cfg.FlowControl
Expand All @@ -1017,17 +1017,24 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}

var (
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
notFoundErr bool
stream = o.stream
consumer = o.cfg.Durable
isDurable = o.cfg.Durable != _EMPTY_
consumerBound = o.cbound
)

// In case of binding to a consumer, then a consumer name is required.
if consumer == _EMPTY_ && consumerBound {
return nil, ErrDurableNameRequired
}

// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
stream, err = js.lookupStreamBySubject(subj)
Expand All @@ -1038,18 +1045,15 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
stream = o.stream
}

// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
consumer = o.cfg.Durable
// With an explicit durable name, then can lookup the consumer first
// to which it should be attaching to.
if consumer != _EMPTY_ {
// Only create in case there is no consumer already.
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
notFoundErr = err != nil && strings.Contains(err.Error(), "consumer not found")
}

if info != nil {
switch {
case info != nil:
// Attach using the found consumer config.
ccfg = &info.Config
attached = true
Expand All @@ -1059,12 +1063,23 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return nil, ErrSubjectMismatch
}

// Prevent binding a subscription against incompatible consumer types.
if isPullMode && ccfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: cannot pull subscribe to push based consumer")
} else if !isPullMode && ccfg.DeliverSubject == _EMPTY_ {
return nil, fmt.Errorf("nats: must use pull subscribe to bind to pull based consumer")
}

if ccfg.DeliverSubject != _EMPTY_ {
deliver = ccfg.DeliverSubject
} else {
} else if !isPullMode {
deliver = NewInbox()
}
} else {
case err != nil && !notFoundErr || notFoundErr && consumerBound:
// Return error if consumer lookup by name fails and the consumer is being bound.
return nil, err
default:
// Attempt to create consumer if not found or using BindOnly.
shouldCreate = true
deliver = NewInbox()
if !isPullMode {
Expand Down Expand Up @@ -1346,11 +1361,13 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) {

type subOpts struct {
// For attaching.
stream, consumer string
stream string
// For manual ack
mack bool
// For creating or updating.
cfg *ConsumerConfig
// For binding a subscription to a consumer without creating it.
cbound bool
}

// ManualAck disables auto ack functionality for async subscriptions.
Expand Down Expand Up @@ -1489,6 +1506,14 @@ func BindStream(name string) SubOpt {
})
}

// BindOnly binds a subscription to a consumer without attempting to create.
func BindOnly() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cbound = true
return nil
})
}

// EnableFlowControl enables flow control for a push based consumer.
func EnableFlowControl() SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -149,6 +149,7 @@ var (
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required")
ErrDurableNameRequired = errors.New("nats: durable name of consumer is required")
)

func init() {
Expand Down
133 changes: 126 additions & 7 deletions test/js_test.go
Expand Up @@ -2215,22 +2215,30 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
# For the stream publish.
{ service: "ORDERS" }
# For the pull based consumer. Response type needed for batchsize > 1
{ service: "$JS.API.CONSUMER.INFO.ORDERS.d1", response: stream }
{ service: "$JS.API.CONSUMER.MSG.NEXT.ORDERS.d1", response: stream }
# For the push based consumer delivery and ack.
{ stream: "p.d" }
{ stream: "p.d3" }
# For the acks. Service in case we want an ack to our ack.
{ service: "$JS.ACK.ORDERS.*.>" }
# Allow lookup of stream to be able to bind from another account.
{ service: "$JS.API.CONSUMER.INFO.ORDERS.d4", response: stream }
{ stream: "p.d4" }
]
},
U: {
users: [ {user: rip, password: bar} ]
imports [
{ service: { subject: "$JS.API.INFO", account: JS } }
{ service: { subject: "ORDERS", account: JS } , to: "orders" }
{ service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d1", account: JS } }
{ service: { subject: "$JS.API.CONSUMER.INFO.ORDERS.d4", account: JS } }
{ service: { subject: "$JS.API.CONSUMER.MSG.NEXT.ORDERS.d1", account: JS } }
{ stream: { subject: "p.d", account: JS } }
{ stream: { subject: "p.d3", account: JS } }
{ stream: { subject: "p.d4", account: JS } }
{ service: { subject: "$JS.ACK.ORDERS.*.>", account: JS } }
]
},
Expand Down Expand Up @@ -2288,6 +2296,15 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
t.Fatalf("push consumer create failed: %v", err)
}

_, err = jsm.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "d4",
AckPolicy: nats.AckExplicitPolicy,
DeliverSubject: "p.d4",
})
if err != nil {
t.Fatalf("push consumer create failed: %v", err)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -2343,15 +2360,31 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
}
}

// Cannot subscribe with JS context from another account right now.
if _, err := js.SubscribeSync("ORDERS"); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err)
// Cannot create ephemeral subscriber with JS context.
if _, err := js.SubscribeSync("ORDERS", nats.BindOnly()); err != nats.ErrDurableNameRequired {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrDurableNameRequired, err)
}
if _, err = js.SubscribeSync("ORDERS", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err)

// Can attach to the consumer from another JS account if there is a durable name.
sub, err = js.SubscribeSync("ORDERS", nats.Durable("d4"), nats.BindStream("ORDERS"), nats.BindOnly())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS")); err != nats.ErrJetStreamNotEnabled {
t.Fatalf("Expected an error of '%v', got '%v'", nats.ErrJetStreamNotEnabled, err)
waitForPending(t, toSend)

// If there are permissions to check that a consumer exists, then possible to bind subscription to it.
sub, err = js.PullSubscribe("ORDERS", "d1", nats.BindStream("ORDERS"))
if err != nil {
t.Fatal(err)
}
expected := 10
msgs, err := sub.Fetch(expected)
if err != nil {
t.Fatal(err)
}
got := len(msgs)
if got != expected {
t.Fatalf("Expected %d, got %d", expected, got)
}
}

Expand Down Expand Up @@ -5899,3 +5932,89 @@ func TestJetStreamPublishAsyncPerf(t *testing.T) {
fmt.Printf("Took %v to send %d msgs\n", tt, toSend)
fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds())
}

func TestJetStreamBindConsumer(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err := js.AddStream(nil); err == nil {
t.Fatalf("Unexpected success")
}
si, err := js.AddStream(&nats.StreamConfig{Name: "foo"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", si)
}

for i := 0; i < 25; i++ {
js.Publish("foo", []byte("hi"))
}

// Durable name required to bind a consumer.
_, err = js.SubscribeSync("foo", nats.BindOnly())
if err != nats.ErrDurableNameRequired {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.SubscribeSync("foo", nats.Durable("push"), nats.BindOnly())
if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.PullSubscribe("foo", "pull", nats.BindOnly())
if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") {
t.Fatalf("Unexpected error: %v", err)
}

// Push consumer
_, err = js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "push",
AckPolicy: nats.AckExplicitPolicy,
DeliverSubject: nats.NewInbox(),
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.SubscribeSync("foo", nats.Durable("push"), nats.BindOnly())
if err != nil {
t.Fatal(err)
}

// Pull consumer
_, err = js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "pull",
AckPolicy: nats.AckExplicitPolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.PullSubscribe("foo", "pull", nats.BindOnly())
if err != nil {
t.Fatal(err)
}

// Prevent binding to durable that is from a wrong type.
_, err = js.PullSubscribe("foo", "push", nats.BindOnly())
if err == nil || !strings.Contains(err.Error(), `cannot pull subscribe to push based consumer`) {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.SubscribeSync("foo", nats.Durable("pull"), nats.BindOnly())
if err == nil || !strings.Contains(err.Error(), `must use pull subscribe to bind to pull based consumer`) {
t.Fatalf("Unexpected error: %v", err)
}
}

0 comments on commit 274e7ea

Please sign in to comment.