Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JetStream: the value for max pending async messages can be exceeded #1612

Open
ygabuev opened this issue Apr 16, 2024 · 0 comments
Open

JetStream: the value for max pending async messages can be exceeded #1612

ygabuev opened this issue Apr 16, 2024 · 0 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@ygabuev
Copy link

ygabuev commented Apr 16, 2024

Observed behavior

When using the simplified JetStream client in Go, it is possible to exceed the limit of max pending async messages that is specified by the WithPublishAsyncMaxPending option.

Expected behavior

The limit specified by WithPublishAsyncMaxPending should not be exceeded in practice.

Server and client version

NATS server: 2.10.12
NATS Go client: 1.34.1

Host environment

No response

Steps to reproduce

The following code

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

const (
	Subject = "foo.bar.baz"
	Stream  = "FOO"
)

func main() {
	ctx := context.Background()
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}

	js, err := jetstream.New(
		nc,
		jetstream.WithPublishAsyncMaxPending(1),
	)
	if err != nil {
		panic(err)
	}

	_, _ = js.CreateStream(ctx, jetstream.StreamConfig{
		Name:     Stream,
		Subjects: []string{Subject},
	})
	defer js.DeleteStream(ctx, Stream)

	go func() {
		for {
			fmt.Printf("currently pending: %d\n", js.PublishAsyncPending())
			time.Sleep(50 * time.Millisecond)
		}
	}()

	for i := 0; i < 5; i++ {
		go func(i int) {
			_, err := js.PublishAsync(Subject, []byte{})
			if err != nil {
				fmt.Printf("%d  %v\n", i, err)
			}
		}(i)
	}

	time.Sleep(200 * time.Millisecond)
}

generates the following output for me:

currently pending: 0
currently pending: 4
currently pending: 4
currently pending: 4
2  nats: stalled with too many outstanding async published messages
3  nats: stalled with too many outstanding async published messages
0  nats: stalled with too many outstanding async published messages
1  nats: stalled with too many outstanding async published messages
currently pending: 0
@ygabuev ygabuev added the defect Suspected defect such as a bug or regression label Apr 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

1 participant