diff --git a/go_test.mod b/go_test.mod index 88adf064b..d426ef375 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.7.1-0.20220121194245-cfdca3df7649 + github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 0082dd6c7..83dfe546f 100644 --- a/go_test.sum +++ b/go_test.sum @@ -17,9 +17,9 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.7.1-0.20220121194245-cfdca3df7649 h1:RhaQyUdFELJCBm3lG9xG5pmYyaNNSb49y+YSi8P9hio= -github.com/nats-io/nats-server/v2 v2.7.1-0.20220121194245-cfdca3df7649/go.mod h1:cjxtMhZsZovK1XS2iiapCduR8HuqB/YpFamL0qntIcw= -github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b h1:h8EYD8Q7yUbjXmMT6z1XI7SAV+aiHhkNEc1O+WImMh4= +github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= +github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/js.go b/js.go index 88203e89d..7444782a2 100644 --- a/js.go +++ b/js.go @@ -903,6 +903,13 @@ type ConsumerConfig struct { FlowControl bool `json:"flow_control,omitempty"` Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` HeadersOnly bool `json:"headers_only,omitempty"` + + // Pull based options. + MaxRequestBatch int `json:"max_batch,omitempty"` + MaxRequestExpires time.Duration `json:"max_expires,omitempty"` + + // Ephemeral inactivity threshold. + InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` } // ConsumerInfo is the info from a JetStream consumer. @@ -2240,6 +2247,36 @@ func HeadersOnly() SubOpt { }) } +// MaxRequestBatch sets the maximum pull consumer batch size that a Fetch() +// can request. +func MaxRequestBatch(max int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxRequestBatch = max + return nil + }) +} + +// MaxRequestExpires sets the maximum pull consumer request expiration that a +// Fetch() can request (using the Fetch's timeout value). +func MaxRequestExpires(max time.Duration) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxRequestExpires = max + return nil + }) +} + +// InactiveThreshold indicates how long the server should keep an ephemeral +// after detecting loss of interest. +func InactiveThreshold(threshold time.Duration) SubOpt { + return subOptFn(func(opts *subOpts) error { + if threshold < 0 { + return fmt.Errorf("invalid InactiveThreshold value (%v), needs to be greater or equal to 0", threshold) + } + opts.cfg.InactiveThreshold = threshold + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/jsm.go b/jsm.go index 8d0e52602..b1e407093 100644 --- a/jsm.go +++ b/jsm.go @@ -54,6 +54,9 @@ type JetStreamManager interface { // AddConsumer adds a consumer to a stream. AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) + // UpdateConsumer updates an existing consumer. + UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) + // DeleteConsumer deletes a consumer. DeleteConsumer(stream, consumer string, opts ...JSOpt) error @@ -276,6 +279,16 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C return info.ConsumerInfo, nil } +func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { + if cfg == nil { + return nil, ErrConsumerConfigRequired + } + if cfg.Durable == _EMPTY_ { + return nil, ErrInvalidDurableName + } + return js.AddConsumer(stream, cfg, opts...) +} + // consumerDeleteResponse is the response for a Consumer delete request. type consumerDeleteResponse struct { apiResponse diff --git a/test/js_test.go b/test/js_test.go index 74393e4a8..98a1c1933 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -35,6 +35,20 @@ import ( natsserver "github.com/nats-io/nats-server/v2/test" ) +func getConnAndJS(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStreamContext) { + t.Helper() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Got error during initialization %v", err) + } + return nc, js +} + func TestJetStreamNotEnabled(t *testing.T) { s := RunServerOnPort(-1) defer s.Shutdown() @@ -778,6 +792,32 @@ func TestJetStreamSubscribe(t *testing.T) { // Both ChanQueueSubscribers use the same consumer. expectConsumers(t, 4) + + sub, err = js.SubscribeSync("foo", nats.InactiveThreshold(-100*time.Millisecond)) + if err == nil || !strings.Contains(err.Error(), "invalid InactiveThreshold") { + t.Fatalf("Expected error about invalid option, got %v", err) + } + + // Create an ephemeral with a lower inactive threshold + sub, err = js.SubscribeSync("foo", nats.InactiveThreshold(50*time.Millisecond)) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + ci, err := sub.ConsumerInfo() + if err != nil { + t.Fatalf("Error on consumer info: %v", err) + } + name := ci.Name + nc.Close() + + time.Sleep(150 * time.Millisecond) + + nc, js = getConnAndJS(t, s) + defer nc.Close() + + if ci, err := js.ConsumerInfo("TEST", name); err == nil { + t.Fatalf("Expected no consumer to exist, got %+v", ci) + } } func TestJetStreamAckPending_Pull(t *testing.T) { @@ -1306,6 +1346,83 @@ func TestJetStreamManagement(t *testing.T) { } }) + t.Run("update consumer", func(t *testing.T) { + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "update_push_consumer", + DeliverSubject: "bar", + AckPolicy: nats.AckExplicitPolicy, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Currently, server supports these fields: + // description, ack_wait, max_deliver, sample_freq, max_ack_pending, max_waiting and headers_only + expected := ci.Config + expected.Description = "my description" + expected.AckWait = 2 * time.Second + expected.MaxDeliver = 1 + expected.SampleFrequency = "30" + expected.MaxAckPending = 10 + expected.HeadersOnly = true + + // Check that stream name is required + _, err = js.UpdateConsumer("", &expected) + if err != nats.ErrStreamNameRequired { + t.Fatalf("Expected stream name required error, got %v", err) + } + // Check that durable name is required + expected.Durable = "" + _, err = js.UpdateConsumer("foo", &expected) + if err != nats.ErrInvalidDurableName { + t.Fatalf("Expected consumer name required error, got %v", err) + } + expected.Durable = "update_push_consumer" + + // Check that configuration is required + _, err = js.UpdateConsumer("foo", nil) + if err != nats.ErrConsumerConfigRequired { + t.Fatalf("Expected consumer configuration required error, got %v", err) + } + + // Now check that update works and expected fields have been updated + ci, err = js.UpdateConsumer("foo", &expected) + if err != nil { + t.Fatalf("Error on update: %v", err) + } + if !reflect.DeepEqual(ci.Config, expected) { + t.Fatalf("Expected config to be %+v, got %+v", expected, ci.Config) + } + + // Now check with pull consumer + ci, err = js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "update_pull_consumer", + AckPolicy: nats.AckExplicitPolicy, + MaxWaiting: 1, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Currently, server supports these fields: + // description, ack_wait, max_deliver, sample_freq, max_ack_pending, max_waiting and headers_only + expected = ci.Config + expected.Description = "my description" + expected.AckWait = 2 * time.Second + expected.MaxDeliver = 1 + expected.SampleFrequency = "30" + expected.MaxAckPending = 10 + expected.MaxWaiting = 20 + expected.HeadersOnly = true + expected.MaxRequestBatch = 10 + expected.MaxRequestExpires = 2 * time.Second + ci, err = js.UpdateConsumer("foo", &expected) + if err != nil { + t.Fatalf("Error on update: %v", err) + } + if !reflect.DeepEqual(ci.Config, expected) { + t.Fatalf("Expected config to be %+v, got %+v", expected, ci.Config) + } + }) + t.Run("purge stream", func(t *testing.T) { if err := js.PurgeStream("foo"); err != nil { t.Fatalf("Unexpected error: %v", err) @@ -5104,6 +5221,32 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) { } } + t.Run("max request batch", func(t *testing.T) { + defer js.PurgeStream(subject) + + sub, err := js.PullSubscribe(subject, "max-request-batch", nats.MaxRequestBatch(2)) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestBatch of 2") { + t.Fatalf("Expected error about max request batch size, got %v", err) + } + }) + + t.Run("max request expires", func(t *testing.T) { + defer js.PurgeStream(subject) + + sub, err := js.PullSubscribe(subject, "max-request-expires", nats.MaxRequestExpires(50*time.Millisecond)) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestExpires of 50ms") { + t.Fatalf("Expected error about max request expiration, got %v", err) + } + }) + t.Run("batch size", func(t *testing.T) { defer js.PurgeStream(subject)