Skip to content

Commit

Permalink
Add MaxRequestMaxBytes consumer option
Browse files Browse the repository at this point in the history
Signed-off-by: Byron Ruth <b@devel.io>
  • Loading branch information
bruth committed Jul 28, 2022
1 parent f4a86f3 commit f0f7b36
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
14 changes: 12 additions & 2 deletions js.go
Expand Up @@ -1001,8 +1001,9 @@ type ConsumerConfig struct {
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`

// Push based consumers.
DeliverSubject string `json:"deliver_subject,omitempty"`
Expand Down Expand Up @@ -2401,6 +2402,15 @@ func MaxRequestExpires(max time.Duration) SubOpt {
})
}

// MaxRequesMaxBytes sets the maximum pull consumer request bytes that a
// Fetch() can receive.
func MaxRequestMaxBytes(bytes int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestMaxBytes = bytes
return nil
})
}

// InactiveThreshold indicates how long the server should keep an ephemeral
// after detecting loss of interest.
func InactiveThreshold(threshold time.Duration) SubOpt {
Expand Down
15 changes: 15 additions & 0 deletions test/js_test.go
Expand Up @@ -1412,6 +1412,8 @@ func TestJetStreamManagement(t *testing.T) {
expected.HeadersOnly = true
expected.MaxRequestBatch = 10
expected.MaxRequestExpires = 2 * time.Second
expected.MaxRequestMaxBytes = 1024

ci, err = js.UpdateConsumer("foo", &expected)
if err != nil {
t.Fatalf("Error on update: %v", err)
Expand Down Expand Up @@ -5602,6 +5604,19 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) {
}
})

t.Run("max request max bytes", func(t *testing.T) {
defer js.PurgeStream(subject)

sub, err := js.PullSubscribe(subject, "max-request-max-bytes", nats.MaxRequestMaxBytes(100))
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()
if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") {
t.Fatalf("Expected error about max request max bytes, got %v", err)
}
})

t.Run("max request expires", func(t *testing.T) {
defer js.PurgeStream(subject)

Expand Down

0 comments on commit f0f7b36

Please sign in to comment.