Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MaxRequestMaxBytes consumer option #1043

Merged
merged 2 commits into from Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 29 additions & 7 deletions js.go
Expand Up @@ -1037,8 +1037,9 @@ type ConsumerConfig struct {
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`

// Push based consumers.
DeliverSubject string `json:"deliver_subject,omitempty"`
Expand Down Expand Up @@ -1084,9 +1085,10 @@ type SequencePair struct {

// nextRequest is for getting next messages for pull based consumers from JetStream.
type nextRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
}

// jsSub includes JetStream subscription info.
Expand Down Expand Up @@ -2445,6 +2447,15 @@ func MaxRequestExpires(max time.Duration) SubOpt {
})
}

// MaxRequesMaxBytes sets the maximum pull consumer request bytes that a
// Fetch() can receive.
func MaxRequestMaxBytes(bytes int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestMaxBytes = bytes
return nil
})
}

// InactiveThreshold indicates how long the server should keep an ephemeral
// after detecting loss of interest.
func InactiveThreshold(threshold time.Duration) SubOpt {
Expand Down Expand Up @@ -2485,8 +2496,9 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
}

type pullOpts struct {
ttl time.Duration
ctx context.Context
maxBytes int
ttl time.Duration
ctx context.Context
}

// PullOpt are the options that can be passed when pulling a batch of messages.
Expand All @@ -2502,6 +2514,14 @@ func PullMaxWaiting(n int) SubOpt {
})
}

// PullMaxBytes defines the max bytes allowed for a fetch request.
type PullMaxBytes int

func (n PullMaxBytes) configurePull(opts *pullOpts) error {
opts.maxBytes = int(n)
return nil
}

var (
// errNoMessages is an error that a Fetch request using no_wait can receive to signal
// that there are no more messages available.
Expand Down Expand Up @@ -2677,6 +2697,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// For batch real size of 1, it does not make sense to set no_wait in
// the request.
noWait := batch-len(msgs) > 1

var nr nextRequest

sendReq := func() error {
Expand All @@ -2701,6 +2722,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
nr.Batch = batch - len(msgs)
nr.Expires = expires
nr.NoWait = noWait
nr.MaxBytes = o.maxBytes
req, _ := json.Marshal(nr)
return nc.PublishRequest(nms, rply, req)
}
Expand Down
16 changes: 16 additions & 0 deletions test/js_test.go
Expand Up @@ -1411,6 +1411,8 @@ func TestJetStreamManagement(t *testing.T) {
expected.HeadersOnly = true
expected.MaxRequestBatch = 10
expected.MaxRequestExpires = 2 * time.Second
expected.MaxRequestMaxBytes = 1024

ci, err = js.UpdateConsumer("foo", &expected)
if err != nil {
t.Fatalf("Error on update: %v", err)
Expand Down Expand Up @@ -5870,6 +5872,20 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) {
}
})

t.Run("max request max bytes", func(t *testing.T) {
defer js.PurgeStream(subject)

sub, err := js.PullSubscribe(subject, "max-request-max-bytes", nats.MaxRequestMaxBytes(100))
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()

if _, err := sub.Fetch(10, nats.PullMaxBytes(200)); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") {
t.Fatalf("Expected error about max request max bytes, got %v", err)
}
})

t.Run("max request expires", func(t *testing.T) {
defer js.PurgeStream(subject)

Expand Down