diff --git a/js.go b/js.go index e6d64f3c1..f4b532b0b 100644 --- a/js.go +++ b/js.go @@ -118,6 +118,9 @@ type JetStream interface { // ChanSubscribe creates channel based Subscription. ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) + // ChanQueueSubscribe creates channel based Subscription with a queue group. + ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) + // QueueSubscribe creates a Subscription with a queue group. QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) @@ -968,11 +971,16 @@ func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscript return js.subscribe(subj, queue, nil, mch, true, opts) } -// Subscribe will create a subscription to the appropriate stream and consumer. +// ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel. func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, _EMPTY_, nil, ch, false, opts) } +// ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel. +func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { + return js.subscribe(subj, queue, nil, ch, false, opts) +} + // PullSubscribe creates a pull subscriber. func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable))) diff --git a/test/js_test.go b/test/js_test.go index 88b6bb16f..87b2b3226 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -648,6 +648,56 @@ func TestJetStreamSubscribe(t *testing.T) { if meta.Consumer != "consumer-name" { t.Fatalf("Unexpected consumer name, got: %v", meta.Consumer) } + + qsubDurable = nats.Durable(dname + "-qsub-chan") + mch := make(chan *nats.Msg, 16536) + sub, err = js.ChanQueueSubscribe("bar", "v1", mch, qsubDurable) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + var a, b *nats.MsgMetadata + select { + case msg := <-mch: + meta, err := msg.Metadata() + if err != nil { + t.Error(err) + } + a = meta + case <-time.After(2 * time.Second): + t.Errorf("Timeout waiting for message") + } + + mch2 := make(chan *nats.Msg, 16536) + sub, err = js.ChanQueueSubscribe("bar", "v1", mch2, qsubDurable) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + + // Publish more messages so that at least one is received by + // the channel queue subscriber. + for i := 0; i < toSend; i++ { + js.Publish("bar", msg) + } + + select { + case msg := <-mch2: + meta, err := msg.Metadata() + if err != nil { + t.Error(err) + } + b = meta + case <-time.After(2 * time.Second): + t.Errorf("Timeout waiting for message") + } + if reflect.DeepEqual(a, b) { + t.Errorf("Expected to receive different messages in stream") + } + + // Both ChanQueueSubscribers use the same consumer. + expectConsumers(t, 8) } func TestJetStreamAckPending_Pull(t *testing.T) {