From f0f7b3655e7ecd9d63a211bc10a6e1cb6f91e2c3 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Wed, 27 Jul 2022 21:18:39 -0400 Subject: [PATCH] Add MaxRequestMaxBytes consumer option Signed-off-by: Byron Ruth --- js.go | 14 ++++++++++++-- test/js_test.go | 15 +++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/js.go b/js.go index d9df981d8..198590cc5 100644 --- a/js.go +++ b/js.go @@ -1001,8 +1001,9 @@ type ConsumerConfig struct { HeadersOnly bool `json:"headers_only,omitempty"` // Pull based options. - MaxRequestBatch int `json:"max_batch,omitempty"` - MaxRequestExpires time.Duration `json:"max_expires,omitempty"` + MaxRequestBatch int `json:"max_batch,omitempty"` + MaxRequestExpires time.Duration `json:"max_expires,omitempty"` + MaxRequestMaxBytes int `json:"max_bytes,omitempty"` // Push based consumers. DeliverSubject string `json:"deliver_subject,omitempty"` @@ -2401,6 +2402,15 @@ func MaxRequestExpires(max time.Duration) SubOpt { }) } +// MaxRequesMaxBytes sets the maximum pull consumer request bytes that a +// Fetch() can receive. +func MaxRequestMaxBytes(bytes int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxRequestMaxBytes = bytes + return nil + }) +} + // InactiveThreshold indicates how long the server should keep an ephemeral // after detecting loss of interest. func InactiveThreshold(threshold time.Duration) SubOpt { diff --git a/test/js_test.go b/test/js_test.go index de2efbf1a..04de7fb78 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1412,6 +1412,8 @@ func TestJetStreamManagement(t *testing.T) { expected.HeadersOnly = true expected.MaxRequestBatch = 10 expected.MaxRequestExpires = 2 * time.Second + expected.MaxRequestMaxBytes = 1024 + ci, err = js.UpdateConsumer("foo", &expected) if err != nil { t.Fatalf("Error on update: %v", err) @@ -5602,6 +5604,19 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) { } }) + t.Run("max request max bytes", func(t *testing.T) { + defer js.PurgeStream(subject) + + sub, err := js.PullSubscribe(subject, "max-request-max-bytes", nats.MaxRequestMaxBytes(100)) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") { + t.Fatalf("Expected error about max request max bytes, got %v", err) + } + }) + t.Run("max request expires", func(t *testing.T) { defer js.PurgeStream(subject)