From 6904b154ffbc8843b31cd9ac3dec5f17e5f5067f Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Wed, 27 Jul 2022 21:18:39 -0400 Subject: [PATCH 1/2] Add MaxRequestMaxBytes consumer option Signed-off-by: Byron Ruth --- js.go | 14 ++++++++++++-- test/js_test.go | 15 +++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/js.go b/js.go index 77265854b..23f10b1f3 100644 --- a/js.go +++ b/js.go @@ -1037,8 +1037,9 @@ type ConsumerConfig struct { HeadersOnly bool `json:"headers_only,omitempty"` // Pull based options. - MaxRequestBatch int `json:"max_batch,omitempty"` - MaxRequestExpires time.Duration `json:"max_expires,omitempty"` + MaxRequestBatch int `json:"max_batch,omitempty"` + MaxRequestExpires time.Duration `json:"max_expires,omitempty"` + MaxRequestMaxBytes int `json:"max_bytes,omitempty"` // Push based consumers. DeliverSubject string `json:"deliver_subject,omitempty"` @@ -2445,6 +2446,15 @@ func MaxRequestExpires(max time.Duration) SubOpt { }) } +// MaxRequesMaxBytes sets the maximum pull consumer request bytes that a +// Fetch() can receive. +func MaxRequestMaxBytes(bytes int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxRequestMaxBytes = bytes + return nil + }) +} + // InactiveThreshold indicates how long the server should keep an ephemeral // after detecting loss of interest. func InactiveThreshold(threshold time.Duration) SubOpt { diff --git a/test/js_test.go b/test/js_test.go index f644d64a3..0b3e4e321 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1411,6 +1411,8 @@ func TestJetStreamManagement(t *testing.T) { expected.HeadersOnly = true expected.MaxRequestBatch = 10 expected.MaxRequestExpires = 2 * time.Second + expected.MaxRequestMaxBytes = 1024 + ci, err = js.UpdateConsumer("foo", &expected) if err != nil { t.Fatalf("Error on update: %v", err) @@ -5870,6 +5872,19 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) { } }) + t.Run("max request max bytes", func(t *testing.T) { + defer js.PurgeStream(subject) + + sub, err := js.PullSubscribe(subject, "max-request-max-bytes", nats.MaxRequestMaxBytes(100)) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") { + t.Fatalf("Expected error about max request max bytes, got %v", err) + } + }) + t.Run("max request expires", func(t *testing.T) { defer js.PurgeStream(subject) From 2de06e01e7d3d6df763c1e24aff2c8e7b873e1d7 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Mon, 15 Aug 2022 08:53:04 -0400 Subject: [PATCH 2/2] Add PullMaxBytes fetch option Signed-off-by: Byron Ruth --- js.go | 22 +++++++++++++++++----- test/js_test.go | 3 ++- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/js.go b/js.go index 23f10b1f3..b84d79343 100644 --- a/js.go +++ b/js.go @@ -1085,9 +1085,10 @@ type SequencePair struct { // nextRequest is for getting next messages for pull based consumers from JetStream. type nextRequest struct { - Expires time.Duration `json:"expires,omitempty"` - Batch int `json:"batch,omitempty"` - NoWait bool `json:"no_wait,omitempty"` + Expires time.Duration `json:"expires,omitempty"` + Batch int `json:"batch,omitempty"` + NoWait bool `json:"no_wait,omitempty"` + MaxBytes int `json:"max_bytes,omitempty"` } // jsSub includes JetStream subscription info. @@ -2495,8 +2496,9 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { } type pullOpts struct { - ttl time.Duration - ctx context.Context + maxBytes int + ttl time.Duration + ctx context.Context } // PullOpt are the options that can be passed when pulling a batch of messages. @@ -2512,6 +2514,14 @@ func PullMaxWaiting(n int) SubOpt { }) } +// PullMaxBytes defines the max bytes allowed for a fetch request. +type PullMaxBytes int + +func (n PullMaxBytes) configurePull(opts *pullOpts) error { + opts.maxBytes = int(n) + return nil +} + var ( // errNoMessages is an error that a Fetch request using no_wait can receive to signal // that there are no more messages available. @@ -2687,6 +2697,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // For batch real size of 1, it does not make sense to set no_wait in // the request. noWait := batch-len(msgs) > 1 + var nr nextRequest sendReq := func() error { @@ -2711,6 +2722,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { nr.Batch = batch - len(msgs) nr.Expires = expires nr.NoWait = noWait + nr.MaxBytes = o.maxBytes req, _ := json.Marshal(nr) return nc.PublishRequest(nms, rply, req) } diff --git a/test/js_test.go b/test/js_test.go index 0b3e4e321..43d9bcffa 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -5880,7 +5880,8 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) { t.Fatal(err) } defer sub.Unsubscribe() - if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") { + + if _, err := sub.Fetch(10, nats.PullMaxBytes(200)); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") { t.Fatalf("Expected error about max request max bytes, got %v", err) } })