Skip to content

Commit

Permalink
Add PullMaxBytes fetch option
Browse files Browse the repository at this point in the history
Signed-off-by: Byron Ruth <b@devel.io>
  • Loading branch information
bruth committed Aug 15, 2022
1 parent 6904b15 commit 2de06e0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
22 changes: 17 additions & 5 deletions js.go
Expand Up @@ -1085,9 +1085,10 @@ type SequencePair struct {

// nextRequest is for getting next messages for pull based consumers from JetStream.
type nextRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
}

// jsSub includes JetStream subscription info.
Expand Down Expand Up @@ -2495,8 +2496,9 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
}

type pullOpts struct {
ttl time.Duration
ctx context.Context
maxBytes int
ttl time.Duration
ctx context.Context
}

// PullOpt are the options that can be passed when pulling a batch of messages.
Expand All @@ -2512,6 +2514,14 @@ func PullMaxWaiting(n int) SubOpt {
})
}

// PullMaxBytes defines the max bytes allowed for a fetch request.
type PullMaxBytes int

func (n PullMaxBytes) configurePull(opts *pullOpts) error {
opts.maxBytes = int(n)
return nil
}

var (
// errNoMessages is an error that a Fetch request using no_wait can receive to signal
// that there are no more messages available.
Expand Down Expand Up @@ -2687,6 +2697,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// For batch real size of 1, it does not make sense to set no_wait in
// the request.
noWait := batch-len(msgs) > 1

var nr nextRequest

sendReq := func() error {
Expand All @@ -2711,6 +2722,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
nr.Batch = batch - len(msgs)
nr.Expires = expires
nr.NoWait = noWait
nr.MaxBytes = o.maxBytes
req, _ := json.Marshal(nr)
return nc.PublishRequest(nms, rply, req)
}
Expand Down
3 changes: 2 additions & 1 deletion test/js_test.go
Expand Up @@ -5880,7 +5880,8 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) {
t.Fatal(err)
}
defer sub.Unsubscribe()
if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") {

if _, err := sub.Fetch(10, nats.PullMaxBytes(200)); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") {
t.Fatalf("Expected error about max request max bytes, got %v", err)
}
})
Expand Down

0 comments on commit 2de06e0

Please sign in to comment.