diff --git a/js.go b/js.go index 23f10b1f3..b84d79343 100644 --- a/js.go +++ b/js.go @@ -1085,9 +1085,10 @@ type SequencePair struct { // nextRequest is for getting next messages for pull based consumers from JetStream. type nextRequest struct { - Expires time.Duration `json:"expires,omitempty"` - Batch int `json:"batch,omitempty"` - NoWait bool `json:"no_wait,omitempty"` + Expires time.Duration `json:"expires,omitempty"` + Batch int `json:"batch,omitempty"` + NoWait bool `json:"no_wait,omitempty"` + MaxBytes int `json:"max_bytes,omitempty"` } // jsSub includes JetStream subscription info. @@ -2495,8 +2496,9 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { } type pullOpts struct { - ttl time.Duration - ctx context.Context + maxBytes int + ttl time.Duration + ctx context.Context } // PullOpt are the options that can be passed when pulling a batch of messages. @@ -2512,6 +2514,14 @@ func PullMaxWaiting(n int) SubOpt { }) } +// PullMaxBytes defines the max bytes allowed for a fetch request. +type PullMaxBytes int + +func (n PullMaxBytes) configurePull(opts *pullOpts) error { + opts.maxBytes = int(n) + return nil +} + var ( // errNoMessages is an error that a Fetch request using no_wait can receive to signal // that there are no more messages available. @@ -2687,6 +2697,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // For batch real size of 1, it does not make sense to set no_wait in // the request. noWait := batch-len(msgs) > 1 + var nr nextRequest sendReq := func() error { @@ -2711,6 +2722,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { nr.Batch = batch - len(msgs) nr.Expires = expires nr.NoWait = noWait + nr.MaxBytes = o.maxBytes req, _ := json.Marshal(nr) return nc.PublishRequest(nms, rply, req) } diff --git a/test/js_test.go b/test/js_test.go index 0b3e4e321..43d9bcffa 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -5880,7 +5880,8 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) { t.Fatal(err) } defer sub.Unsubscribe() - if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") { + + if _, err := sub.Fetch(10, nats.PullMaxBytes(200)); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") { t.Fatalf("Expected error about max request max bytes, got %v", err) } })