Skip to content

Commit

Permalink
js: Add ChanQueueSubscribe for JS
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Jun 9, 2021
1 parent ba098b9 commit c97de4e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
10 changes: 9 additions & 1 deletion js.go
Expand Up @@ -118,6 +118,9 @@ type JetStream interface {
// ChanSubscribe creates channel based Subscription.
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// QueueChanSubscribe creates channel based Subscription with a queue group.
ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// QueueSubscribe creates a Subscription with a queue group.
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

Expand Down Expand Up @@ -968,11 +971,16 @@ func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscript
return js.subscribe(subj, queue, nil, mch, true, opts)
}

// Subscribe will create a subscription to the appropriate stream and consumer.
// ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel.
func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, ch, false, opts)
}

// ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel.
func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, queue, nil, ch, false, opts)
}

// PullSubscribe creates a pull subscriber.
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable)))
Expand Down
44 changes: 44 additions & 0 deletions test/js_test.go
Expand Up @@ -497,6 +497,50 @@ func TestJetStreamSubscribe(t *testing.T) {
waitForPending(t, 10)
expectConsumers(t, 4)

qsubDurable = nats.Durable(dname + "-qsub-chan")
mch := make(chan *nats.Msg, 16536)
sub, err = js.ChanQueueSubscribe("bar", "v1", mch, qsubDurable)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

var a, b *nats.MsgMetadata
select {
case msg := <-mch:
meta, err := msg.Metadata()
if err != nil {
t.Error(err)
}
a = meta
case <-time.After(2 * time.Second):
t.Errorf("Timeout waiting for message")
}

mch2 := make(chan *nats.Msg, 16536)
sub, err = js.ChanQueueSubscribe("bar", "v1", mch2, qsubDurable)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

select {
case msg := <-mch:
meta, err := msg.Metadata()
if err != nil {
t.Error(err)
}
b = meta
case <-time.After(2 * time.Second):
t.Errorf("Timeout waiting for message")
}
if reflect.DeepEqual(a, b) {
t.Errorf("Expected to receive different messages in stream")
}

// Both ChanQueueSubscribers use the same consumer.
expectConsumers(t, 5)

// Now try pull based subscribers.

// Check some error conditions first.
Expand Down

0 comments on commit c97de4e

Please sign in to comment.