Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NextMsgWithContext does not return messages #785

Closed
vdwees opened this issue Aug 3, 2021 · 10 comments · Fixed by #794
Closed

NextMsgWithContext does not return messages #785

vdwees opened this issue Aug 3, 2021 · 10 comments · Fixed by #794
Labels
bug Confirmed reproducible bug

Comments

@vdwees
Copy link

vdwees commented Aug 3, 2021

Defect

NextMsgWithContext does not return messages, but simply hangs until the ctx is closed. However, when using Fetch(1) the same subscriber, messages are received.

Versions of nats.go and the nats-server if one was involved:

nats.go v1.11.1-0.20210623165838-4b75fc59ae30
docker run -p 4222:4222 -ti nats:2.3.3-alpine -js

OS/Container environment:

Debian-like

Steps or code to reproduce the issue:

  1. start nats server in docker
  2. compile and run:
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect("localhost:4222")
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}

	js, err := nc.JetStream()
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}
	streamConfig := nats.StreamConfig{
		Name:      "ORDERS",
		Storage:   nats.FileStorage,
		Subjects:  []string{"ORDERS.>"},
		Replicas:  1,
		MaxAge:    time.Hour * 24,
		Retention: nats.WorkQueuePolicy, // to delete messages as they are acked
	}
	log.Printf("Configuring stream: \n%+v\n", streamConfig)
	_, err = js.AddStream(&streamConfig)
	// Ignore "already in use" error (seems to only be present with WorkQueuePolicy)
	// https://github.com/nats-io/nats.go/discussions/761#discussioncomment-912791
	if err != nil && !strings.Contains(err.Error(), "already in use") {
		log.Fatalf("Encountered error: %s\n", err)
	}
	consumerConfig := nats.ConsumerConfig{
		Durable:       "DURABLECONSUMER",
		AckWait:       time.Minute,
		AckPolicy:     nats.AckExplicitPolicy,
		DeliverPolicy: nats.DeliverAllPolicy,
	}
	log.Printf("Configuring consumer: \n%+v\n", consumerConfig)
	_, err = js.AddConsumer("ORDERS", &consumerConfig)
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}
	sub, err := js.PullSubscribe(
		"ORDERS.*",
		"DURABLECONSUMER",
		nats.BindStream("ORDERS"), // Not sure this is required?
	)
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}
	mm, err := sub.Fetch(1)
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}
	fmt.Printf("Received a JetStream message: %s, data: %s\n", mm[0].Subject, string(mm[0].Data))

	ctx := context.Background()
	m, err := sub.NextMsgWithContext(ctx)
	if err != nil {
		if !errors.Is(err, context.Canceled) {
			log.Fatalf("Encountered error: %s\n", err)
		}
	}
	fmt.Printf("Received a JetStream message: %s, data: %s\n", m.Subject, string(m.Data))
}
  1. do nats pub ORDERS.test "message {{.Count}} @ {{.TimeStamp}}" --count=3

Expected result:

I would expect that message 1 and 2 would get printed and the process would terminate.

{Name:ORDERS Subjects:[ORDERS.>] Retention:WorkQueue MaxConsumers:0 MaxMsgs:0 MaxBytes:0 Discard:DiscardOld MaxAge:24h0m0s MaxMsgSize:0 Storage:File Replicas:1 NoAck:false Template: Duplicates:0s Placement:<nil> Mirror:<nil> Sources:[]}
2021/08/03 18:30:55 Configuring consumer: 
{Durable:DURABLECONSUMER DeliverSubject: DeliverPolicy:0 OptStartSeq:0 OptStartTime:<nil> AckPolicy:AckExplicit AckWait:1m0s MaxDeliver:0 FilterSubject: ReplayPolicy:0 RateLimit:0 SampleFrequency: MaxWaiting:0 MaxAckPending:0 FlowControl:false Heartbeat:0s}
Received a JetStream message: ORDERS.test, data: message 1 @ 2021-08-03T18:30:57+02:00
Received a JetStream message: ORDERS.test, data: message 2 @ 2021-08-03T18:30:57+02:00

I would expect that message 2 would get printed and the process would terminate.

Actual result:

Message 1 is printed, and the process hangs.

{Name:ORDERS Subjects:[ORDERS.>] Retention:WorkQueue MaxConsumers:0 MaxMsgs:0 MaxBytes:0 Discard:DiscardOld MaxAge:24h0m0s MaxMsgSize:0 Storage:File Replicas:1 NoAck:false Template: Duplicates:0s Placement:<nil> Mirror:<nil> Sources:[]}
2021/08/03 18:30:55 Configuring consumer: 
{Durable:DURABLECONSUMER DeliverSubject: DeliverPolicy:0 OptStartSeq:0 OptStartTime:<nil> AckPolicy:AckExplicit AckWait:1m0s MaxDeliver:0 FilterSubject: ReplayPolicy:0 RateLimit:0 SampleFrequency: MaxWaiting:0 MaxAckPending:0 FlowControl:false Heartbeat:0s}
Received a JetStream message: ORDERS.test, data: message 1 @ 2021-08-03T18:30:57+02:00
@vdwees vdwees added the bug Confirmed reproducible bug label Aug 3, 2021
@wallyqs
Copy link
Member

wallyqs commented Aug 3, 2021

Hi, right now NextMsg or NextMsgWithContext are not aliases to Fetch and only used with Push subscribers. Related PR: https://github.com/nats-io/nats.go/pull/754/files
Also it is recommended to use a cancellable context or context.WithTimeout since Background context would never yield.

@vdwees
Copy link
Author

vdwees commented Aug 3, 2021

right now NextMsg or NextMsgWithContext are [...] only used with Push subscribers

Is the nats.WorkQueuePolicy only available for Pull? I changed the PullSubscribe to SubscribeSync, and now I see this error:
nats: must use pull subscribe to bind to pull based consumer

Also it is recommended to use a cancellable context or context.WithTimeout since Background context would never yield.

Ignore the smells, I cut a bunch of stuff to make a minimal example 😝

@wallyqs
Copy link
Member

wallyqs commented Aug 3, 2021

@vdwees work queue policy is available for both, but once the consumer is created, you need to use the proper API depending on the delivery mode (push/pull). PullSubscribe + Fetch is only for pull consumers, SubscribeSync is for push consumers.

@vdwees
Copy link
Author

vdwees commented Aug 3, 2021

Thanks. One more question: Is there a way to explicitly set a consumer as push or pull? Or maybe all durable consumers are pull? Asking because even with a fresh image and the following script, I still see the "must use pull subscribe to bind to pull based consumer" error.

func main() {
	nc, err := nats.Connect("localhost:4222")
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}

	js, err := nc.JetStream()
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}
	streamConfig := nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"ORDERS.>"},
	}
	log.Printf("Configuring stream: \n%+v\n", streamConfig)
	_, err = js.AddStream(&streamConfig)
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}
	consumerConfig := nats.ConsumerConfig{
		Durable:   "DURABLECONSUMER",
		AckPolicy: nats.AckExplicitPolicy,
	}
	log.Printf("Configuring consumer: \n%+v\n", consumerConfig)
	_, err = js.AddConsumer("ORDERS", &consumerConfig)
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}
	_, err = js.SubscribeSync(
		"ORDERS.*",
		nats.Bind("ORDERS", "DURABLECONSUMER"),
	)
	if err != nil {
		log.Fatalf("Encountered error: %s\n", err)
	}
}

@wallyqs
Copy link
Member

wallyqs commented Aug 3, 2021

@vdwees the presence of a delivery subject is what determines whether it would be a pull based consumer, if there is none then it is a pull consumer, otherwise is a push subscriber.

@vdwees
Copy link
Author

vdwees commented Aug 3, 2021

@wallyqs Thanks, somehow I missed that detail. I can create the subscriber now. In an example you posted in an issue: #748 (comment), you use NewInbox() instead of a simple fixed string. Is there an advantage to do it this way?

@wallyqs
Copy link
Member

wallyqs commented Aug 3, 2021

@vdwees the inbox gives you a guaranteed to be unique subject so that it does not collide with a subject that might be used for another purpose in the same account namespace, but does not have to be that way, a simple string is fine

@kozlovic
Copy link
Member

With latest #791, NextMsg on a PullSubscribe will return ErrBadSubscription because user are not allowed to call NextMsg but need to call Fetch().

@vdwees
Copy link
Author

vdwees commented Aug 13, 2021

@kozlovic It sounds like #791 closes this issue, correct?

@kozlovic
Copy link
Member

Not entirely, in that PR I do it only for NextMsgWithContent. In upcoming PR, it will indeed cause NextMsg() to return ErrTypeSubscription (btw, this is ErrTypeSubscription, not ErrBadSubscription as I wrote earlier). So this will close when another PR that I will submit later lands.

kozlovic added a commit that referenced this issue Aug 15, 2021
They will be described in the release notes, but gist:

Added:
- `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants)
- `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation)
- Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help making prevent incorrect subscriptions to JetStream consumers
- Field `Last` in `SequencePair`

Changed:
- With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API
- If the library created internally a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes
- Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue member from the same group can attach to this consumer, and a non queue subscription cannot attach to it. Note that this requires server v2.3.5
- Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error

Fixed:
- Possible lock inversion
- JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()`

Resolves #785
Resolves #776
Resolves #775
Resolves #748
Resolves #747

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic added a commit that referenced this issue Aug 15, 2021
They will be described in the release notes, but gist:

Added:
- `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants)
- `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation)
- Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help making prevent incorrect subscriptions to JetStream consumers
- Field `Last` in `SequencePair`

Changed:
- With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API
- If the library created internally a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes
- Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue member from the same group can attach to this consumer, and a non queue subscription cannot attach to it. Note that this requires server v2.3.5
- Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error

Fixed:
- Possible lock inversion
- JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()`

Resolves #785
Resolves #776
Resolves #775
Resolves #748
Resolves #747

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic added a commit that referenced this issue Aug 15, 2021
They will be described in the release notes, but gist:

Added:
- `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants)
- `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation)
- Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help making prevent incorrect subscriptions to JetStream consumers
- Field `Last` in `SequencePair`

Changed:
- With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API
- If the library created internally a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes
- Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue member from the same group can attach to this consumer, and a non queue subscription cannot attach to it. Note that this requires server v2.3.5
- Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error

Fixed:
- Possible lock inversion
- JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()`

Resolves #785
Resolves #776
Resolves #775
Resolves #748
Resolves #747

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Confirmed reproducible bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants