Skip to content

Commit

Permalink
Merge pull request #744 from nats-io/js-queue-chan
Browse files Browse the repository at this point in the history
js: Add ChanQueueSubscribe for JS
  • Loading branch information
wallyqs committed Jun 9, 2021
2 parents ba098b9 + 98dc666 commit e4b051a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
10 changes: 9 additions & 1 deletion js.go
Expand Up @@ -118,6 +118,9 @@ type JetStream interface {
// ChanSubscribe creates channel based Subscription.
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// ChanQueueSubscribe creates channel based Subscription with a queue group.
ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

// QueueSubscribe creates a Subscription with a queue group.
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

Expand Down Expand Up @@ -968,11 +971,16 @@ func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscript
return js.subscribe(subj, queue, nil, mch, true, opts)
}

// Subscribe will create a subscription to the appropriate stream and consumer.
// ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel.
func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, ch, false, opts)
}

// ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel.
func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, queue, nil, ch, false, opts)
}

// PullSubscribe creates a pull subscriber.
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable)))
Expand Down
50 changes: 50 additions & 0 deletions test/js_test.go
Expand Up @@ -648,6 +648,56 @@ func TestJetStreamSubscribe(t *testing.T) {
if meta.Consumer != "consumer-name" {
t.Fatalf("Unexpected consumer name, got: %v", meta.Consumer)
}

qsubDurable = nats.Durable(dname + "-qsub-chan")
mch := make(chan *nats.Msg, 16536)
sub, err = js.ChanQueueSubscribe("bar", "v1", mch, qsubDurable)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

var a, b *nats.MsgMetadata
select {
case msg := <-mch:
meta, err := msg.Metadata()
if err != nil {
t.Error(err)
}
a = meta
case <-time.After(2 * time.Second):
t.Errorf("Timeout waiting for message")
}

mch2 := make(chan *nats.Msg, 16536)
sub, err = js.ChanQueueSubscribe("bar", "v1", mch2, qsubDurable)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()

// Publish more messages so that at least one is received by
// the channel queue subscriber.
for i := 0; i < toSend; i++ {
js.Publish("bar", msg)
}

select {
case msg := <-mch2:
meta, err := msg.Metadata()
if err != nil {
t.Error(err)
}
b = meta
case <-time.After(2 * time.Second):
t.Errorf("Timeout waiting for message")
}
if reflect.DeepEqual(a, b) {
t.Errorf("Expected to receive different messages in stream")
}

// Both ChanQueueSubscribers use the same consumer.
expectConsumers(t, 8)
}

func TestJetStreamAckPending_Pull(t *testing.T) {
Expand Down

0 comments on commit e4b051a

Please sign in to comment.