Skip to content

Commit

Permalink
Merge pull request #1043 from bruth/max-request-max-bytes
Browse files Browse the repository at this point in the history
Add MaxRequestMaxBytes consumer option
  • Loading branch information
kozlovic committed Aug 15, 2022
2 parents 30d5319 + 2de06e0 commit ea2caaa
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
36 changes: 29 additions & 7 deletions js.go
Expand Up @@ -1037,8 +1037,9 @@ type ConsumerConfig struct {
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`

// Push based consumers.
DeliverSubject string `json:"deliver_subject,omitempty"`
Expand Down Expand Up @@ -1084,9 +1085,10 @@ type SequencePair struct {

// nextRequest is for getting next messages for pull based consumers from JetStream.
type nextRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
}

// jsSub includes JetStream subscription info.
Expand Down Expand Up @@ -2445,6 +2447,15 @@ func MaxRequestExpires(max time.Duration) SubOpt {
})
}

// MaxRequesMaxBytes sets the maximum pull consumer request bytes that a
// Fetch() can receive.
func MaxRequestMaxBytes(bytes int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestMaxBytes = bytes
return nil
})
}

// InactiveThreshold indicates how long the server should keep an ephemeral
// after detecting loss of interest.
func InactiveThreshold(threshold time.Duration) SubOpt {
Expand Down Expand Up @@ -2485,8 +2496,9 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
}

type pullOpts struct {
ttl time.Duration
ctx context.Context
maxBytes int
ttl time.Duration
ctx context.Context
}

// PullOpt are the options that can be passed when pulling a batch of messages.
Expand All @@ -2502,6 +2514,14 @@ func PullMaxWaiting(n int) SubOpt {
})
}

// PullMaxBytes defines the max bytes allowed for a fetch request.
type PullMaxBytes int

func (n PullMaxBytes) configurePull(opts *pullOpts) error {
opts.maxBytes = int(n)
return nil
}

var (
// errNoMessages is an error that a Fetch request using no_wait can receive to signal
// that there are no more messages available.
Expand Down Expand Up @@ -2677,6 +2697,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// For batch real size of 1, it does not make sense to set no_wait in
// the request.
noWait := batch-len(msgs) > 1

var nr nextRequest

sendReq := func() error {
Expand All @@ -2701,6 +2722,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
nr.Batch = batch - len(msgs)
nr.Expires = expires
nr.NoWait = noWait
nr.MaxBytes = o.maxBytes
req, _ := json.Marshal(nr)
return nc.PublishRequest(nms, rply, req)
}
Expand Down
16 changes: 16 additions & 0 deletions test/js_test.go
Expand Up @@ -1411,6 +1411,8 @@ func TestJetStreamManagement(t *testing.T) {
expected.HeadersOnly = true
expected.MaxRequestBatch = 10
expected.MaxRequestExpires = 2 * time.Second
expected.MaxRequestMaxBytes = 1024

ci, err = js.UpdateConsumer("foo", &expected)
if err != nil {
t.Fatalf("Error on update: %v", err)
Expand Down Expand Up @@ -5870,6 +5872,20 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) {
}
})

t.Run("max request max bytes", func(t *testing.T) {
defer js.PurgeStream(subject)

sub, err := js.PullSubscribe(subject, "max-request-max-bytes", nats.MaxRequestMaxBytes(100))
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()

if _, err := sub.Fetch(10, nats.PullMaxBytes(200)); err == nil || !strings.Contains(err.Error(), "MaxRequestMaxBytes of 100") {
t.Fatalf("Expected error about max request max bytes, got %v", err)
}
})

t.Run("max request expires", func(t *testing.T) {
defer js.PurgeStream(subject)

Expand Down

0 comments on commit ea2caaa

Please sign in to comment.