diff --git a/js.go b/js.go index 77265854b..b84d79343 100644 --- a/js.go +++ b/js.go @@ -1037,8 +1037,9 @@ type ConsumerConfig struct { HeadersOnly bool `json:"headers_only,omitempty"` // Pull based options. - MaxRequestBatch int `json:"max_batch,omitempty"` - MaxRequestExpires time.Duration `json:"max_expires,omitempty"` + MaxRequestBatch int `json:"max_batch,omitempty"` + MaxRequestExpires time.Duration `json:"max_expires,omitempty"` + MaxRequestMaxBytes int `json:"max_bytes,omitempty"` // Push based consumers. DeliverSubject string `json:"deliver_subject,omitempty"` @@ -1084,9 +1085,10 @@ type SequencePair struct { // nextRequest is for getting next messages for pull based consumers from JetStream. type nextRequest struct { - Expires time.Duration `json:"expires,omitempty"` - Batch int `json:"batch,omitempty"` - NoWait bool `json:"no_wait,omitempty"` + Expires time.Duration `json:"expires,omitempty"` + Batch int `json:"batch,omitempty"` + NoWait bool `json:"no_wait,omitempty"` + MaxBytes int `json:"max_bytes,omitempty"` } // jsSub includes JetStream subscription info. @@ -2445,6 +2447,15 @@ func MaxRequestExpires(max time.Duration) SubOpt { }) } +// MaxRequesMaxBytes sets the maximum pull consumer request bytes that a +// Fetch() can receive. +func MaxRequestMaxBytes(bytes int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxRequestMaxBytes = bytes + return nil + }) +} + // InactiveThreshold indicates how long the server should keep an ephemeral // after detecting loss of interest. func InactiveThreshold(threshold time.Duration) SubOpt { @@ -2485,8 +2496,9 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { } type pullOpts struct { - ttl time.Duration - ctx context.Context + maxBytes int + ttl time.Duration + ctx context.Context } // PullOpt are the options that can be passed when pulling a batch of messages. @@ -2502,6 +2514,14 @@ func PullMaxWaiting(n int) SubOpt { }) } +// PullMaxBytes defines the max bytes allowed for a fetch request. +type PullMaxBytes int + +func (n PullMaxBytes) configurePull(opts *pullOpts) error { + opts.maxBytes = int(n) + return nil +} + var ( // errNoMessages is an error that a Fetch request using no_wait can receive to signal // that there are no more messages available. @@ -2677,6 +2697,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // For batch real size of 1, it does not make sense to set no_wait in // the request. noWait := batch-len(msgs) > 1 + var nr nextRequest sendReq := func() error { @@ -2701,6 +2722,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { nr.Batch = batch - len(msgs) nr.Expires = expires nr.NoWait = noWait + nr.MaxBytes = o.maxBytes req, _ := json.Marshal(nr) return nc.PublishRequest(nms, rply, req) } diff --git a/test/js_test.go b/test/js_test.go index f644d64a3..43d9bcffa 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1411,6 +1411,8 @@ func TestJetStreamManagement(t *testing.T) { expected.HeadersOnly = true expected.MaxRequestBatch = 10 expected.MaxRequestExpires = 2 * time.Second + expected.MaxRequestMaxBytes = 1024 + ci, err = js.UpdateConsumer("foo", &expected) if err != nil { t.Fatalf("Error on update: %v", err) @@ -5870,6 +5872,20 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) { } }) + t.Run("max request max bytes", func(t *testing.T) { + defer js.PurgeStream(subject) + + sub, err := js.PullSubscribe(subject, "max-request-max-bytes", nats.MaxRequestMaxBytes(100)) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + if _, err := sub.Fetch(10, nats.PullMaxBytes(200)); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") { + t.Fatalf("Expected error about max request max bytes, got %v", err) + } + }) + t.Run("max request expires", func(t *testing.T) { defer js.PurgeStream(subject)